我正在使用Scala 2.11和Akka Streams Kafka 0.17。如何在構建Akka Streams Kafka中的ProducerMessage時配置偏移參數?
我有一個流其中:
- 甲
Source
使用Source.actorRef
創建。在這裏,演員計劃以固定間隔運行並持續生成消息,並將消息發送到流中。 - 我已附加
Producer
作爲Flow
。製片人推動ProducerMessage.Message
成爲卡夫卡話題。 - 一些數據庫操作。
我有一個問題,同時構建ProducerMessage.Message
,它看起來像:
final case class Message[K, V, +PassThrough](
record: ProducerRecord[K, V],
passThrough: PassThrough
)
我可以很容易地通過record
參數,它包含實際的消息。但我不知道要通過passThrough
參數。根據docs:
的
passThrough
字段可以認爲是通過在 傳遞Consumer#flow
並且包括在Result
的任何元件。當需要在下游操作上傳遞某個上下文時,這是有用的 。 這可以用unzip/zip完成,但這樣更方便。它可以是 例如是ConsumerMessage.CommittableOffset
或ConsumerMessage.CommittableOffsetBatch
,可以在流程中稍後提交 。
在我的情況下,沒有任何卡夫卡消費者訂閱卡夫卡的話題和我流產生Source
(comittableSource
或plainSource
)。在這種情況下,我會按照文檔中所述通過消費者抵消。但就我而言,演員正在模擬這樣的消費者。這意味着我無權訪問ConsumerMessage.CommittableOffset
。那麼我在這裏通過了什麼passThrough
參數?這種情況下最好的做法是什麼?