kafka-consumer-api

    0熱度

    2回答

    我在我的框架中使用了卡夫卡生產者 - 消費者模型。在消費者端消費的記錄稍後被索引到elasticsearch上。在這裏,我有一個使用案例,如果ES停止運行,我將不得不暫停卡夫卡消費,直到ES啓動。一旦啓動,我需要恢復使用者並從我最後離開的位置消耗記錄。 我不認爲這可以通過@KafkaListener來實現。任何人都可以請給我一個這樣的解決方案?我發現我需要爲此編寫自己的KafkaListenerC

    1熱度

    1回答

    我正在使用Kafka 0.10.2.1羣集。我正在使用Kafka的offsetForTimes API來查找特定的偏移量,並希望在達到結束時間戳時跳出循環。 我的代碼是這樣的: //package kafka.ex.test; import java.util.*; import org.apache.kafka.clients.consumer.KafkaConsumer; imp

    1熱度

    1回答

    我正在建立一個卡夫卡消費者。我已經設置了類似於下面的恢復回調。我已啓用手動提交。我如何在恢復回調方法中確認消息,以免發生延遲。 @Bean public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() { Concurren

    0熱度

    1回答

    我目前正在使用R包kafka - rkafka。每當我從卡夫卡讀取時,我都會收到「信息:記得在完成閱讀信息後關閉消費者」。我如何避免這種情況? >library(rkafka) >consumer<-rkafka.createConsumer("1.2.3.4:2181","Real-time-data", consumerTimeoutMs = "-1") >rkafka.read(cons

    1熱度

    1回答

    上述應從單個主題消費的卡夫卡消費者。因爲我整合卡夫卡消費者API與彈簧芯web應用程序我不能使用彈簧啓動.. Spring的XML配置如下 <bean id="kafkaConsumerProperties" class="com.azuga.kafka.listeners.KafkaConsumerProperties"> <constructor-arg type="java.lan

    1熱度

    1回答

    我正在使用Spring Kafka使用者讀取關於Kafka主題的消息。我堅持Oracle數據庫。無論何時出現數據庫連接錯誤,我想執行重試。我正在使用Spring JDBC連接到Oracle DB。如果需要僅重試JDBC連接問題,那麼需要添加哪些異常類列表。 private static Map<Class<? extends Throwable>, Boolean> retryableExcept

    0熱度

    1回答

    我需要處理產品ID的順序,並計劃使用卡夫卡爲此,現在如果數據丟失從卡夫卡或我的代碼我有所有這些產品ID在可能數據庫,如果記錄沒有在給定的時間內處理可以說24小時我需要重新發布他們在一個隊列中,但優先考慮,因爲kafka沒有隊列中的數據的優先概念我可以有另一個隊列,可以充當優先隊列。 我面臨的問題是我需要在優先隊列中排序產品。因此,如果我已經在基於散列的分區中進行分發,並且我的客戶再次處理消息,即爲

    0熱度

    1回答

    我想在一個Spring引導應用程序中創建多個Kafka使用者組以處理不同的Kafka隊列。需求場景基於消息的臨界性,它應該推送到不同的Kafka隊列。爲了管理不同的卡夫卡隊列,我想創建一個專門的卡夫卡用戶組。但我不確定在一次春季啓動應用程序中是否可以創建多個卡夫卡消費羣。 目前我有三個卡夫卡主題,每個主題有4個分區,只有一個卡夫卡消費者組和三個卡夫卡消費者。這三名卡夫卡消費者從三個專用卡夫卡隊列中

    1熱度

    1回答

    我正在建設一個春季卡夫卡消費者。我已經設置了重試機制。重試完畢後,我想將失敗的消息推送給死信主題。 Listen方法具有以下參數 public void listen(@Payload Map<String, Object> conciseMap, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, Ackn

    1熱度

    1回答

    我正在調查kafka溪流。我想過濾我的流,使用選擇性非常低的過濾器(幾千個之一)。我正在看這種方法: https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/kstream/KStream.html#filter(org.apache.kafka.streams.kstream.Predicate) 但我找不到任何證據,如果過濾