2015-12-04 55 views
3

我想在收到消息時得到經紀人的回覆。 我曾嘗試使用CallBack機制(通過實現CallBack)在KafkaProducer.send中使用,但它沒有工作,並沒有調用onCompletion方法。

當我關閉Kafka服務器並嘗試生成消息時,它確實調用了回調方法。如果消息是由製片人製作的,如何獲得卡夫卡經紀人的確認?

是否有任何其他方式獲得承認?

@Override 
    public void onCompletion(RecordMetadata metadata, Exception exception) { 
     long elapsedTime = System.currentTimeMillis() - startTime; 
     System.out.println("Called Callback method"); 
     if (metadata != null) { 
      System.out.println("message(" + key + ", " + message 
        + ") sent to partition(" + metadata.partition() + "), " 
        + "offset(" + metadata.offset() + ") in " + elapsedTime 
        + " ms"); 
     } else { 
      exception.printStackTrace(); 
     } 

    } 

props.put("bootstrap.servers", "localhost:9092"); 
props.put("client.id", "mytopic"); 
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class); 
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 

KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props); 
long runtime = new Date().getTime(); 
String ip = "192.168.2."+ rnd.nextInt(255); 
String msg = runtime + ".www.ppop.com," + ip; 
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));` 

我正在使用kafka-client api 0.9.1與broker版本0.8.2。

+0

你可以顯示你用來實現回調的代碼嗎?如果這個方法沒有被調用,那麼問題一定是在其他地方。 – morganw09dev

+0

我剛剛更新我的經紀人到0.9.0.0,它開始工作。現在的問題是,新的api將不會在0.8.2.x版本上調用Callback? – usman

+0

看到我的答案,但再次。向我們展示您用來向Kafka發送數據的代碼,這可能是您問題的根源。不建議使用不同的Kafka版本。 – morganw09dev

回答

2

所以我並不是100%確定哪些版本適用於卡夫卡。目前我使用0.8.2,我知道0.9引入了一些突破性更改,但我無法確定現在哪些功能無法正常工作。

一個非常強烈的建議,我會使用對應於您的經紀人版本的卡夫卡客戶端版本。如果你使用經紀人0.8.2,我也會使用kakfa-client 0.8.2。

你從來沒有提出任何代碼如何使用這個,所以我只是在黑暗中猜測。但是我已經在生產者中使用this method在卡夫卡0.8.2中實現了回撥功能。以下是方法簽名。

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) 

我將在哪裏調用該方法,我實際上是通過overriden方法傳遞類。

KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props); 
ProducerRecord<String, String> record = //data to send to kafka 
prod.send(record, new Callback() { 
    @Override 
    public void onCompletion(RecordMetadata metadata, Exception e) { 
    if (e != null) { 
     e.printStackTrace(); 
    } else { 
     //implement logic here, or call another method to process metadata 
     System.out.println("Callback"); 
    } 
    } 
}); 

我假設有一種方法可以做到這一點,因爲你已經做到了。但是,您必須提供代碼,顯示您是如何實際向卡夫卡發送記錄的。除此之外,我只是猜測。

3

有一種簡單的方法可以在KafkaProducer使用kafka 0.9版發佈消息後從代理獲取信息。你可以調用get()方法返回一個對象RecordMetadata,你可以得到信息,如膠印,topicPartition,下面的代碼片段爲例:

RecordMetadata m = kafkaProducer.send(new ProducerRecord<byte[], byte[]>(
         topic, key.getBytes("UTF-8"), message 
           .getBytes("UTF-8"))).get(); 
System.out.println("Message produced, offset: " + m.offset()); 
System.out.println("Message produced, partition : " + m.partition()); 
System.out.println("Message produced, topic: " + m.topic()); 
+3

與異步相比get()顯着降低了性能 – chro

2

關閉生產者 - 一旦你完成所有發佈您的郵件 - 這樣的:

producer.close(); 

這確保了從回調下面的方法總是叫:

onCompletion(RecordMetadata metadata, Exception exception) 

注:我已經通過添加T線測試這o this sample Producer class,它的工作原理。

相關問題