2017-03-06 64 views
1

我正在使用Akka Streams Kafka將卡夫卡消息傳遞給遠程服務。我希望保證該服務每次只收到一條消息(至少一次,最多一次發送)。處理消息後提交Kafka消費者補償的好模式是什麼?

這是我想出了代碼:

private def startFlow[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef, 
          topicPattern: String, 
          mapCommittableMessageToSinkMessage: Function[CommittableMessage[String, String], T]) { 

    val groupId = config.getString("group-id") 

    implicit val materializer = ActorMaterializer() 

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer) 
     .withGroupId(groupId) 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 

    implicit val timeout = Timeout(5 seconds) // timeout for reply message on ask call below 
    import system.dispatcher // the ExecutionContext that will be used in ask call below 

    Consumer.committableSource(consumerSettings, Subscriptions 
     .topicPattern(topicPattern)) 
     .map(message => (message, mapCommittableMessageToSinkMessage(message))) 
     .mapAsync(1)(tuple => ask(subscriber, tuple._2).map(_ => tuple._1)) 
     .mapAsync(1)(message => message.committableOffset.commitScaladsl()) 
     .runWith(Sink.ignore) 
    } 

正如代碼所示,它映射原始消息的元組,以及傳遞給用戶(發送到遠程服務的男主角轉化消息)。該元組的用途是在用戶完成處理後提交偏移量。

它的一些東西看起來像一個反模式,但我不確定更好的方法來做到這一點。任何建議在更好的方式?

謝謝!

回答

1

可以通過使用GraphDSL來使其更清潔和更易於更改。它可以讓你產生一個支持你的消息部分的分支,而另一個分支可以執行所有需要的業務邏輯。

圖的一個例子可以(忽略所有的更多,更清晰的樣板):

val src = Consumer.committableSource(consumerSettings, Subscriptions 
     .topicPattern(topicPattern)) 

val businessLogic = Flow[CommittableMessage[String, String]].mapAsync(1)(message => ask(subscriber, mapCommittableMessageToSinkMessage(message))) 

val snk = Flow[CommittableMessage[String, String]].mapAsync(1)(message => message.committableOffset.commitScaladsl()) 
     .runWith(Sink.ignore) // look into Sink.foldAsync for a more compact re-write of this part 

src ~> broadcast 
     broadcast ~> businessLogic ~> zip.in0 
     broadcast   ~>   zip.in1 
            zip.out.map(_._2) ~> snk 
+0

謝謝Stefano!你的方法奏效了。這是比我的方法更多的代碼。我是Akka的新手,所以我可以看到GraphDSL對於複雜流是一種更可擴展的方法。我將在單獨的答案中使用樣板文字發佈代碼。 – jacob

0

下面是一個使用@斯特凡諾 - BONETTI的辦法,先後在回答上面的完整代碼:

private def startStream[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef, 
          topicSuffix: String, 
          convertCommittableMessageToSubscriberMessage: Function[CommittableMessage[String, String], T]) { 

    val groupId = config.getString("group-id") 
    val subscriberName = subscriber.path.name 
    val customerId = config.getString("customer-id") 
    val topicPattern = s"^$customerId\\.$topicSuffix$$" 

    implicit val materializer = ActorMaterializer() 

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer) 
     .withGroupId(s"$groupId.$subscriberName") 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 

    implicit val timeout = Timeout(5 seconds) // timeout for reply message on ask call below 
    import system.dispatcher // the ExecutionContext that will be used in ask call below 

    val src = Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern)) 

    val businessLogic = Flow[CommittableMessage[String, String]] 
     .mapAsync(1)(message => subscriber.ask(convertCommittableMessageToSubscriberMessage(message))) 

    val snk = Flow[CommittableMessage[String, String]] 
     .mapAsync(1)(message => message.committableOffset.commitScaladsl()) 
     .to(Sink.ignore) 

    val decider: Supervision.Decider = { 
     case e => { 
     system.log.error("error in stream", e) 
     Supervision.Stop 
     } 
    } 

    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
     import GraphDSL.Implicits._ 

     val broadcast = builder.add(Broadcast[CommittableMessage[String, String]](2)) 
     val zip = builder.add(Zip[Any, CommittableMessage[String, String]]) 

     src ~> broadcast 
     broadcast ~> businessLogic ~> zip.in0 
     broadcast ~> zip.in1 
     zip.out.map(_._2) ~> snk 

     ClosedShape 
    }) 
     .withAttributes(ActorAttributes.supervisionStrategy(decider)) 
     .run(materializer) 
    } 
相關問題