2017-11-11 122 views
0

我正在使用Scala 2.11和Akka Streams Kafka 0.17。如何在構建Akka Streams Kafka中的ProducerMessage時配置偏移參數?

我有一個其中:

  • Source使用Source.actorRef創建。在這裏,演員計劃以固定間隔運行並持續生成消息,並將消息發送到流中。
  • 我已附加Producer作爲Flow。製片人推動ProducerMessage.Message成爲卡夫卡話題。
  • 一些數據庫操作。

我有一個問題,同時構建ProducerMessage.Message,它看起來像:

final case class Message[K, V, +PassThrough](
    record: ProducerRecord[K, V], 
    passThrough: PassThrough 
) 

我可以很容易地通過record參數,它包含實際的消息。但我不知道要通過passThrough參數。根據docs

passThrough字段可以認爲是通過在 傳遞Consumer#flow並且包括在Result的任何元件。當需要在下游操作上傳遞某個上下文時,這是有用的 。 這可以用unzip/zip完成,但這樣更方便。它可以是 例如是ConsumerMessage.CommittableOffsetConsumerMessage.CommittableOffsetBatch,可以在流程中稍後提交 。

在我的情況下,沒有任何卡夫卡消費者訂閱卡夫卡的話題和我流產生SourcecomittableSourceplainSource)。在這種情況下,我會按照文檔中所述通過消費者抵消。但就我而言,演員正在模擬這樣的消費者。這意味着我無權訪問ConsumerMessage.CommittableOffset。那麼我在這裏通過了什麼passThrough參數?這種情況下最好的做法是什麼?

回答

0

轉發我的問題後reactive-kafka團隊,我得到了我的答案。基本上,他們說的是,如果你沒有一個用例pass through任何東西,你可以嘗試將其設置爲NOTUSED,或者只是空字符串「」

另請注意,如果您使用的可能是Producer.plainSink,則無需構建ProducerMessage.Message。然後,您可以直接構建一個Kafka ProducerRecordProducerMessage.Message case class只是一個容器,用於需要或需要pass through的情況。除了要傳遞的元素之外,它只包含一個Kafka ProducerRecord

相關問題