我正在使用Akka Streams Kafka將卡夫卡消息傳遞給遠程服務。我希望保證該服務每次只收到一條消息(至少一次,最多一次發送)。處理消息後提交Kafka消費者補償的好模式是什麼?
這是我想出了代碼:
private def startFlow[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
topicPattern: String,
mapCommittableMessageToSinkMessage: Function[CommittableMessage[String, String], T]) {
val groupId = config.getString("group-id")
implicit val materializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withGroupId(groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
implicit val timeout = Timeout(5 seconds) // timeout for reply message on ask call below
import system.dispatcher // the ExecutionContext that will be used in ask call below
Consumer.committableSource(consumerSettings, Subscriptions
.topicPattern(topicPattern))
.map(message => (message, mapCommittableMessageToSinkMessage(message)))
.mapAsync(1)(tuple => ask(subscriber, tuple._2).map(_ => tuple._1))
.mapAsync(1)(message => message.committableOffset.commitScaladsl())
.runWith(Sink.ignore)
}
正如代碼所示,它映射原始消息的元組,以及傳遞給用戶(發送到遠程服務的男主角轉化消息)。該元組的用途是在用戶完成處理後提交偏移量。
它的一些東西看起來像一個反模式,但我不確定更好的方法來做到這一點。任何建議在更好的方式?
謝謝!
謝謝Stefano!你的方法奏效了。這是比我的方法更多的代碼。我是Akka的新手,所以我可以看到GraphDSL對於複雜流是一種更可擴展的方法。我將在單獨的答案中使用樣板文字發佈代碼。 – jacob