apache-kafka-streams

    2熱度

    1回答

    我正在建立KTable的輸入主題,我加入了兩個Kafka流應用程序實例的KStream。 KTable的輸入主題已經是日誌壓縮主題。因此,當我的應用程序實例中的一個出現故障時,另一個實例狀態存儲庫似乎通過從輸入日誌壓縮主題中讀取而刷新整個狀態。 因此,不需要爲我的KTable商店啓用日誌記錄(更改日誌)? 我的源輸入日誌壓縮的主題可能有數百萬條記錄,所以如果我在該KTable狀態存儲上啓用日誌記錄

    1熱度

    1回答

    我試圖創建卡夫卡流的leftJoin的正常工作約10條記錄,然後將其與引起的NullPointerException這樣的代碼的異常崩潰: private static KafkaStreams getKafkaStreams() { StreamsConfig streamsConfig = new StreamsConfig(getProperties()); KStrea

    0熱度

    2回答

    我試圖過濾出在給定(跳躍)時間窗口中長度爲T的密鑰出現頻率高於閾值N的任何消息。 例如,下面的流中: #time, key 0, A 1, B 2, A 3, C 4, D 5, A 6, B 7, C 8, C 9, D 10, A 11, D 12, D 13, D 14, D 15, D 和N=2和T=3,結果應該是 0, A 2, A 7, C 8

    0熱度

    1回答

    我有一個非常簡單的應用程序KafkaStreams。它看起來像這樣: input topic --> extract smth., update aggregate in the local state -> output topic 在開始的時候輸入的話題只有1分,一切工作順利。 但經過我在輸入題目增加分區的數量我觀察到的,而不是單一的更新我的應用程序的每個分區實例化,所以我的輸出主題有多個

    0熱度

    1回答

    有一種方法可以通過Kafka流對PIVOT/UNPIVOT(爆炸,轉移)流嗎? 如果我有 machineId ts VarName VarValue m1 2017-10-01 00:00:00 var1 1.0 m1 2017-10-01 00:00:00 var2 2.0 m2 2017-10-01 00:00:00 var1 3.0 m2 2017-10-01 00:00:00 va

    1熱度

    1回答

    在使用持久狀態存儲託管KafkaStream(0.10.2.1)實例的多個節點的部署中,推薦重新啓動所有方法同時避免重播整個狀態存儲變更日誌主題?這必須在不改變application.id的情況下完成,因爲我不想丟失我已經在州商店中擁有的數據。 我增加了session.timeout.ms,這樣當代理開始重新分配分區時,所有節點都會啓動,並避免調用KafkaStreams.stop來防止不必要的分

    0熱度

    1回答

    我已經定義以下在卡夫卡拓撲流 Operation 1 : input_stream ----> filter ----> window_processing ----> write_to_topic Operation 2 : input_stream ----> write_to_topic 我觀察到,這兩個操作正在由同一線程(即使我增加線程StreamsConfig.NUM_STREAM

    0熱度

    1回答

    我需要知道如何使用「爲」我的卡夫卡KStreams線環......下面是我的「for」循環需要被列入KStreams for (int i = 0; i < 6 ; i++) { try { textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{"))); Thread.s

    2熱度

    2回答

    Kafka Streams引擎將一個分區映射到一個工作人員(即Java App),以便該分區中的所有消息都由該工作人員處理。我有以下情況,並試圖瞭解它是否仍然可行。 我有一個主題A(有3個分區)。發送給它的消息由Kafka隨機分區(即沒有密鑰)。我發送給它的消息有像下面 {carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}

    0熱度

    1回答

    我有一個應用程序需要監聽多個不同的主題;每個主題都有獨立的消息處理邏輯。我曾經想過爲每個KafkaStreams實例使用相同的kafka屬性,但是我得到如下所示的錯誤。 錯誤 java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscr