2016-11-26 58 views
3

我有一個與StateStore交互的處理器,用於過濾消息並對消息執行復雜的邏輯。在process(key,value)方法中,我使用context.forward(key,value)來發送我需要的鍵和值。出於調試的目的,我也打印這些。如何用處理器使用Kafka Stream DSL過濾密鑰和數值

我有一個KStream mergedStream,這是由兩個其他流的連接產生的。我想將處理器應用於該流的記錄。我通過以下方式實現:mergedStream.process(myprocessor,"stateStoreName")

當我啓動該程序時,可以看到要打印到控制檯的正確值。但是,如果我使用mergedStream.to("topic")將mergedStream發送到主題,則該主題上的值不是我在處理器中轉發的值,而是原始值。

我使用kafka-streams 0.10.1.0。

什麼是最好的方式來獲取我已經在處理器轉發到另一個流的值?

是否可以將Processor APIKStream DSL創建的流混合?

回答

9

簡稱:

解決你的問題,你可以使用transform(...)代替process(...),讓你訪問處理器API DSL內,太。

如果使用process(...)應用處理器流 - 然而,這是一個「終止」(或吸收)操作(它的返回類型是void),即,它不返回任何結果(這裏是「匯」並不只意味着運營商沒有繼任者 - 這並不意味着任何結果的地方寫的)

此外,如果你打電話mergedStream.process(...)mergedStream.to(...)你基本上分支定複製您的數據流並向每個下游運營商發送一個副本(即,一個副本到process a找一份to

混合DSL和處理器API是絕對有可能的(你已經做到了;))。但是,如果使用process(...),則無法在DSL中使用數據forward(...) - 如果要使用處理器API結果,則可以使用transform(...)而不是process(...)

相關問題