我有一個與StateStore交互的處理器,用於過濾消息並對消息執行復雜的邏輯。在process(key,value)
方法中,我使用context.forward(key,value)
來發送我需要的鍵和值。出於調試的目的,我也打印這些。如何用處理器使用Kafka Stream DSL過濾密鑰和數值
我有一個KStream mergedStream
,這是由兩個其他流的連接產生的。我想將處理器應用於該流的記錄。我通過以下方式實現:mergedStream.process(myprocessor,"stateStoreName")
當我啓動該程序時,可以看到要打印到控制檯的正確值。但是,如果我使用mergedStream.to("topic")
將mergedStream發送到主題,則該主題上的值不是我在處理器中轉發的值,而是原始值。
我使用kafka-streams 0.10.1.0。
什麼是最好的方式來獲取我已經在處理器轉發到另一個流的值?
是否可以將Processor API與KStream DSL創建的流混合?