試圖找出如何寫一個數據流作業,依賴於一些第三方的模塊和查表工作,具體如下:嘗試理解火花流工作
# custom.py
# this is the 3rd party or user defined python module,
# there're some module-level variables
# and some functions which rely on the moduel-level variables to work
VAR_A = ...
VAR_B = ...
# load external data files to initialize VAR_A and VAR_B
def init_var(external_file):
with open(external_file, 'r') as f:
for l in f:
VAR_A.append(l)
VAR_B.check(l)
....
# relies on VAR_A and VAR_B to work
def process(x):
if x in VAR_A:
...
if VAR_B.check(x):
...
流驅動器如下,基本上,每個RDD我想通過handle
申請custom
的process
功能,但是在process
功能依賴於某些查找變量工作,即VAR_A
和VAR_B
,所以我必須明確地播放這些查找瓦爾Spark contenxt?
# driver.py
import custom
def handle(x):
...
custom = shared.value
return custom.process(x)
if __name__ == '__main__':
sc = SparkContext(appName='porn_score_on_name')
ssc = StreamingContext(sc, 2)
custom.init('/path/to/external_file')
# since each task node will use custom, so I try to make it a shared one
# HOWEVER, this won't work, since module cannot be pickled
shared = sc.broadcast(custom)
# get stream data
data = ...
processed = data.map(handle)
# further processing
...
ssc.start()
ssc.awaitTermination()
我不知道如何使它工作,如果我不得不使用第三方模塊?
UPDATE
假設實時流輸入文字,例如線
word1 word2
word3
word5 word7 word1
...
我想找出預先計算好的詞彙表中的單詞(V)。
所以我有這樣的想法: 寫一個數據流作業來處理輸入數據,這意味着我有多個執行並行運行消耗數據,併爲每一個執行者,預先計算vocabular V,則所有可用時間。 現在的問題是如何讓它發生?
這是我INTIAL採取在此: 我做包含的詞彙和我的自定義代碼,例如一個zip包pack.zip,然後我通過提交此pack.zip,使這一pack.zip可用每個執行人的機器上,那麼我應該做些什麼,以使每個執行人從pack.zip詞彙加載到內存中的樣子-up表,以至於現在每次執行訪問的詞彙,使他們能夠正確處理實時流數據時,司機開始運行。
但事實證明,上述想法可行,但每個執行者一次又一次地加載詞彙表,這是不可接受的。 所以這裏是我的第二個看法: 我應該在驅動程序中加載詞彙表(所以這發生在本地機器上,而不是執行者上),然後向所有執行者廣播詞彙表查找表,然後做這項工作。
我不明白的問題,但肯定的,所有的變量和模塊必須是可序列化到星火(或者通過Python醃) –
@ cricket_007,基本上,我只是婉打電話'custom' 'handle'中的'process'函數。 – avocado
@ cricket_007,更新後的一些細節。順便說一句,如果所有需要的模塊必須在星火被序列化,那麼什麼是通過'--py-files'或'--archives'等上傳包的地步? – avocado