0
我有一個配置流中提到,像這樣一個pyspark程序,可以接受多個卡夫卡:Pyspark多卡夫卡流覆蓋變量
[stream1]
server=10.0.0.1:9090
topic=log_topic
[stream2]
server=10.0.0.2:9090
topic=file_topic
所以我的代碼使用所提到的配置加載多個數據流是這樣的:
from configobj import ConfigObj
config = ConfigObj("my.conf")
for i, j in conf.iteritems():
stream = KafkaUtils.createStream(ssc, j['server'], "consumer_%s" % (i), {j['topic']: 1}).cache()
stream.pprint()
現在說,如果流1具有以下傳入內容:
aaaaa
aaaaa
aaaaa
...
和流有以下內容:
bbbbb
bbbbb
bbbbb
...
使用pprint功能,我期待看到下面的輸出:
-----------------------------
2017-09-13 16:54:32
-----------------------------
aaaaa
aaaaa
aaaaa
...
-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...
但我看到下面的輸出:
-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...
-----------------------------
2017-09-13 16:54:32
-----------------------------
bbbbb
bbbbb
bbbbb
...
我理解似乎有延遲加載或者在for循環的第二次迭代之後讀取變量stream
之後。任何人都可以讓我知道如何實現這一點,以便我可以在for循環中處理2個獨立的流。
謝謝!
張感謝您的回覆。但我不需要流的聯合,而是我想基於某些邏輯將兩個流的不同映射縮減函數應用於這兩個流。我需要分開的流。 –