2017-09-13 122 views
0

我有一個配置流中提到,像這樣一個pyspark程序,可以接受多個卡夫卡:Pyspark多卡夫卡流覆蓋變量

[stream1] 
server=10.0.0.1:9090 
topic=log_topic 

[stream2] 
server=10.0.0.2:9090 
topic=file_topic 

所以我的代碼使用所提到的配置加載多個數據流是這樣的:

from configobj import ConfigObj 
config = ConfigObj("my.conf") 
for i, j in conf.iteritems(): 
    stream = KafkaUtils.createStream(ssc, j['server'], "consumer_%s" % (i), {j['topic']: 1}).cache() 
    stream.pprint() 

現在說,如果流1具有以下傳入內容:

aaaaa 
aaaaa 
aaaaa 
... 

和流有以下內容:

bbbbb 
bbbbb 
bbbbb 
... 

使用pprint功能,我期待看到下面的輸出:

----------------------------- 
2017-09-13 16:54:32 
----------------------------- 
aaaaa 
aaaaa 
aaaaa 
... 

----------------------------- 
2017-09-13 16:54:32 
----------------------------- 
bbbbb 
bbbbb 
bbbbb 
... 

但我看到下面的輸出:

----------------------------- 
2017-09-13 16:54:32 
----------------------------- 
bbbbb 
bbbbb 
bbbbb 
... 

----------------------------- 
2017-09-13 16:54:32 
----------------------------- 
bbbbb 
bbbbb 
bbbbb 
... 

我理解似乎有延遲加載或者在for循環的第二次迭代之後讀取變量stream之後。任何人都可以讓我知道如何實現這一點,以便我可以在for循環中處理2個獨立的流。

謝謝!

回答

0

喜歡的東西:

streams = [] 
config = ConfigObj("my.conf") 
for i, j in conf.iteritems(): 
    stream = KafkaUtils.createStream(ssc, j['server'], "consumer_%s" % (i), {j['topic']: 1}).cache() 
    streams.append(stream) 

all_topic = ssc.union(*streams)  
all_topic.pprint() 
+0

張感謝您的回覆。但我不需要流的聯合,而是我想基於某些邏輯將兩個流的不同映射縮減函數應用於這兩個流。我需要分開的流。 –