2017-07-09 38 views
4

我在視頻教程中看到,當製作人發佈消息時,Kafka Broker支持3種類型的確認。Kafka - Producer Acknowledgegment

0 - 消防和忘記 1 - 領導確認 2 - 所有的經紀人

我使用卡夫卡的Java API發佈消息的確認。這是否必須爲每個代理使用每個代理特有的server.properties設置,還是必須由生產者設置?如果它必須由生產者設置,請解釋如何使用Java API設置它。

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.clients.producer.RecordMetadata; 

import java.util.Properties; 

public class KafkaProducerApp { 

    public static void main(String[] args){ 
     Properties properties = new Properties(); 
     properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094"); 
     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
     properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 

     KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties); 

     try{ 
      for(int i=0;i<150;i++) { 
       RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("replicated_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get(); 
       System.out.println(" Offset = " + ack.offset()); 
       System.out.println(" Partition = " + ack.partition()); 
      } 
     } catch (Exception ex){ 
      ex.printStackTrace(); 
     } finally { 
      kafkaProducer.close(); 
     } 



    } 

} 
+0

其實我以爲它支持連續的值範圍:-1 ==「全部」==領導和所有in-sync-replicas,0 == fire-and-forget,1 ==只是領導,2 = =領導和一個更多的副本,3 ==領導和兩個更多副本,等等? –

+0

我想ack> 1從v0.9以後版本中刪除:https://cwiki.apache.org/confluence/display/KAFKA/KIP-1+-+Remove+support+of+request.required.acks –

回答

3

這是一個生產性和設置類似於你在你的代碼具有其它特性:

properties.put("acks","all"); 

所有配置的生產屬性的列表可以發現here

您可能還想看看與此生產者配置相關的經紀人(或主題)propertymin.insync.replicas

+0

謝謝lot @vahid –

+0

@vahid是否意味着當我們關閉生產者時,它實際上會等待,直到我們得到所有消息的確認信息?或者我們可以在下一次或下一次發送之前在代碼中查找某處。 –

+0

是的,來自KafkaProducer的[JavaDoc](https://github.com/apache/kafka/blob/e18335dd953107a61d89451932de33d33c0fd207/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L932) : '這種方法阻塞,直到所有以前發送的請求完成.' – vahid

相關問題