我想在收到消息時得到經紀人的回覆。 我曾嘗試使用CallBack機制(通過實現CallBack)在KafkaProducer.send
中使用,但它沒有工作,並沒有調用onCompletion
方法。
當我關閉Kafka服務器並嘗試生成消息時,它確實調用了回調方法。如果消息是由製片人製作的,如何獲得卡夫卡經紀人的確認?
是否有任何其他方式獲得承認?
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("Called Callback method");
if (metadata != null) {
System.out.println("message(" + key + ", " + message
+ ") sent to partition(" + metadata.partition() + "), "
+ "offset(" + metadata.offset() + ") in " + elapsedTime
+ " ms");
} else {
exception.printStackTrace();
}
}
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props);
long runtime = new Date().getTime();
String ip = "192.168.2."+ rnd.nextInt(255);
String msg = runtime + ".www.ppop.com," + ip;
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));`
我正在使用kafka-client api 0.9.1與broker版本0.8.2。
你可以顯示你用來實現回調的代碼嗎?如果這個方法沒有被調用,那麼問題一定是在其他地方。 – morganw09dev
我剛剛更新我的經紀人到0.9.0.0,它開始工作。現在的問題是,新的api將不會在0.8.2.x版本上調用Callback? – usman
看到我的答案,但再次。向我們展示您用來向Kafka發送數據的代碼,這可能是您問題的根源。不建議使用不同的Kafka版本。 – morganw09dev