
Unable to submit a Spark job from a Windows IDE to a Linux cluster

I just read about findspark and found it interesting, because so far I have only used spark-submit, which is not well suited for interactive development in an IDE. I tried to execute the following file on Windows 10, Anaconda 4.4.0, Python 3.6.1, IPython 5.3.0, Spyder 3.1.4 and Spark 2.1.1:

def inc(i): 
    return i + 1 

import findspark 
findspark.init() 

import pyspark 
sc = pyspark.SparkContext(master='local', 
          appName='test1') 

print(repr(sc.parallelize(tuple(range(10))).map(inc).collect())) 

Spyder generates the command runfile('C:/tests/temp1.py', wdir='C:/tests') and it prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] as expected. However, if I try to run against the Spark cluster on Ubuntu, I get an error:

def inc(i): 
    return i + 1 

import findspark 
findspark.init() 

import pyspark 
sc = pyspark.SparkContext(master='spark://192.168.1.57:7077', 
          appName='test1') 

print(repr(sc.parallelize(tuple(range(10))).map(inc).collect())) 

The error in IPython:

Traceback (most recent call last): 

    File "<ipython-input-1-820bd4275b8c>", line 1, in <module> 
    runfile('C:/tests/temp.py', wdir='C:/tests') 

    File "C:\Anaconda3\lib\site-packages\spyder\utils\site\sitecustomize.py", line 880, in runfile 
    execfile(filename, namespace) 

    File "C:\Anaconda3\lib\site-packages\spyder\utils\site\sitecustomize.py", line 102, in execfile 
    exec(compile(f.read(), filename, 'exec'), namespace) 

    File "C:/tests/temp.py", line 11, in <module> 
    print(repr(sc.parallelize(tuple(range(10))).map(inc).collect())) 

    File "C:\projects\spark-2.1.1-bin-hadoop2.7\python\pyspark\rdd.py", line 808, in collect 
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 

    File "C:\projects\spark-2.1.1-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in __call__ 
    answer, self.gateway_client, self.target_id, self.name) 

    File "C:\projects\spark-2.1.1-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 319, in get_return_value 
    format(target_id, ".", name), value) 

Py4JJavaError: An error occurred while calling 

The worker's stderr:

ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.io.IOException: Cannot run program "C:\Anaconda3\pythonw.exe": error=2, No such file or directory 
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048) 
    at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163) 
    at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:89) 
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65) 
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:116) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

For some reason this tries to use a Windows binary path on the Linux slaves. Any idea how to overcome this? I get the same result from Spyder's Python console, except that the error is Cannot run program "C:\Anaconda3\python.exe": error=2, No such file or directory. In fact, it also happens from the command line when running python temp.py.
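
One way to see where the Windows path comes from (a hypothetical diagnostic, not part of the original post; the pythonExec attribute name is assumed from pyspark/context.py in Spark 2.1) is to print what the driver records as the worker interpreter right after the context is created:

import os
import sys

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(master='spark://192.168.1.57:7077',
          appName='test1')

# Under Spyder this is C:\Anaconda3\pythonw.exe, i.e. the driver's own interpreter.
print(sys.executable)
# Environment variable PySpark consults when picking the worker interpreter.
print(os.environ.get('PYSPARK_PYTHON'))
# The executable path the driver ships to the executors (assumed attribute).
print(sc.pythonExec)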

This version, however, works fine even when submitted from Windows to the Linux cluster with spark-submit:

def inc(i): 
    return i + 1 

import pyspark 
sc = pyspark.SparkContext(appName='test2') 

print(repr(sc.parallelize(tuple(range(10))).map(inc).collect())) 

spark-submit --master spark://192.168.1.57:7077 temp2.py

Answer


I found the solution, and it turned out to be very simple. pyspark/context.py uses the env variable PYSPARK_PYTHON to determine the path to the Python executable, but it falls back to plain python by default. However, findspark by default overrides this env variable to match sys.executable, and that obviously cannot work across platforms.
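
An equivalent workaround, sketched here purely for illustration (it is not part of the original answer), is to reset PYSPARK_PYTHON after findspark.init() and before the SparkContext is created, since pyspark reads the variable only when the context is constructed:

import os

import findspark
findspark.init()  # overrides PYSPARK_PYTHON with the Windows sys.executable

# Restore the plain interpreter name so the Linux workers resolve python
# from their own PATH instead of from the driver's Windows path.
os.environ['PYSPARK_PYTHON'] = 'python'

import pyspark
sc = pyspark.SparkContext(master='spark://192.168.1.57:7077',
          appName='test1')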

Anyway, here is the working code for future reference:

def inc(i): 
    return i + 1 

import findspark 
findspark.init(python_path='python') # <-- so simple! 

import pyspark 
sc = pyspark.SparkContext(master='spark://192.168.1.57:7077', 
          appName='test1') 

print(repr(sc.parallelize(tuple(range(10))).map(inc).collect())) 