from numpy import array, ndarray
from pyspark import SparkContext
from pyspark.mllib._common import \
    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
    _serialize_double_matrix, _deserialize_double_matrix, \
    _serialize_double_vector, _deserialize_double_vector, \
    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
    _linear_predictor_typecheck, _have_scipy, _scipy_issparse
from pyspark.mllib.linalg import SparseVector


class LabeledPoint(object):
    """
    The features and labels of a data point.

    @param label: Label for this data point.
    @param features: Vector of features for this point (NumPy array, list,
        pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
    """
    def __init__(self, label, features):
        self.label = label
        if (type(features) == ndarray or type(features) == SparseVector
                or (_have_scipy and _scipy_issparse(features))):
            self.features = features
        elif type(features) == list:
            self.features = array(features)
        else:
            raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
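
# A brief usage sketch (illustrative only; mirrors the doctests below):
# the label is a plain float and the features may be dense or sparse, e.g.
#
#     dense = LabeledPoint(1.0, [0.0, 2.5])                  # list becomes a NumPy array
#     sparse = LabeledPoint(0.0, SparseVector(2, {1: 2.5}))  # stored as-is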


class LinearModel(object):
    """A linear model that has a vector of coefficients and an intercept."""
    def __init__(self, weights, intercept):
        self._coeff = weights
        self._intercept = intercept

    @property
    def weights(self):
        return self._coeff

    @property
    def intercept(self):
        return self._intercept


class LinearRegressionModelBase(LinearModel):
    """A linear regression model.

    >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
    >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
    True
    >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
    True
    """
    def predict(self, x):
        """Predict the value of the dependent variable given a vector x
        containing values for the independent variables."""
        _linear_predictor_typecheck(x, self._coeff)
        return _dot(x, self._coeff) + self._intercept
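
# What predict computes, as a sketch: for weights w and intercept b,
# predict(x) is the affine map dot(w, x) + b. An equivalent dense NumPy
# expression (`model` and `x` here are hypothetical):
#
#     from numpy import dot
#     dot(model.weights, x) + model.intercept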


class LinearRegressionModel(LinearRegressionModelBase):
    """A linear regression model derived from a least-squares fit.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    """


class LinearRegressionWithSGD(object):
    @classmethod
    def train(cls, data, iterations=100, step=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a linear regression model on the given data."""
        sc = data.context
        train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
            d._jrdd, iterations, step, miniBatchFraction, i)
        return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights)
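
# Hedged usage sketch (assumes a live SparkContext `sc` and an RDD of
# LabeledPoint, as in the doctests above; hyperparameter values are
# illustrative, not tuned recommendations):
#
#     points = sc.parallelize([LabeledPoint(0.0, [0.0]), LabeledPoint(1.0, [1.0])])
#     model = LinearRegressionWithSGD.train(points, iterations=200, step=0.1,
#                                           miniBatchFraction=1.0)
#     model.weights, model.intercept    # fitted coefficients via LinearModel
#     model.predict(array([0.5]))       # prediction for a new feature vector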


class LassoModel(LinearRegressionModelBase):
    """A linear regression model derived from a least-squares fit with an
    l_1 penalty term.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    """


class LassoWithSGD(object):
    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a Lasso regression model on the given data."""
        sc = data.context
        train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(
            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
        return _regression_train_wrapper(sc, train_f, LassoModel, data, initialWeights)
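
# Sketch: `regParam` sets the strength of the L1 penalty; larger values push
# more coefficients to exactly zero (the value below is illustrative only):
#
#     model = LassoWithSGD.train(points, iterations=200, step=0.1, regParam=0.01)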


class RidgeRegressionModel(LinearRegressionModelBase):
    """A linear regression model derived from a least-squares fit with an
    l_2 penalty term.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    """


class RidgeRegressionWithSGD(object):
    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a ridge regression model on the given data."""
        sc = data.context
        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(
            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
        return _regression_train_wrapper(sc, train_func, RidgeRegressionModel, data, initialWeights)
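
# Sketch: here `regParam` scales an L2 penalty, which shrinks coefficients
# toward zero without forcing exact zeros (illustrative value only):
#
#     model = RidgeRegressionWithSGD.train(points, iterations=200, step=0.1,
#                                          regParam=0.01)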


def _test():
    import doctest
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

if __name__ == "__main__":
    _test()