2017-08-06 60 views
2

如何發送kafka中的同步消息?
實現它的一種方法可以通過設置屬性參數
max.in.flight.requests.per.connection = 1在kafka中發送同步消息?

但我想知道是否有以kafka發送同步消息的直接或替代方式。 (類似producer.syncSend(...)等)。

+0

你能解釋你爲什麼要這麼做嗎?如果這是關於消息順序保證的話,這可能會更復雜一些(並且同步發送它們並不會真正改變那裏的東西)。 – Thilo

+0

由同一個生產者發送到同一主題中同一分區的消息將保留該順序。跨多個分區,主題或生產者沒有訂單保證。如果需要,您必須在應用程序代碼中的消費者端安排消息(例如通過查看消息時間戳)。 – Thilo

+0

@Thilo,這引起我另一個問題。每個分區或每個主題或每個生產者是否批量生產?或者這些的一些組合。 ? – joveny

回答

4

生產者API從send返回Future。您可以撥打Future#get以阻止發送完成。

看到這個example from the Javadocs

如果要模擬一個簡單的阻塞調用你可以調用立即get()方法:

byte[] key = "key".getBytes(); 
byte[] value = "value".getBytes(); 
ProducerRecord<byte[],byte[]> record = 
    new ProducerRecord<byte[],byte[]>("my-topic", key, value) 
producer.send(record).get(); 
1

的蒂洛提出的答案是必經之路走。通常,關於使用max.in.flight.requests.per.connection = 1的建議用於啓用仍然重試的消息,但不會丟失消息排序。它不是用於擁有同步製作者。