2016-08-17 59 views
1

假設我有一個名爲SensorData的卡夫卡主題,兩個傳感器S1和S2正在向兩個不同的分區發送數據(時間戳和值),例如S1→P1和S2→P2。現在我需要分別彙總這兩個傳感器的值,假設計算1小時時間窗內的平均傳感器值並將其寫入新主題SensorData1Hour。在這種情況下在Apache Kafka流中的特定分區上的聚合

  1. 如何使用KStreamBuilder#stream方法選擇特定主題分區?
  2. 是否可以在同一主題的兩個(多個)不同分區上應用某種聚合函數?

回答

3

您不能(直接)訪問單個分區,也不能(直接)將聚合函數應用於多個分區。

聚集總是每key完成:http://docs.confluent.io/current/streams/developer-guide.html#stateful-transformations

  1. 因此,你可以使用不同的密鑰爲每個分區和不是總體的關鍵。請參閱http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream

最簡單的方法是讓每個生產者立即爲每條消息應用一個密鑰。

  1. 如果您想要聚合多個分區,首先需要設置一個新密鑰(例如,使用selectKey())併爲所有要聚合的數據設置相同的密鑰(如果您想要聚合所有分區,您只需使用一個鍵值 - 但請記住,這可能很快就會成爲瓶頸!)。
+0

感謝您的回覆。我會嘗試這一個。是否有任何可用於aggregateByKey的入門代碼示例? – Samy

+0

http://docs.confluent.io/3.0.0/streams/developer-guide.html#code-examples –

+0

@Samy這是否回答你的問題?如果是的話,隨時接受和/或upvote。 –