0

我是Spark的新手,使用我定義的類生成1000個不同的實例(在這些實例中的函數是相同的,但是細節函數的參數是不同的)。 sampler=generateClass()然後,我需要這些實例功能映射到我流。(來進行測試,只使用10個2實例)如何使用'for'循環在Spark-Streaming的DStream中進行轉換和輸出?

s=[] 
for i in range(10):   
    s.append(mappedStream.map(lambda x: sampler[i].insert(x)).reduce(min)) 

uStream=ssc.union(s[0],s[1],s[2],s[3],s[4],s[5],s[6],s[7],s[8],s[9]) 
uStream.pprint() 

但其輸出僅10相同的鍵值對,似乎這些代碼只是將我的數據映射到第一個實例,然後重複10次。

(85829323L, [2, 1]) 
(85829323L, [2, 1]) 
(85829323L, [2, 1]) 
(85829323L, [2, 1]) 
.... 

然後,我嘗試

myStream1=mappedStream.map(lambda x: sampler[0].insert(x)).reduce(min) 
myStream2=mappedStream.map(lambda x: sampler[1].insert(x)).reduce(min) 
ssc.union(myStream1,myStream2).pprint() 

輸出是正確的:

(85829323L, [2, 1]) 
(99580454L, [4, 1]) 

爲什麼出現這種情況?我該如何處理它?非常感謝你。

回答

0

發生這種情況是因爲python lambda的延遲評估以及您在s[0]上調用動作時是使用最後一個i參數來計算(您的情況爲9,它是最後一個循環值)。

可以使用函數發生器模式來「強制」使用適當的i,例如:

def call_sampler(i): 
    return lambda x: sampler[i].insert(x) 

s=[] 
for i in range(10):   
    s.append(mappedStream.map(call_sampler(i)).reduce(min)) 

uStream=ssc.union(s[0],s[1],s[2],s[3],s[4],s[5],s[6],s[7],s[8],s[9]) 
uStream.pprint()