import numpy

from numpy import array, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
    _serialize_double_matrix, _deserialize_double_matrix, \
    _serialize_double_vector, _deserialize_double_vector, \
    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
    _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint, LinearModel
from math import exp, log


class LogisticRegressionModel(LinearModel):
    """A linear binary classification model derived from logistic regression.

    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(1.0, [2.0]),
    ...     LabeledPoint(1.0, [3.0])
    ... ]
    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
    >>> lrm.predict(array([1.0])) > 0
    True
    >>> lrm.predict(array([0.0])) <= 0
    True
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ... ]
    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
    >>> lrm.predict(array([0.0, 1.0])) > 0
    True
    >>> lrm.predict(array([0.0, 0.0])) <= 0
    True
    >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
    True
    >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
    True
    """
    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        margin = _dot(x, self._coeff) + self._intercept
        # Squash the linear margin through the sigmoid; threshold at 0.5.
        prob = 1 / (1 + exp(-margin))
        return 1 if prob > 0.5 else 0
   68   
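
# Illustrative sketch (not part of the MLlib API): the same decision rule as
# LogisticRegressionModel.predict, written out for plain NumPy inputs. The
# function name and arguments here are hypothetical.
def _example_logistic_decision(x, weights, intercept):
    """Threshold the sigmoid of the linear margin at 0.5."""
    margin = numpy.dot(x, weights) + intercept  # linear score
    prob = 1 / (1 + exp(-margin))               # sigmoid maps margin to (0, 1)
    return 1 if prob > 0.5 else 0               # prob > 0.5 iff margin > 0
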

class LogisticRegressionWithSGD(object):
    @classmethod
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None):
        """Train a logistic regression model on the given data."""
        sc = data.context
        # Delegate the actual optimization to the JVM via the Py4J gateway.
        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
            d._jrdd, iterations, step, miniBatchFraction, i)
        return _regression_train_wrapper(sc, train_func, LogisticRegressionModel, data,
                                         initialWeights)
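
    # Usage sketch (hypothetical values; needs a live SparkContext `sc`):
    #
    #     points = sc.parallelize([LabeledPoint(0.0, [0.0]),
    #                              LabeledPoint(1.0, [1.0])])
    #     lrm = LogisticRegressionWithSGD.train(points, iterations=200, step=0.5,
    #                                           initialWeights=array([0.0]))
    #
    # initialWeights warm-starts SGD from a chosen vector instead of the
    # default starting point.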
   79   

class SVMModel(LinearModel):
    """A support vector machine.

    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(1.0, [2.0]),
    ...     LabeledPoint(1.0, [3.0])
    ... ]
    >>> svm = SVMWithSGD.train(sc.parallelize(data))
    >>> svm.predict(array([1.0])) > 0
    True
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ... ]
    >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
    >>> svm.predict(SparseVector(2, {1: 1.0})) > 0
    True
    >>> svm.predict(SparseVector(2, {0: -1.0})) <= 0
    True
    """
    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        margin = _dot(x, self._coeff) + self._intercept
        # Classify by which side of the separating hyperplane x falls on.
        return 1 if margin >= 0 else 0
  109   

class SVMWithSGD(object):
    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a support vector machine on the given data."""
        sc = data.context
        # Delegate the actual optimization to the JVM via the Py4J gateway.
        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
        return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights)
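
    # Usage sketch (hypothetical values; needs a live SparkContext `sc`):
    #
    #     points = sc.parallelize([LabeledPoint(0.0, [-1.0]),
    #                              LabeledPoint(1.0, [1.0])])
    #     svm = SVMWithSGD.train(points, iterations=200, regParam=0.1)
    #
    # regParam sets the strength of the regularization applied during SGD;
    # larger values penalize large weights more heavily.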
  120   

class NaiveBayesModel(object):
    """
    Model for Naive Bayes classifiers.

    Contains two parameters:
    - pi: vector of logs of class priors (dimension C)
    - theta: matrix of logs of class conditional probabilities (C x D)

    >>> data = [
    ...     LabeledPoint(0.0, [0.0, 0.0]),
    ...     LabeledPoint(0.0, [0.0, 1.0]),
    ...     LabeledPoint(1.0, [1.0, 0.0])
    ... ]
    >>> model = NaiveBayes.train(sc.parallelize(data))
    >>> model.predict(array([0.0, 1.0]))
    0.0
    >>> model.predict(array([1.0, 0.0]))
    1.0
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
    ... ]
    >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
    >>> model.predict(SparseVector(2, {1: 1.0}))
    0.0
    >>> model.predict(SparseVector(2, {0: 1.0}))
    1.0
    """
151   
    def __init__(self, labels, pi, theta):
        self.labels = labels
        self.pi = pi
        self.theta = theta
 156   
    def predict(self, x):
        """Return the most likely class for a data vector x."""
        # Scores are log-posteriors up to a constant: log prior + log likelihood.
        return self.labels[numpy.argmax(self.pi + _dot(x, self.theta.transpose()))]
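
    # Illustrative sketch of the arithmetic predict performs, with made-up
    # parameters for C = 2 classes and D = 2 features (all values hypothetical):
    #
    #     pi    = numpy.log(array([0.5, 0.5]))      # log class priors
    #     theta = numpy.log(array([[0.8, 0.2],      # log P(feature | class)
    #                              [0.3, 0.7]]))
    #     x     = array([1.0, 0.0])
    #     pi + _dot(x, theta.transpose())  # argmax -> index 0, i.e. class 0,
    #                                      # since feature 0 is likeliest there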
  160   

class NaiveBayes(object):
    @classmethod
    def train(cls, data, lambda_=1.0):
        """
        Train a Naive Bayes model given an RDD of (label, features) vectors.

        This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which can
        handle all kinds of discrete data.  For example, by converting
        documents into TF-IDF vectors, it can be used for document
        classification.  By making every vector a 0-1 vector, it can also be
        used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}); see the sketch
        after this method.

        @param data: RDD of LabeledPoint, where each point holds a label and
               a discrete feature vector (e.g. a count vector).
        @param lambda_: The additive (Laplace) smoothing parameter
               (default 1.0).
        """
        sc = data.context
        dataBytes = _get_unmangled_labeled_point_rdd(data)
        # The JVM side returns (labels, pi, theta) in serialized form.
        ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
        return NaiveBayesModel(
            _deserialize_double_vector(ans[0]),
            _deserialize_double_vector(ans[1]),
            _deserialize_double_matrix(ans[2]))
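
    # Sketch referenced above: reusing the multinomial trainer as Bernoulli NB
    # by binarizing dense count vectors first (names and data illustrative):
    #
    #     binarized = labeled_counts.map(
    #         lambda lp: LabeledPoint(lp.label, (lp.features > 0).astype(float)))
    #     model = NaiveBayes.train(binarized, lambda_=1.0)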
  186   

def _test():
    import doctest
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
 196   
if __name__ == "__main__":
    _test()