1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18  import numpy as np 
 19   
 20  from pyspark.mllib.linalg import Vectors, SparseVector 
 21  from pyspark.mllib.regression import LabeledPoint 
 22  from pyspark.mllib._common import _convert_vector 
 26      """ 
 27      Helper methods to load, save and pre-process data used in MLlib. 
 28      """ 
 29   
 30      @staticmethod 
 32          """ 
 33          Parses a line in LIBSVM format into (label, indices, values). 
 34          """ 
 35          items = line.split(None) 
 36          label = float(items[0]) 
 37          if not multiclass: 
 38              label = 1.0 if label > 0.5 else 0.0 
 39          nnz = len(items) - 1 
 40          indices = np.zeros(nnz, dtype=np.int32) 
 41          values = np.zeros(nnz) 
 42          for i in xrange(nnz): 
 43              index, value = items[1 + i].split(":") 
 44              indices[i] = int(index) - 1 
 45              values[i] = float(value) 
 46          return label, indices, values 
  47   
 48      @staticmethod 
 50          """Converts a LabeledPoint to a string in LIBSVM format.""" 
 51          items = [str(p.label)] 
 52          v = _convert_vector(p.features) 
 53          if type(v) == np.ndarray: 
 54              for i in xrange(len(v)): 
 55                  items.append(str(i + 1) + ":" + str(v[i])) 
 56          elif type(v) == SparseVector: 
 57              nnz = len(v.indices) 
 58              for i in xrange(nnz): 
 59                  items.append(str(v.indices[i] + 1) + ":" + str(v.values[i])) 
 60          else: 
 61              raise TypeError("_convert_labeled_point_to_libsvm needs either ndarray or SparseVector" 
 62                              " but got " % type(v)) 
 63          return " ".join(items) 
  64   
 65      @staticmethod 
 66 -    def loadLibSVMFile(sc, path, multiclass=False, numFeatures=-1, minPartitions=None): 
  67          """ 
 68          Loads labeled data in the LIBSVM format into an RDD of 
 69          LabeledPoint. The LIBSVM format is a text-based format used by 
 70          LIBSVM and LIBLINEAR. Each line represents a labeled sparse 
 71          feature vector using the following format: 
 72   
 73          label index1:value1 index2:value2 ... 
 74   
 75          where the indices are one-based and in ascending order. This 
 76          method parses each line into a LabeledPoint, where the feature 
 77          indices are converted to zero-based. 
 78   
 79          @param sc: Spark context 
 80          @param path: file or directory path in any Hadoop-supported file 
 81                       system URI 
 82          @param multiclass: whether the input labels contain more than 
 83                             two classes. If false, any label with value 
 84                             greater than 0.5 will be mapped to 1.0, or 
 85                             0.0 otherwise. So it works for both +1/-1 and 
 86                             1/0 cases. If true, the double value parsed 
 87                             directly from the label string will be used 
 88                             as the label value. 
 89          @param numFeatures: number of features, which will be determined 
 90                              from the input data if a nonpositive value 
 91                              is given. This is useful when the dataset is 
 92                              already split into multiple files and you 
 93                              want to load them separately, because some 
 94                              features may not present in certain files, 
 95                              which leads to inconsistent feature 
 96                              dimensions. 
 97          @param minPartitions: min number of partitions 
 98          @return: labeled data stored as an RDD of LabeledPoint 
 99   
100          >>> from tempfile import NamedTemporaryFile 
101          >>> from pyspark.mllib.util import MLUtils 
102          >>> tempFile = NamedTemporaryFile(delete=True) 
103          >>> tempFile.write("+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0") 
104          >>> tempFile.flush() 
105          >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect() 
106          >>> multiclass_examples = MLUtils.loadLibSVMFile(sc, tempFile.name, True).collect() 
107          >>> tempFile.close() 
108          >>> examples[0].label 
109          1.0 
110          >>> examples[0].features.size 
111          6 
112          >>> print examples[0].features 
113          [0: 1.0, 2: 2.0, 4: 3.0] 
114          >>> examples[1].label 
115          0.0 
116          >>> examples[1].features.size 
117          6 
118          >>> print examples[1].features 
119          [] 
120          >>> examples[2].label 
121          0.0 
122          >>> examples[2].features.size 
123          6 
124          >>> print examples[2].features 
125          [1: 4.0, 3: 5.0, 5: 6.0] 
126          >>> multiclass_examples[1].label 
127          -1.0 
128          """ 
129   
130          lines = sc.textFile(path, minPartitions) 
131          parsed = lines.map(lambda l: MLUtils._parse_libsvm_line(l, multiclass)) 
132          if numFeatures <= 0: 
133              parsed.cache() 
134              numFeatures = parsed.map(lambda x: 0 if x[1].size == 0 else x[1][-1]).reduce(max) + 1 
135          return parsed.map(lambda x: LabeledPoint(x[0], Vectors.sparse(numFeatures, x[1], x[2]))) 
 136   
137      @staticmethod 
139          """ 
140          Save labeled data in LIBSVM format. 
141   
142          @param data: an RDD of LabeledPoint to be saved 
143          @param dir: directory to save the data 
144   
145          >>> from tempfile import NamedTemporaryFile 
146          >>> from fileinput import input 
147          >>> from glob import glob 
148          >>> from pyspark.mllib.util import MLUtils 
149          >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \ 
150                          LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))] 
151          >>> tempFile = NamedTemporaryFile(delete=True) 
152          >>> tempFile.close() 
153          >>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name) 
154          >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) 
155          '0.0 1:1.01 2:2.02 3:3.03\\n1.1 1:1.23 3:4.56\\n' 
156          """ 
157          lines = data.map(lambda p: MLUtils._convert_labeled_point_to_libsvm(p)) 
158          lines.saveAsTextFile(dir) 
  159   
def _test():
    """
    Runs this module's doctests against a local SparkContext and
    exits with a non-zero status if any of them fail.
    """
    import doctest
    import sys
    from pyspark.context import SparkContext
    globs = globals().copy()
    # NOTE(review): batchSize=2 presumably forces several batches even
    # for the tiny doctest inputs -- confirm against SparkContext docs.
    globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
    try:
        (failure_count, _) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Always release the context, even if the doctest run raises.
        globs['sc'].stop()
    if failure_count:
        # sys.exit instead of the site-provided exit(), which is not
        # guaranteed to exist (e.g. when running with python -S).
        sys.exit(-1)


if __name__ == "__main__":
    _test()
176