我一直在與卡夫卡消費者和生產者api工作了一段時間,並想嘗試我的手在流api。我在網上查了很多參考資料,但我無法弄清楚這一件簡單的事情。卡夫卡輸出流
如何製作只將消息發送到輸出主題的KStream。
拿他們在github上的這個最基本的例子來回答: https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java它從一個隊列中取得消息,並在操作它之後將它們發佈到另一個隊列。
事情是這樣的:
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, String> textLines = builder.stream();
// do the dirty work...
textLines.to("outputTopic")
但builder.stream();
不存在,它需要至少是一個輸入主題名稱。
我應該堅持一個普通的卡夫卡製作人嗎?如果是這樣的話,我一直沒有找到明確說明這一點的資源。
我想知道,你想在「骯髒的工作」部分做什麼。我想了解您的用例場景。 –
我有數據在窗體外部來源。其中一些需要在執行一些過濾之後發送到特定主題。我的想法是利用kstream,因爲將來也可能通過kafka發送消息,因此我可以輕鬆地重構它以使用輸入主題。現在不需要現在做一個提供商,後來移動。 – Rhed
您可以使用Kafka Connect進行此操作 - 它允許執行「單個消息轉換」,包括過濾:http://kafka.apache.org/documentation/#connect_transforms –