apache-kafka-streams

    0熱度

    1回答

    我們正在測試一個節點發生故障的情況下,3比如卡夫卡集羣上,用2 複製因子刪除實例後後卡夫卡消費者錯誤,消費是不斷失敗。 消費者使用卡夫卡流讀取郵件 感謝 這是消費者的故障日誌: 08:09:19.667 [ComponentsActivityEventsStream-608b9f05-0911-4b14-a1b1-37247747686a-StreamThread-2] ERROR o.a.k.c

    2熱度

    1回答

    我是新來流處理(kafka流/ flink /風暴/火花/等),並試圖找出處理現實世界問題的最佳方法,代表這裏以玩具爲例。我們與Kafka綁定用於我們的pubsub/data攝入,但對流處理器框架/方法沒有特別的依賴。 理論上,假設我有一個零星發射浮點值的源。在任何給定的點上都有一個乘數M應該應用於這個源的值;但是M可以改變,而且批判地說,我可能只在晚些時候纔會發現這種變化 - 甚至可能不是「按照

    0熱度

    1回答

    我玩的卡夫卡流API(Kakfa版本:0.10.2.0)試圖做一個簡單的wordcount示例工作:Wordcount App gist。我同時運行生產者和消費者的控制檯: ./kafka-console-producer.sh -topic input-topic --broker-list localhost:9092 ./kafka-console-consumer.sh --topic o

    3熱度

    1回答

    屬於同一消費羣組的多個消費者是否可以同時從分區讀取數據。 我猜想不是,以避免處理相同的消息不止一次。 是的,如果消費者來自不同的羣體,那麼是的,多個消費者可以毫無困難地從相同的分區讀取數據。 (A SOF question is already posted on this)。 但是我特別要問一下同一個分區的消費者。卡夫卡允許嗎?

    0熱度

    1回答

    我想知道爲什麼卡夫卡2個Kstreams共劃分需要相同數量的兩個流分區如文檔在下面的網址給出: enter link description here

    0熱度

    1回答

    我有一個Kstream與空鍵和值JSON作爲 { 「booking_id」:12,...} 和GlobalKtable其中也有在空鍵和預約ID及其json payload.I想要以booking_id爲關鍵字執行連接。 我知道Kstream的keyvaluemapper,但是因爲這兩個鍵都是空的,我如何執行連接操作?

    1熱度

    1回答

    我有一個Kafka讀數流,我檢查是否超過了某個閾值。如果第一次超出,我只想傳播警報。爲了實現這一點,我首先計算新狀態,將新狀態分組到KGroupedStream中。然後減少到一個KTable,我檢查狀態是否改變(保持一個布爾值),並改變到更改日誌流並過濾狀態改變的記錄。 我的理論是,這應該工作,但不是每個狀態改變傳播到更新日誌流,但只是在一段時間內更新日誌流似乎更新(無法真正看到模式)。任何人都知

    0熱度

    1回答

    我們在我們的系統中使用kafka進行流處理。輸入數據\消息的結構非常複雜。所以如何定義輸入消息的結構。這是輸入數據\消息的最合適的結構和序列化機制。

    0熱度

    1回答

    我想知道kafka平臺是否適合以下工作。 我想要攝取一個完整的數據庫與多個表。一旦卡夫卡攝入,我想根據病情過濾表格。 我認爲使用Kafka流是一件容易的事情,但是被過濾器拒絕的消息會發生什麼? 如果以日期爲例,未來可能會遇到條件,那麼是否有機會再次過濾被拒絕的郵件以最終通過過濾器並進一步處理? 在給卡夫卡餵食之前過濾數據行是否更好? 謝謝。

    1熱度

    1回答

    我對卡夫卡和卡夫卡流很新,所以請忍受我。我想知道我是否在這條正確的軌道上。 我正在寫卡夫卡話題,並嘗試通過休息服務訪問數據。原始數據類型在訪問之前需要進行轉換。 我到目前爲止是一個將原始數據寫入主題的生產者。 1.)現在我想要流應用程序(應該是一個容器中運行的jar),只需要轉換我所需的形狀的數據。遵循這裏的物化視圖範例。 超過1.簡化版本) KStreamBuilder builder = ne