我最近開始嘗試使用kafka流。我有一個場景,我需要加入KStream
和KTable
。可能是KTable
不包含某些密鑰。在這種情況下,我得到一個NullPointerException
。當KTable丟失密鑰時,處理KStream與KTable的連接
具體我是越來越
流線[StreamThread-1]處理過程中Streams應用程序錯誤: 顯示java.lang.NullPointerException
我不知道我怎麼可以搞定。我不能以某種方式過濾掉不符合表條目的流的記錄。
更新
看遠一點,我發現我可以查詢底層的店裏找是否通過ReadOnlyKeyValueStore
接口存在的關鍵。
在這種情況下,我的問題是,這是最好的方式嗎?即根據本地存儲中是否存在密鑰來過濾要加入的流?
我在這種情況下,第二個問題是,因爲我在乎利用在下一階段10.2
版中引入的Global State Store
,我應該預料到我會也能以同樣的方式來查詢Global State Store
?
更新
上一次更新是不準確的,因爲它無法從拓撲
最後更新
內部查詢狀態存儲理解加入語義好一點後我能夠解決這個問題只是簡化valueJoiner
只返回結果,而不是對連接的值執行操作,並在連接後添加額外的過濾步驟t o過濾掉空值。
我還是有點困惑。你什麼時候準確得到'NullPointerException'?你使用什麼版本的卡夫卡?你使用什麼「類型」連接(即內連接或左連接)?另外看看這個:http://docs.confluent.io/current/streams/developer-guide.html#kstream-ktable-join –
你可能想自己回答你的問題,並接受你自己的答案。或畢竟刪除了這個問題:) –
我試圖在流上做一個'leftJoin'。我用我收到的信息更新了問題。我在'0.10.1'版本。我會妥善制定答案並提交。謝謝:) – LetsPlayYahtzee