2017-08-23 40 views
3

我是Kafka Streams的新手,我一直在閱讀documentation如何設置Kafka Streams應用程序。Kafka Streams - 在Streams應用程序中存儲在內存和磁盤中的內容

但我不清楚,數據是如何處理的 - 內存中存儲了什麼以及磁盤上存儲了什麼。我已經看到RocksDB在某處提到過,但不在流文檔中。

我想解決的問題如下。我有兩個Kafka主題,這兩個鍵值存儲類型都保留每個鍵的最舊值。在我的流應用程序中,我想要加入這兩個主題並將連接輸出回kafka,稍後可以使用某些接收器。我擔心的是不清楚如何進行連接。這兩個主題都會有GB數據,因此沒有機會適合Streams App內存。

回答

1

您可以閱讀每個主題爲KTable,做一個表,表連接:

KTable table1 = builder.table("topic-1"); 
KTable table2 = builder.table("topic-2"); 

KTable joinResult = table1.join(table2, ...); 
joinResult.to("output-topic"); 

有關詳細信息,請參閱: http://docs.confluent.io/current/streams/developer-guide.html#ktable-ktable-join 還檢查了例子:https://github.com/confluentinc/examples/tree/3.3.0-post/kafka-streams

運行時,無論是主題將在RocksDB國有商店中實現。 RocksDB能夠溢出到磁盤。另請注意,單個RocksDB實例只需要保存單個輸入分區的數據。比較http://docs.confluent.io/current/streams/architecture.html#parallelism-model

+0

並澄清「兩個主題都會有GB數據,因此沒有機會適合Streams應用內存。」:在Kafka Streams中使用RocksDB的關鍵動機是能夠管理狀態(連接是有狀態操作),其大小大於可用主存(RAM /堆)的大小。這就是爲什麼您可以使用GB或甚至TB數據進行連接。 –

+1

感謝Matthias和Michael。我們在談論什麼機器仍然不清楚。比方說,我有Kafka節點A,B和C以及一個運行在X上的Kafka Stream應用程序,它完全符合您的示例代碼顯示的內容。因此,X必須在RocksDB本地保存這兩個主題的數據,還是隻提供加入機制,而服務器X並沒有太多的工作,所有的加入工作都通過RockDB與Kafka節點A,B和C連接他們保存所有處理的數據以及主題數據? – eddyP23

+2

所有處理都發生在應用程序節點X上(即,RocksDB將在X上,並且連接計算在X上發生)。經紀人(A,B,C)不參與加入(他們甚至不知道有一個Kafka Streams應用程序 - 對經紀人來說,它看起來像任何其他消費者)。順便說一句:RocksDB不需要保存「全部」數據:因爲這兩個主題都應該壓縮主題RocksDB只需要爲每個鍵保存最新的鍵值對。 –

相關問題