我正在嘗試使用自動確認功能使用Reactor Kafka實現卡夫卡主題分區的併發處理。這裏的文檔使它看起來這是可能的:如何使用自動確認功能按主題和分區同時處理反應器Kafka流?
http://projectreactor.io/docs/kafka/milestone/reference/#concurrent-ordered
說什麼我試圖是我使用自動應答之間的唯一區別。
我有以下代碼(相關法是receiveAuto
):
public class KafkaFluxFactory<K, V> {
private final Map<String, Object> properties;
public KafkaFluxFactory(Map<String, Object> properties) {
this.properties = properties;
}
public Flux<ConsumerRecord<K, V>> receiveAuto(Collection<String> topics, Scheduler scheduler) {
return KafkaReceiver.create(ReceiverOptions.create(properties).subscription(topics))
.receiveAutoAck()
.flatMap(flux -> flux.groupBy(this::extractTopicPartition))
.flatMap(topicPartitionFlux -> topicPartitionFlux.publishOn(scheduler));
}
private TopicPartition extractTopicPartition(ConsumerRecord<K, V> record) {
return new TopicPartition(record.topic(), record.partition());
}
}
當我使用它來創建消費者記錄從卡夫卡通量與並行調度器(Schedulers.newParallel("debug", 10)
),我看到他們全部結束在同一個線程上進行處理。
對我可能做錯什麼想法?
@ Mr.E.Gas你也可以在這裏查看我的答案https://stackoverflow.com/a/44588357/2055854,以更好地瞭解'Flux'如何工作 –
感謝您對此的意見;它並沒有解決我的問題,但是我發現了一種多方面的答案來回答我想要完成的事情,我會回答我的問題。 你的回答不能很好地解決我的問題的原因是它最終會並行處理GroupedFlux,因此可能會導致亂序處理,這對我的用例來說是不可取的。 –