
How do I read a LIBSVM model (saved with LIBSVM) into PySpark?

I have a LIBSVM scaling model (generated with svm-scale) that I want to port to PySpark. I naively tried the following:

scaler_path = "path to model" 
a = MinMaxScaler().load(scaler_path) 

but this throws an error that expects a metadata directory:

Py4JJavaErrorTraceback (most recent call last) 
<ipython-input-22-1942e7522174> in <module>() 
----> 1 a = MinMaxScaler().load(scaler_path) 

/srv/data/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/ml/util.pyc in load(cls, path) 
    226  def load(cls, path): 
    227   """Reads an ML instance from the input path, a shortcut of `read().load(path)`.""" 
--> 228   return cls.read().load(path) 
    229 
    230 

/srv/data/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/ml/util.pyc in load(self, path) 
    174   if not isinstance(path, basestring): 
    175    raise TypeError("path should be a basestring, got type %s" % type(path)) 
--> 176   java_obj = self._jread.load(path) 
    177   if not hasattr(self._clazz, "_from_java"): 
    178    raise NotImplementedError("This Java ML type cannot be loaded into Python currently: %r" 

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 
    1131   answer = self.gateway_client.send_command(command) 
    1132   return_value = get_return_value(
-> 1133    answer, self.gateway_client, self.target_id, self.name) 
    1134 
    1135   for temp_arg in temp_args: 

/srv/data/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw) 
    61  def deco(*a, **kw): 
    62   try: 
---> 63    return f(*a, **kw) 
    64   except py4j.protocol.Py4JJavaError as e: 
    65    s = e.java_exception.toString() 

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 
    317     raise Py4JJavaError(
    318      "An error occurred while calling {0}{1}{2}.\n". 
--> 319      format(target_id, ".", name), value) 
    320    else: 
    321     raise Py4JError(

Py4JJavaError: An error occurred while calling o321.load. 
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:[filename]/metadata 


Is there a simple workaround for loading this? The format of the LIBSVM model is:

x 
0 1 
1 -1050 1030 
2 0 1 
3 0 3 
4 0 1 
5 0 1 

Answer


First of all, the file you show is not in libsvm format. The correct format of a libsvm file is:

<label> <index1>:<value1> <index2>:<value2> ... <indexN>:<valueN> 

So, to begin with, your data is not prepared correctly.

Second, the class method load(path) that you are using on MinMaxScaler reads an ML instance from the given input path.

Remember: MinMaxScaler computes summary statistics on a data set and produces a MinMaxScalerModel. The model can then transform each feature individually so that it lies in the given range.
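
For reference, the rescaling a fitted MinMaxScalerModel applies to each feature is plain arithmetic; here is a minimal sketch of that formula (this is not Spark's internal code, and the function name is made up):

def rescale(e, e_min, e_max, lo=0.0, hi=1.0):
    # Map a raw feature value e from its observed range [e_min, e_max]
    # onto the target range [lo, hi] (MinMaxScaler's defaults are min=0.0, max=1.0).
    if e_max == e_min:
        # Constant feature: per the MinMaxScaler docs it is rescaled to 0.5 * (max + min).
        return 0.5 * (lo + hi)
    return (e - e_min) / (e_max - e_min) * (hi - lo) + lo

rescale(1.23, 1.01, 1.23)  # -> 1.0, matching the first feature of the first row in the scaled output further below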

For example:

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MinMaxScaler

df = spark.createDataFrame(
    [(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])),
     (0.0, Vectors.dense([1.01, 2.02, 3.03]))],
    ['label', 'features'])

df.show(truncate=False) 
# +-----+---------------------+
# |label|features             |
# +-----+---------------------+
# |1.1  |(3,[0,2],[1.23,4.56])|
# |0.0  |[1.01,2.02,3.03]     |
# +-----+---------------------+

mmScaler = MinMaxScaler(inputCol="features", outputCol="scaled") 
temp_path = "/tmp/spark/" 
minMaxScalerPath = temp_path + "min-max-scaler" 
mmScaler.save(minMaxScalerPath) 

The snippet above saves the MinMaxScaler feature transformer to disk, so that it can be loaded back later with the class method load.

Now, let's look at what actually happened. The class method save creates the following file structure:

/tmp/spark/
└── min-max-scaler
    └── metadata
        ├── part-00000
        └── _SUCCESS

Let's check the content of the part-00000 file:

$ cat /tmp/spark/min-max-scaler/metadata/part-00000 | python -m json.tool
{
    "class": "org.apache.spark.ml.feature.MinMaxScaler",
    "paramMap": {
        "inputCol": "features",
        "max": 1.0,
        "min": 0.0,
        "outputCol": "scaled"
    },
    "sparkVersion": "2.0.0",
    "timestamp": 1480501003244,
    "uid": "MinMaxScaler_42e68455a929c67ba66f"
}

So, in fact, when you load the transformer:

loadedMMScaler = MinMaxScaler.load(minMaxScalerPath) 

you are actually loading that metadata file. It does not expect a libsvm file! (That is also why your traceback complains about a missing .../metadata path.)

You can now use your transformer to create a model and transform the DataFrame:

model = loadedMMScaler.fit(df) 

model.transform(df).show(truncate=False)
# +-----+---------------------+-------------+
# |label|features             |scaled       |
# +-----+---------------------+-------------+
# |1.1  |(3,[0,2],[1.23,4.56])|[1.0,0.0,1.0]|
# |0.0  |[1.01,2.02,3.03]     |[0.0,1.0,0.0]|
# +-----+---------------------+-------------+

Now let's get back to that libsvm file. Let's create some dummy data and save it in libsvm format with MLUtils:

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils

data = sc.parallelize([
    LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])),
    LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))
])
MLUtils.saveAsLibSVMFile(data, temp_path + "data")

Back to our file structure:

/tmp/spark/
├── data
│   ├── part-00000
│   ├── part-00001
│   ├── part-00002
│   ├── part-00003
│   ├── part-00004
│   ├── part-00005
│   ├── part-00006
│   ├── part-00007
│   └── _SUCCESS
└── min-max-scaler
    └── metadata
        ├── part-00000
        └── _SUCCESS

You can check the content of these files, which is now in libsvm format (note that libsvm indices are 1-based, which is why features 0 and 2 appear as 1 and 3):

$ cat /tmp/spark/data/part-0000* 
1.1 1:1.23 3:4.56 
0.0 1:1.01 2:2.02 3:3.03 

Now let's load that data and apply the model:

loadedData = MLUtils.loadLibSVMFile(sc, temp_path + "data")
loadedDataDF = spark.createDataFrame(
    loadedData.map(lambda lp: (lp.label, lp.features.asML())),
    ['label', 'features'])

loadedDataDF.show(truncate=False) 
# +-----+----------------------------+
# |label|features                    |
# +-----+----------------------------+
# |1.1  |(3,[0,2],[1.23,4.56])       |
# |0.0  |(3,[0,1,2],[1.01,2.02,3.03])|
# +-----+----------------------------+

Note that converting the MLlib Vectors to ML Vectors is very important. You can read more about it here.
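
As a small standalone illustration of that conversion (the variable names here are just for this example):

from pyspark.mllib.linalg import Vectors as MLlibVectors

old_vec = MLlibVectors.dense([1.01, 2.02, 3.03])  # pyspark.mllib.linalg.DenseVector
new_vec = old_vec.asML()                          # pyspark.ml.linalg.DenseVector, which pyspark.ml transformers expect

With the features converted, the model fitted earlier can be applied to the loaded DataFrame directly: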

model.transform(loadedDataDF).show(truncate=False) 
# +-----+----------------------------+-------------+
# |label|features                    |scaled       |
# +-----+----------------------------+-------------+
# |1.1  |(3,[0,2],[1.23,4.56])       |[1.0,0.0,1.0]|
# |0.0  |(3,[0,1,2],[1.01,2.02,3.03])|[0.0,1.0,0.0]|
# +-----+----------------------------+-------------+

I hope this answers your question!