2017-02-27 74 views
0

試圖找出如何寫一個數據流作業,依賴於一些第三方的模塊和查表工作,具體如下:嘗試理解火花流工作

# custom.py 
# this is the 3rd party or user defined python module, 
# there're some module-level variables 
# and some functions which rely on the moduel-level variables to work 
VAR_A = ... 
VAR_B = ... 


# load external data files to initialize VAR_A and VAR_B 
def init_var(external_file): 
    with open(external_file, 'r') as f: 
    for l in f: 
     VAR_A.append(l) 
     VAR_B.check(l) 
     .... 

# relies on VAR_A and VAR_B to work 
def process(x): 
    if x in VAR_A: 
    ... 
    if VAR_B.check(x): 
    ... 

流驅動器如下,基本上,每個RDD我想通過handle申請customprocess功能,但是在process功能依賴於某些查找變量工作,即VAR_AVAR_B,所以我必須明確地播放這些查找瓦爾Spark contenxt?

# driver.py 
import custom 

def handle(x): 
    ... 
    custom = shared.value 
    return custom.process(x) 


if __name__ == '__main__': 
    sc = SparkContext(appName='porn_score_on_name') 
    ssc = StreamingContext(sc, 2) 

    custom.init('/path/to/external_file') 

    # since each task node will use custom, so I try to make it a shared one 
    # HOWEVER, this won't work, since module cannot be pickled 
    shared = sc.broadcast(custom) 

    # get stream data 
    data = ... 
    processed = data.map(handle) 

    # further processing 
    ... 

    ssc.start() 
    ssc.awaitTermination() 

我不知道如何使它工作,如果我不得不使用第三方模塊?

UPDATE

假設實時流輸入文字,例如線

word1 word2 
word3 
word5 word7 word1 
... 

我想找出預先計算好的詞彙表中的單詞(V)。

所以我有這樣的想法: 寫一個數據流作業來處理輸入數據,這意味着我有多個執行並行運行消耗數據,併爲每一個執行者,預先計算vocabular V,則所有可用時間。 現在的問題是如何讓它發生?

這是我INTIAL採取在此: 我做包含的詞彙和我的自定義代碼,例如一個zip包pack.zip,然後我通過​​提交此pack.zip,使這一pack.zip可用每個執行人的機器上,那麼我應該做些什麼,以使每個執行人從pack.zip詞彙加載到內存中的樣子-up表,以至於現在每次執行訪問的詞彙,使他們能夠正確處理實時流數據時,司機開始運行。

但事實證明,上述想法可行,但每個執行者一次又一次地加載詞彙表,這是不可接受的。 所以這裏是我的第二個看法: 我應該在驅動程序中加載詞彙表(所以這發生在本地機器上,而不是執行者上),然後向所有執行者廣播詞彙表查找表,然後做這項工作。

+0

我不明白的問題,但肯定的,所有的變量和模塊必須是可序列化到星火(或者通過Python醃) –

+0

@ cricket_007,基本上,我只是婉打電話'custom' 'handle'中的'process'函數。 – avocado

+0

@ cricket_007,更新後的一些細節。順便說一句,如果所有需要的模塊必須在星火被序列化,那麼什麼是通過'--py-files'或'--archives'等上傳包的地步? – avocado

回答

0

您的例子並不真的似乎是一個流動問題,只是如何加載一個全局變量...

我不會試圖播放一個整體模塊,只是個別需要的變量。

例如,你應該能夠使用廣播變量像這樣。 (未測試的代碼)

# One of the first things you do 
vocab = sc.broadcast(open('vocab.txt').readlines()) # broadcast to all executors 

def vocab_filter(line): 
    words = line.split() 
    return [w for w in words if w in vocab.value] 

ssc = StreamingContext(sc, 1) # Some streaming context 
lines = ssc.socketTextStream("localhost", 9999) # Some stream 
# remove extraneous words from the lines and flatten all words in the stream 
lines_filtered = lines.flatMap(vocab_filter) 
+0

是的,謝謝,我完全理解你的代碼示例。然而,我最擔心的問題是我需要使用第三方模塊的查找功能,例如, 'process',這個函數是以一種不友好的方式編寫的,就像這樣'def process(line_of_string):for line in line_of_string.split():yield word if if in GLOBAL_VOCAB',就是假設這是一個模塊在這個包含詞彙的第三方模塊中包含變量GLOBAL_VOCAB,所以現在即使我可以像你那樣做'broadcast',那麼我可以打電話給第三方'process'嗎? – avocado

+0

'GLOBAL_VOCAB'本身必須是廣播變量。否則,我不明白爲什麼你剛纔提到是行不通的 –

+0

BTW什麼,該詞彙裝載並不像你的代碼那麼簡單,除了閱讀它們,一個'trie'需求正在興建對這些詞彙的話,那麼如何加載詞彙表文件並在每個執行程序上建立'trie'一次,而不是一次又一次重建'trie'(這是我的代碼中的一個問題) – avocado