
TypeError when calling LogisticRegressionWithLBFGS.train from Spark MLlib

I want to call LogisticRegressionWithLBFGS.train from Spark MLlib with training data for a multiclass logistic regression problem. My training set is built as follows:

trainingData = sXYdf.rdd.map(lambda x: reg.LabeledPoint(x[0]-1,x[1:])) 
trainingData.take(2) 

The output of trainingData.take(2) is shown below. (I am not printing the full labels and features, since the data is a 2x401 label-and-feature matrix with the label in column 0 and the features occupying the remaining columns.)

[LabeledPoint(9.0, [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,8.56059679589e-06,1.94035947712e-06,-0.00073743872549,-0.0081340379902,-0.0186104473039,-0.0187412865354,-0.018757250817,-0.0190963541667...])] 
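
The question does not show how sXYdf is built; as a hypothetical stand-in for anyone trying to reproduce this, a DataFrame with the same layout (label in column 0, feature columns after it) can be generated as follows. The column count, row count and random values here are assumptions, not the original data:

from pyspark.sql import SparkSession
import pyspark.mllib.regression as reg
import random

spark = SparkSession.builder.getOrCreate()

# Hypothetical stand-in for sXYdf: label 1-10 in column 0, 400 feature columns after it.
rows = [tuple([float(random.randint(1, 10))] + [random.random() for _ in range(400)])
        for _ in range(20)]
sXYdf = spark.createDataFrame(rows)

# Same mapping as above: shift the 1-10 labels down to 0-9 and use the rest as features.
trainingData = sXYdf.rdd.map(lambda x: reg.LabeledPoint(x[0] - 1, x[1:]))
print(trainingData.take(2))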

Now, when I call

lrm=LogisticRegressionWithLBFGS.train(trainingData,numClasses=10) 

I get the following error:

TypeError         Traceback (most recent call last) 
<ipython-input-20-9b0c5530b34b> in <module>() 
     1 #lr=LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0.0) 
----> 2 lrm=LogisticRegressionWithLBFGS.train(trainingData,numClasses=10) 

C:\spark-2.1.1-bin-hadoop2.7\spark-2.1.1-bin-hadoop2.7\python\pyspark\mllib\classification.py in train(cls, data, iterations, initialWeights, regParam, regType, intercept, corrections, tolerance, validateData, numClasses) 
    396     else: 
    397      initialWeights = [0.0] * len(data.first().features) * (numClasses - 1) 
--> 398   return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights) 
    399 
    400 

C:\spark-2.1.1-bin-hadoop2.7\spark-2.1.1-bin-hadoop2.7\python\pyspark\mllib\regression.py in _regression_train_wrapper(train_func, modelClass, data, initial_weights) 
    214   weights, intercept, numFeatures, numClasses = train_func(
    215    data, _convert_to_vector(initial_weights)) 
--> 216   return modelClass(weights, intercept, numFeatures, numClasses) 
    217  else: 
    218   weights, intercept = train_func(data, _convert_to_vector(initial_weights)) 

C:\spark-2.1.1-bin-hadoop2.7\spark-2.1.1-bin-hadoop2.7\python\pyspark\mllib\classification.py in __init__(self, weights, intercept, numFeatures, numClasses) 
    174    self._dataWithBiasSize = self._coeff.size/(self._numClasses - 1) 
    175    self._weightsMatrix = self._coeff.toArray().reshape(self._numClasses - 1, 
--> 176                 self._dataWithBiasSize) 
    177 
    178  @property 

TypeError: 'float' object cannot be interpreted as an integer 

Added some more logs: it looks like the creation of the Python worker process is failing.

17/07/15 19:59:14 WARN TaskSetManager: Stage 123 contains a task of very large size (17658 KB). The maximum recommended task size is 100 KB. 
17/07/15 19:59:24 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 123) 
org.apache.spark.SparkException: Python worker did not connect back in time 
     at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:138) 
     at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:67) 
     at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:116) 
     at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128) 
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
     at java.lang.Thread.run(Unknown Source) 
Caused by: java.net.SocketTimeoutException: Accept timed out 
     at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method) 
     at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source) 
     at java.net.AbstractPlainSocketImpl.accept(Unknown Source) 
     at java.net.PlainSocketImpl.accept(Unknown Source) 
     at java.net.ServerSocket.implAccept(Unknown Source) 
     at java.net.ServerSocket.accept(Unknown Source) 
     at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:133) 
     ... 27 more 
17/07/15 19:59:24 WARN TaskSetManager: Lost task 0.0 in stage 123.0 (TID 123, localhost, executor driver): org.apache.spark.SparkException: Python worker did not connect back in time 
     at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:138) 
     at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:67) 
     at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:116) 
     at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128) 
     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
     at java.lang.Thread.run(Unknown Source) 
Caused by: java.net.SocketTimeoutException: Accept timed out 
     at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method) 
     at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source) 
     at java.net.AbstractPlainSocketImpl.accept(Unknown Source) 
     at java.net.PlainSocketImpl.accept(Unknown Source) 
     at java.net.ServerSocket.implAccept(Unknown Source) 
     at java.net.ServerSocket.accept(Unknown Source) 
     at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:133) 
     ... 27 more 

17/07/15 19:59:24 ERROR TaskSetManager: Task 0 in stage 123.0 failed 1 times; aborting job 
Traceback (most recent call last): 
    File "C:\Users\Sunil\Anaconda3\lib\runpy.py", line 193, in _run_module_as_main 
    "__main__", mod_spec) 
    File "C:\Users\Sunil\Anaconda3\lib\runpy.py", line 85, in _run_code 
    exec(code, run_globals) 
    File "C:\spark-2.1.1-bin-hadoop2.7\spark-2.1.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 211, in <module> 
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it 
[I 20:01:12.525 NotebookApp] Saving file at /mltclasspyspark.ipynb 

1 Answer

Well, it seems there is a bug in Spark 2.1.1 with Python 3 that produces the above error (I could not reproduce it with Python 2.7).
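
For reference, the root cause appears to be the line from classification.py shown in the traceback: self._coeff.size/(self._numClasses - 1) uses "/", which under Python 3 is true division and returns a float, and NumPy's reshape rejects a float dimension. A minimal sketch of the failure, assuming 400 features and 10 classes to mirror the data above:

import numpy as np

num_classes = 10
coeff = np.zeros(400 * (num_classes - 1))  # flat weights vector, as the model stores it

# Python 3: "/" is true division, so the computed dimension is the float 400.0
# and reshape raises the TypeError seen in the traceback above (on current NumPy).
try:
    coeff.reshape(num_classes - 1, coeff.size / (num_classes - 1))
except TypeError as e:
    print(e)  # 'float' object cannot be interpreted as an integer

# Integer (floor) division keeps the dimension an int and the reshape succeeds.
weights_matrix = coeff.reshape(num_classes - 1, coeff.size // (num_classes - 1))
print(weights_matrix.shape)  # (9, 400)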

So, unless you can upgrade to Spark 2.1.2 or 2.2, where the issue is reported to be resolved, or switch to Python 2.7, I suggest modifying your map function as follows so that your labels are integers rather than floats (I have not tested it, though):

trainingData = sXYdf.rdd.map(lambda x: reg.LabeledPoint(int(x[0]-1),x[1:])) 

Unfortunately, casting the label to int did not help. Let me try upgrading Spark and give it a shot. – sunny


Thanks for the help. Upgrading to Spark 2.2.0 did the job, although I kept the cast. – sunny