2016-08-08 55 views
5

在過去的幾天裏,我一直在努力理解Spark執行者在導入時如何知道如何使用給定名稱的模塊。我正在研究AWS EMR。現狀: 我通過鍵入PySpark分發模塊導入

pyspark --master紗

然後,在pyspark初始化電子病歷pyspark,

import numpy as np ## notice the naming 

def myfun(x): 
    n = np.random.rand(1) 
    return x*n 

rdd = sc.parallelize([1,2,3,4], 2) 
rdd.map(lambda x: myfun(x)).collect() ## works! 

我的理解是,當我輸入numpy as np,主節點是唯一節點導入和識別numpynp。但是,對於EMR集羣(2個工作節點),如果我在rdd上運行映射函數,則驅動程序將該函數發送給工作節點,以便爲列表中的每個項目(對於每個分區)執行函數,並且返回成功結果。

我的問題是:工作人員如何知道numpy應該作爲np導入?每個工作人員都已安裝numpy,但我沒有明確定義每個節點導入模塊as np的方法。

請參考下面的發佈者Cloudera的更詳細的信息的依賴關係: http://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/

複雜依賴它們具有其中熊貓模塊被明確地在每個節點上導入的一個例子(代碼)。

我聽說過的一個理論是驅動程序分發所有在pyspark交互式shell中傳遞的代碼。我對此持懷疑態度。我提出的例子是,如果在主節點上輸入:

print "hello" 

是否每個工作節點還打印「hello」?我不這麼認爲。但也許我錯了。

回答

3

當功能是串行化是有number of objects is being saved

  • 代碼
  • 全局
  • 默認
  • closure
  • 字典

它可以在以後用於恢復給定函數所需的完整環境。

from pyspark.cloudpickle import CloudPickler 

CloudPickler.extract_code_globals(myfun.__code__) 
## {'np'} 

和結合可以從其globals提取:

由於np通過它可以從它的代碼被提取的函數引用

myfun.__globals__['np'] 
## <module 'numpy' from ... 

所以串行化閉合(在廣義上)捕獲恢復環境所需的所有信息。當然,在閉包中訪問的所有模塊都必須能夠在每臺工作機器上導入。

其他一切只是閱讀和寫作機械。

在旁註中,主節點不應該執行任何Python代碼。它負責不運行應用程序代碼的資源分配。

+0

太好了,謝謝你的參與。這就是說,這是否意味着每個工作人員都會執行像'print'hello''這樣的代碼?或者被忽略,只執行該函數操作所需的代碼? – Jon

+1

只有閉包捕獲的代碼實際上在工人身上執行。其他一切都被忽略。 – zero323