0

我正在嘗試使用自動確認功能使用Reactor Kafka實現卡夫卡主題分區的併發處理。這裏的文檔使它看起來這是可能的:如何使用自動確認功能按主題和分區同時處理反應器Kafka流?

http://projectreactor.io/docs/kafka/milestone/reference/#concurrent-ordered

說什麼我試圖是我使用自動應答之間的唯一區別。

我有以下代碼(相關法是receiveAuto):

public class KafkaFluxFactory<K, V> { 

    private final Map<String, Object> properties; 

    public KafkaFluxFactory(Map<String, Object> properties) { 
     this.properties = properties; 
    } 

    public Flux<ConsumerRecord<K, V>> receiveAuto(Collection<String> topics, Scheduler scheduler) { 
     return KafkaReceiver.create(ReceiverOptions.create(properties).subscription(topics)) 
      .receiveAutoAck() 
      .flatMap(flux -> flux.groupBy(this::extractTopicPartition)) 
      .flatMap(topicPartitionFlux -> topicPartitionFlux.publishOn(scheduler)); 
    } 

    private TopicPartition extractTopicPartition(ConsumerRecord<K, V> record) { 
     return new TopicPartition(record.topic(), record.partition()); 
    } 
} 

當我使用它來創建消費者記錄從卡夫卡通量與並行調度器(Schedulers.newParallel("debug", 10)),我看到他們全部結束在同一個線程上進行處理。

對我可能做錯什麼想法?

回答

0

我想它至少在你的消費者中按順序執行。要做到並行消費則應將您通量ParallelFlux

public ParallelFlux<ConsumerRecord<K, V>> receiveAuto(Collection<String> topics, Scheduler scheduler) { 
     return KafkaReceiver.create(ReceiverOptions.create(properties).subscription(topics)) 
      .receiveAutoAck() 
      .flatMap(flux -> flux.groupBy(this::extractTopicPartition)) 
      .flatMap(topicPartitionFlux -> topicPartitionFlux.parallel().runOn(Schedulers.parallel())); 
    } 

在你的消費功能後,如果你想在你應該使用的方法,如並行的方式來消耗:

void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> 
      onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe) 

或任何其他重載方法與Consumer<T super T> onNext參數。 如果你只是使用方法,下面你將在順序方式

void subscribe(Subscriber<? super T> s) 
+0

@ Mr.E.Gas你也可以在這裏查看我的答案https://stackoverflow.com/a/44588357/2055854,以更好地瞭解'Flux'如何工作 –

+0

感謝您對此的意見;它並沒有解決我的問題,但是我發現了一種多方面的答案來回答我想要完成的事情,我會回答我的問題。 你的回答不能很好地解決我的問題的原因是它最終會並行處理GroupedFlux,因此可能會導致亂序處理,這對我的用例來說是不可取的。 –

0

消耗流量相當多的試驗和錯誤的加的什麼,我想完成我意識到我是想解決兩個問題重新思考後在一個代碼位。

的兩件事情我需要的是:

  1. 按序卡夫卡分區的處理
  2. 能力並行每個分區

的處理。在試圖與這一塊的同時解決兩個代碼,我限制了下游用戶配置並行級別的能力。因此,我改變該方法返回GroupedFluxes的通量,其提供下游用戶與確定的正確粒度什麼是可並行:

public Flux<GroupedFlux<TopicPartition, ConsumerRecord<K, V>>> receiveAuto(Collection<String> topics) { 
    return KafkaReceiver.create(createReceiverOptions(topics)) 
     .receiveAutoAck() 
     .flatMap(flux -> flux.groupBy(this::extractTopicPartition)); 
} 

下游,用戶可以使用他們所希望的任何調度並行每個發射GroupedFlux:

public <V> void work(Flux<GroupedFlux<TopicPartition, V>> flux) { 
    flux.doOnNext(groupPublisher -> groupPublisher 
      .publishOn(Schedulers.elastic()) 
      .subscribe(this::doWork)) 
     .subscribe(); 
} 

這有理想的行爲處理每個TopicPartition-GroupedFlux按順序和平行於其他GroupedFluxes。

相關問題