鑑於:我在卡夫卡有兩個主題讓我們說主題A和主題B.卡夫卡流從主題A讀取記錄,處理它併產生多個記錄(比如說recordA和recordB)對應於消耗的記錄。現在,問題是如何使用Kafka Streams來實現這一點。卡夫卡流:一個記錄到多個記錄
KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
@Override
public List<Message> apply(final Message message) {
return consumerRecordHandler.process(message);
}
}).*someFunction*()
這裏,讀取的記錄是消息;處理後返回消息列表。我怎樣才能將這個列表分成兩個生產者流?任何幫助將不勝感激。
@ user2538255如果我的答案不清楚,請隨時跟進。 –
這就是我正在做的。在搜索了Abhishek的答案之後,我登陸了這個例子https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/ confluent/examples/streams/WordCountLambdaIntegrationTest.java – user2538255
已經接受了正確的答案:)謝謝:) – user2538255