2016-10-03 30 views
1

我有複雜的函數,我使用map函數在spark中運行數據集。它位於不同的python模塊中。當調用map時,執行程序節點沒有該代碼,然後map函數失敗。我怎樣才能讓Spark看到不同模塊中的代碼?

s_cobDates = getCobDates() #returns a list of dates 
sb_dataset = sc.broadcast(dataset) #fyi - it is not trivial to slice this into chunks per date 

def sparkInnerLoop(n_cobDate): 
    n_dataset = sb_dataset.value 
    import someOtherModule 
    return someOtherModule.myComplicatedCalc(n_dataset) 

results = s_cobDates.map(sparkInnerLoop).collect() 

由於Spark不能導入myOtherModule,所以Spark失敗。

到目前爲止,我已經通過創建一個包含someOtherModule並在Spark工作之前將其部署到集羣的python軟件包來實現它,但這不能用於快速原型設計。

如何獲取spark以將完整代碼發送到執行程序節點,而不將所有代碼內聯到「sparkInnerLoop」中?該代碼在我的解決方案的其他地方使用,我不希望代碼重複。

我在獨立模式下使用八節點羣集,v 1.6.2,並且驅動程序正在我的工作站上運行在pycharm中。

回答

0

那麼上面的答案是有效的,如果你的模塊是一個包的一部分,它會下降。相反,它可能會壓縮你的模塊,然後將zip文件添加到你的spark上下文中,然後他們有正確的包名。

def ziplib(): 
    libpath = os.path.dirname(__file__) # this should point to your packages directory 
    zippath = r'c:\Temp\mylib-' + randstr.randstr(6) + '.zip' 
    zippath = os.path.abspath(zippath) 
    zf = zipfile.PyZipFile(zippath, mode='w') 
    try: 
     zf.debug = 3 # making it verbose, good for debugging 
     zf.writepy(libpath) 
     return zippath # return path to generated zip archive 
    finally: 
     zf.close() 

sc = SparkContext(conf=conf) 

zip_path = ziplib() # generate zip archive containing your lib 
zip_path = pathlib.Path(zip_path).as_uri() 
sc.addPyFile(zip_path) # add the entire archive to SparkContext 
+0

這很好,謝謝你 – ThatDataGuy