1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18  from numpy import array, dot 
 19  from math import sqrt 
 20  from pyspark import SparkContext 
 21  from pyspark.mllib._common import \ 
 22      _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \ 
 23      _serialize_double_matrix, _deserialize_double_matrix, \ 
 24      _serialize_double_vector, _deserialize_double_vector, \ 
 25      _get_initial_weights, _serialize_rating, _regression_train_wrapper 
 26  from pyspark.mllib.linalg import SparseVector 
 30      """A clustering model derived from the k-means method. 
 31   
 32      >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2) 
 33      >>> model = KMeans.train( 
 34      ...     sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random") 
 35      >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0])) 
 36      True 
 37      >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0])) 
 38      True 
 39      >>> model = KMeans.train(sc.parallelize(data), 2) 
 40      >>> sparse_data = [ 
 41      ...     SparseVector(3, {1: 1.0}), 
 42      ...     SparseVector(3, {1: 1.1}), 
 43      ...     SparseVector(3, {2: 1.0}), 
 44      ...     SparseVector(3, {2: 1.1}) 
 45      ... ] 
 46      >>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||") 
 47      >>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.])) 
 48      True 
 49      >>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1])) 
 50      True 
 51      >>> model.predict(sparse_data[0]) == model.predict(sparse_data[1]) 
 52      True 
 53      >>> model.predict(sparse_data[2]) == model.predict(sparse_data[3]) 
 54      True 
 55      >>> type(model.clusterCenters) 
 56      <type 'list'> 
 57      """ 
 59          self.centers = centers 
  60   
 61      @property 
 63          """Get the cluster centers, represented as a list of NumPy arrays.""" 
 64          return self.centers 
  65   
 67          """Find the cluster to which x belongs in this model.""" 
 68          best = 0 
 69          best_distance = float("inf") 
 70          for i in range(0, len(self.centers)): 
 71              distance = _squared_distance(x, self.centers[i]) 
 72              if distance < best_distance: 
 73                  best = i 
 74                  best_distance = distance 
 75          return best 
   76   
 79      @classmethod 
 80 -    def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"): 
  81          """Train a k-means clustering model.""" 
 82          sc = data.context 
 83          dataBytes = _get_unmangled_double_vector_rdd(data) 
 84          ans = sc._jvm.PythonMLLibAPI().trainKMeansModel( 
 85              dataBytes._jrdd, k, maxIterations, runs, initializationMode) 
 86          if len(ans) != 1: 
 87              raise RuntimeError("JVM call result had unexpected length") 
 88          elif type(ans[0]) != bytearray: 
 89              raise RuntimeError("JVM call result had first element of type " 
 90                                 + type(ans[0]) + " which is not bytearray") 
 91          matrix = _deserialize_double_matrix(ans[0]) 
 92          return KMeansModel([row for row in matrix]) 
   93   
 96      import doctest 
 97      globs = globals().copy() 
 98      globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) 
 99      (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) 
100      globs['sc'].stop() 
101      if failure_count: 
102          exit(-1) 
 103   
104   
# Run the module's self-test only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    _test()
107