kafka-consumer-api

    1熱度

    4回答

    我正在使用Scala中的Spark來消費和處理卡夫卡消費者應用程序中的消息。有時,處理來自Kafka消息隊列的消息比平時花費更多的時間。那時候我需要消費最新的信息,而忽略那些已經由製片人發行但尚未消費的早期信息。 這裏是我的消費者代碼: object KafkaSparkConsumer extends MessageProcessor { def main(args: scala.Array

    1熱度

    2回答

    我正在設計高吞吐量系統,我要有幾個生產者。 我的主題將被分區。製作人將以鍵值對的形式發送記錄。 鍵將用於分區數據。 消費者將組織在消費者羣體中(他們將被分配相同的羣組ID,以便他們可以同時消費來自同一主題的消息,但來自不同的分區)。 卡夫卡保證單個分區內的消息順序。 消費者將被分配他們公平份額的分區。 唯一令我擔心的是,我的分區密鑰不會以循環方式分發消息,有些分區可能比其他分區更繁忙。 問:可能存

    4熱度

    1回答

    我是新來的卡夫卡在以前的分區中的偏移的樣子。目前我這個Channel Consumer example從匯合公司的Github上回購 據我所知試驗,消費者可分爲多個組。每個組在分區中都有自己的偏移量。假設我在特定主題中有40條消息,我們稱之爲owner_commands。屬於狗羣的消費者加入並開始消費這40條消息。 當我斷開並重新連接這種消費,我注意到,消息顯示不出來了。它說我已經到達文件的末尾。

    1熱度

    1回答

    我想知道什麼對我最好:卡夫卡流或卡夫卡消費者API或卡夫卡連接? 我想從主題讀取數據,然後做一些處理並寫入數據庫。所以我寫了消費者,但我覺得我可以編寫Kafka流應用程序,並使用它的有狀態處理器執行任何更改並將其寫入數據庫,這可以消除我的使用者代碼,並且只需編寫db代碼。 數據庫我想插入我的記錄是: HDFS - (插入生JSON) MSSQL - (處理JSON) 另一種選擇是卡夫卡連接,但我發

    0熱度

    2回答

    我從那裏我讀出的數據如下卡夫卡隊列: private static void startKafkaConsumerStream() { try { System.out.println("Print method: startKafkaConsumerStream"); Dataset<String> lines = (Dataset<String>)

    1熱度

    2回答

    我試圖用卡夫卡與斯卡拉 下面是我在Java代碼中,其工作完全正常流流scala代碼會引發編譯錯誤。類型不匹配預期:ForEachAction [>字符串,>字符串],實際((任意,任意),單位) 未發現:價值關鍵 未發現:值值 有誰知道如何使用流API中scala

    1熱度

    1回答

    多輸出我有一個具有把一個項目作爲輸入卡夫卡流處理器,和產生多個項作爲輸出。 什麼是正確的編碼方式?多次撥打電話this.context().forward(key, item)是正確的做法,還是有另一個作弊? 謝謝。

    1熱度

    2回答

    我已經爲卡夫卡中的主題設置了TTL爲7天,我從Kafka獲取數據並將其存儲在數據庫中,但是從最近5天我的數據庫服務器已關閉,現在我必須從Kafka獲取最近5天的消息並將它們存儲在數據庫中 注意:從過去5天起,Kafka沒有問題。

    5熱度

    1回答

    所以我試圖獲得使用Kafka流的交互式查詢。我有Zookeeper和Kafka在本地運行(在Windows上)。我在哪裏使用C:\ temp作爲存儲文件夾,適用於Zookeeper和Kafka。 我已經安裝這樣 kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --t

    4熱度

    2回答

    我有麻煩連接從我的主機(Windows)到客戶(Linux),我安裝了卡夫卡。 我已經設置了一個VM(帶有VirtualBox),我安裝了Confluent工具。在此VM中,我運行以下命令: confluent start schema-registry 它啓動zookeeper,kafka和模式註冊表。 在這個虛擬機,我可以運行 kafka-console-producer --broker