2017-04-12 173 views
1

我一直在與卡夫卡消費者和生產者api工作了一段時間,並想嘗試我的手在流api。我在網上查了很多參考資料,但我無法弄清楚這一件簡單的事情。卡夫卡輸出流

如何製作只將消息發送到輸出主題的KStream。

拿他們在github上的這個最基本的例子來回答: https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java它從一個隊列中取得消息,並在操作它之後將它們發佈到另一個隊列。

事情是這樣的:

final KStreamBuilder builder = new KStreamBuilder(); 
final KStream<String, String> textLines = builder.stream(); 
// do the dirty work... 
textLines.to("outputTopic") 

builder.stream();不存在,它需要至少是一個輸入主題名稱。

我應該堅持一個普通的卡夫卡製作人嗎?如果是這樣的話,我一直沒有找到明確說明這一點的資源。

+0

我想知道,你想在「骯髒的工作」部分做什麼。我想了解您的用例場景。 –

+0

我有數據在窗體外部來源。其中一些需要在執行一些過濾之後發送到特定主題。我的想法是利用kstream,因爲將來也可能通過kafka發送消息,因此我可以輕鬆地重構它以使用輸入主題。現在不需要現在做一個提供商,後來移動。 – Rhed

+0

您可以使用Kafka Connect進行此操作 - 它允許執行「單個消息轉換」,包括過濾:http://kafka.apache.org/documentation/#connect_transforms –

回答

2

Kafka Streams API旨在將主題用作輸入流,處理記錄並將結果寫回主題。它不是爲了向卡夫卡寫數據而設計的。

所以是的,如果你想寫數據到一個主題,你應該使用KafkaProducer

+0

這是否有任何具體原因?只是想知道是否有一些基本的東西我錯過了API。儘管如此,我還是期待着。 – Rhed

+0

Streams旨在處理存儲在Kafka中的數據:)但是,您可以使用Kafka Connect進行此操作 - 它允許執行「單個消息轉換」,包括過濾:kafka.apache.org/documentation/#connect_transforms –