1

鑑於:我在卡夫卡有兩個主題讓我們說主題A和主題B.卡夫卡流從主題A讀取記錄,處理它併產生多個記錄(比如說recordA和recordB)對應於消耗的記錄。現在,問題是如何使用Kafka Streams來實現這一點。卡夫卡流:一個記錄到多個記錄

KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() { 
     @Override 
     public List<Message> apply(final Message message) { 
      return consumerRecordHandler.process(message); 
     } 
    }).*someFunction*() 

這裏,讀取的記錄是消息;處理後返回消息列表。我怎樣才能將這個列表分成兩個生產者流?任何幫助將不勝感激。

回答

5

我不是如果我正確理解了這個問題,並且我也不理解@Abhishek的答案:(

如果您有輸入流,並且您希望爲每個輸入記錄獲得零個,一個或多個輸出記錄,您將應用flatMap()flatMapValues()(具體取決於是否要修改密鑰)。

您也在問「如何將此列表分成兩個生產者流?」如果您的意思是將一個流拆分爲多個,則可以使用branch()

欲瞭解更多詳情,我指的是文檔:http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations

+0

@ user2538255如果我的答案不清楚,請隨時跟進。 –

+0

這就是我正在做的。在搜索了Abhishek的答案之後,我登陸了這個例子https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/ confluent/examples/streams/WordCountLambdaIntegrationTest.java – user2538255

+0

已經接受了正確的答案:)謝謝:) – user2538255

2

你的鑰匙是什麼?我猜測它不是String。執行mapValues後,您將擁有這個 - KStream<K,List<Message>>。如果K不是String然後someFunction()可以是mapK轉換成String(如果是,你已經有了結果),並離開List<Message>(值)不變,因爲這是你想要的最終結果

+0

是啊,這works..thanks一噸! – user2538255