如何發送kafka中的同步消息?
實現它的一種方法可以通過設置屬性參數
max.in.flight.requests.per.connection = 1
。在kafka中發送同步消息?
但我想知道是否有以kafka發送同步消息的直接或替代方式。 (類似producer.syncSend(...)等)。
如何發送kafka中的同步消息?
實現它的一種方法可以通過設置屬性參數
max.in.flight.requests.per.connection = 1
。在kafka中發送同步消息?
但我想知道是否有以kafka發送同步消息的直接或替代方式。 (類似producer.syncSend(...)等)。
生產者API從send
返回Future
。您可以撥打Future#get
以阻止發送完成。
看到這個example from the Javadocs:
如果要模擬一個簡單的阻塞調用你可以調用立即get()方法:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record =
new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();
的蒂洛提出的答案是必經之路走。通常,關於使用max.in.flight.requests.per.connection = 1的建議用於啓用仍然重試的消息,但不會丟失消息排序。它不是用於擁有同步製作者。
你能解釋你爲什麼要這麼做嗎?如果這是關於消息順序保證的話,這可能會更復雜一些(並且同步發送它們並不會真正改變那裏的東西)。 – Thilo
由同一個生產者發送到同一主題中同一分區的消息將保留該順序。跨多個分區,主題或生產者沒有訂單保證。如果需要,您必須在應用程序代碼中的消費者端安排消息(例如通過查看消息時間戳)。 – Thilo
@Thilo,這引起我另一個問題。每個分區或每個主題或每個生產者是否批量生產?或者這些的一些組合。 ? – joveny