這是一個很好的問題。要回答這個問題,我要使用PySpark wordcount example。
在這種情況下,我創建了兩個文件,一個叫test.py
這是我要執行的文件和另一個名爲wordcount.py.zip
這是一個包含修改wordcount.py
文件設計用於模擬模塊,我想打電話給一個zip。
我test.py
文件看起來像這樣:
import wordcount
import sys
if __name__ == "__main__":
wordcount.wctest(sys.argv[1])
我修改了wordcount.py
文件消除的主要方法,並添加一個名爲方法:
...
from pyspark import SparkContext
...
def wctest(path):
sc = SparkContext(appName="PythonWordCount")
...
我可以調用Dataproc整個事情通過使用以下gcloud
命令:
gcloud beta dataproc jobs submit pyspark --cluster <cluster-name> \
--py-files gs://<bucket>/wordcount.py.zip gs://<bucket>/test.py \
gs://<bucket>/input/input.txt
在此示例中,<bucket>
是我的存儲桶的名稱(或路徑),<cluster-name>
是我的Dataproc集羣的名稱。