kafka-consumer-api

    0熱度

    1回答

    我在AWS上配置了兩個節點的Kafka集羣,目前正在測試其性能屬性。 我使用kafka-consumer-perf-test.sh從單個線程讀取來自Kafka主題的5000萬條消息。 我在測試消費者吞吐量時觀察到以下情況。 觀察1 上m4.large EC2實例 單個消費者 - 讀吞吐量40.2MB /秒 3名消費者在三個獨立的m4.large EC2實例 - 個人讀取吞吐量 - 40.25MB

    0熱度

    2回答

    我在AWS上配置了複製因子爲2的雙節點六分區Kafka集羣。每個Kafka節點在由EBS支持的m4.2xlarge EC2實例上運行。 據我所知,卡夫卡生產者向卡夫卡經紀人傳輸的數據流量受制造商網絡帶寬的限制。 表示Kafka生產者和代理之間的網絡帶寬爲1Gbps(約125 MB/s),Kafka代理和存儲之間(EC2實例和EBS卷之間)的帶寬爲1 Gbps。 我使用org.apache.kafk

    1熱度

    1回答

    我使用卡夫卡版本0.10.2.1和春季啓動我的項目。 我具有可以通過多個消費者消耗一個話題的5個分區(具有相同組ID),這些不同的機器上運行。 我面對什麼問題是: 我收到重複這些卡夫卡警告日誌讀取單個消息的 Auto offset commit failed for group my-consumer-group: Commit cannot be completed since the grou

    3熱度

    1回答

    屬於同一消費羣組的多個消費者是否可以同時從分區讀取數據。 我猜想不是,以避免處理相同的消息不止一次。 是的,如果消費者來自不同的羣體,那麼是的,多個消費者可以毫無困難地從相同的分區讀取數據。 (A SOF question is already posted on this)。 但是我特別要問一下同一個分區的消費者。卡夫卡允許嗎?

    0熱度

    3回答

    我知道在Apache Kafka中,我可以編寫Producer和Partitioner,使TypeA的消息進入PartitionA,TypeB的消息進入PartitionB。我還可以編寫Consumer/ConsumerGroup,使消費者/ ConsumerGroupA僅消費PartitionA,Consumer/ConsumerGroupB僅使用assign()從PartitionB消費。 但

    3熱度

    1回答

    下面兩個代碼片段發佈消息的行爲有什麼不同? 方法1 Message<String> message = MessageBuilder.withPayload("testmsg") .setHeader(KafkaHeaders.MESSAGE_KEY, "key").setHeader(KafkaHeaders.TOPIC, "test").build(); ListenableF

    2熱度

    2回答

    如上所述。假設我有3個經紀人。當我作爲消費者聯繫時,其中一個經紀人成爲集團協調員。然後我殺了一個經紀人(或者死了)。如果我嘗試重新連接到經紀人,我會得到協調員不可用的錯誤。 卡夫卡如何知道經紀人死亡以及指定新協調員需要多長時間?它是如何配置的? 這應該在文檔中,但我找不到它。

    0熱度

    2回答

    代碼(通過火花流運行)設立消費,創建後的道具對象 val consumer = new KafkaConsumer[String, String](props) consumer.subscribe(util.Arrays.asList(topic)) 代碼擁有自營如下 package main.scala import org.apache.spark.SparkContext imp

    2熱度

    1回答

    我們已經將我們的卡夫卡經紀人升級到0.10.2.1,同時也更新了日誌消息格式版本。 但我們從測試中知道,如果卡夫卡消費者與前10版本的客戶端訂閱在升級後的卡夫卡經紀人的主題,它會導致非常高的CPU上揚,造成了嚴重的服務質量下降。 我們是否有可能在卡夫卡經紀人本身設置配置,使得舊版卡夫卡消費者客戶被拒絕?

    0熱度

    1回答

    我的卡夫卡消費者代碼如下所示,我只有一個消費者! Properties consumerConfig = new Properties(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "sandbox.hortonworks.com:6667"); consumerConfig.put(Consu