2016-08-18 108 views
0

我有一個kafka流進入一些輸入主題。 這是我爲接受kafka流寫的代碼。如何結合兩個DStreams(pyspark)?

conf = SparkConf().setAppName(appname) 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc) 
kvs = KafkaUtils.createDirectStream(ssc, topics,\ 
      {"metadata.broker.list": brokers}) 

然後我創建了兩個DStreams的鍵和值的原始流。

keys = kvs.map(lambda x: x[0].split(" ")) 
values = kvs.map(lambda x: x[1].split(" ")) 

然後我在值DStream中執行一些計算。 例如,

val = values.flatMap(lambda x: x*2) 

現在,我需要鑰匙和VAL DSTREAM相結合,在卡夫卡流的形式返回結果。

如何將val與corressponding鍵結合?

回答

0

您可以使用2個DStream中的join運算符來合併它們。 當你做地圖時,你基本上正在創建另一個流。所以,加入會幫助你將它們合併在一起。

如:

Joined_Stream = keys.join(values).(any operation like map, flatmap...) 
+0

我沒有得到這部分'(如地圖的任何操作,flatmap ...)',你能更詳細說明。 – vidhan

+0

我不明白你想要做的事情(我提供了合併2個DStreams的通用答案)。 的事情是,如果你做的值的平面地圖,沒有辦法將他們映射回鍵,因爲這將是一個扁平列表的輸出.... 通過合併2個Dstreams,你可以創建RDD的每個元素這兩個鍵和值,只是不會有一對一映射... –