我有一個使用kafka client 2.11: 0.10.2.1的小型Java火花服務。使用新的kafka客戶端獲取較早的kafka發佈的主題的分區信息時超時
以下是當我讀到最新從卡夫卡版本發表主題的正常工作代碼:
Properties props = new Properties();
props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerConfig.getBrokerConnectionString());
props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, "all");
props.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, producerConfig.getRetry());
props.put(org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG, producerConfig.getBatchSize());
props.put(org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG, producerConfig.getLingerTimeInMs());
props.put(org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, producerConfig.getRequestTimeout());
props.put(org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG, producerConfig.getMaxBlockMS());
props.put(org.apache.kafka.clients.producer.ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, producerConfig.getMaxIdleTime());
props.put(org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG, maxBytesInBuffer/producerConfig.getProducersCount());
props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producers = new Producer[1];
producers[0] = new KafkaProducer<>(props);
producers[0].partitionsFor("mYTopic").size();
有一個現有的卡夫卡的話題,其中卡夫卡的版本是0.8.2.x 。我想爲此使用相同的代碼。但是這段代碼在最後一行(partitionsFor
)中給出了超時,其中包含版本爲0.8.2.x的Kafka發佈的主題。任何在這方面的幫助將不勝感激。
:卡夫卡主題(通過0.8.2.x出版)不能通過0.10.2.1客戶
文檔http://kafka.apache.org/documentation/#upgrade表示0.10.2客戶端無法與0.8.2代理進行通信。 –