2017-02-12

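Apache Kafka: KafkaProducerActor throws an ask timeout exception

I am using the Cake Solutions Akka client for Scala and Kafka. I create a KafkaProducerActor and try to send a message with the ask pattern so that I get a future back and can act on the result, but every time I run into an ask timeout exception. Here is my code: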

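import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import cakesolutions.kafka.KafkaProducer
import cakesolutions.kafka.akka.{KafkaProducerActor, ProducerRecords}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.duration._
import scala.util.{Failure, Success}

class SimpleAkkaProducer(config: Config, system: ActorSystem) {

    // Execution context for the future callback in sendMessageWayTwo.
    import system.dispatcher

    private val producerConf = KafkaProducer.Conf(
        config,
        keySerializer = new StringSerializer,
        valueSerializer = new StringSerializer)

    val actorRef = system.actorOf(KafkaProducerActor.props(producerConf))

    // Fire-and-forget send.
    def sendMessageWayOne(record: ProducerRecords[String, String]): Unit = {
        actorRef ! record
    }

    // Ask-pattern send: waits for the actor to reply with a String.
    def sendMessageWayTwo(record: ProducerRecords[String, String]): Unit = {
        implicit val timeout: Timeout = Timeout(100.seconds)
        val future = (actorRef ? record).mapTo[String]
        future onComplete {
            case Success(data) => println(s" >>>>>>>>>>>> ${data}")
            case Failure(ex) => ex.printStackTrace()
        }
    }
}

object SimpleAkkaProducer {
    def main(args: Array[String]): Unit = {
        val system = ActorSystem("KafkaProducerActor")
        val config = ConfigFactory.defaultApplication()
        val simpleAkkaProducer = new SimpleAkkaProducer(config, system)

        val topic = config.getString("akka.topic")
        // Both response values are None here.
        val messageOne = ProducerRecords.fromKeyValues[String, String](topic,
            Seq((Some("Topics"), "First Message")), None, None)

        simpleAkkaProducer.sendMessageWayOne(messageOne)
        simpleAkkaProducer.sendMessageWayTwo(messageOne)
    }
}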

Here is the exception:

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://KafkaProducerActor/user/$a#-1520717141]] after [100000 ms]. Sender[null] sent message of type "cakesolutions.kafka.akka.ProducerRecords". 
    at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604) 
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) 
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:864) 
    at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109) 
    at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103) 
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:862) 
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) 
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) 
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) 
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) 
    at java.lang.Thread.run(Thread.java:745) 

Please add the definition of 'KafkaProducerActor'.


'KafkaProducerActor' is provided by the library API; its implementation looks like this: https://github.com/cakesolutions/scala-kafka-client/blob/1cbeccbb183ca06585f4b9fb1e366048e993ac51/akka/src/main/scala/cakesolutions/kafka/akka/KafkaProducerActor.scala


Hey @YuvalItzchakov, did you find a solution?

Answer


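The producer actor only replies to the sender if the successResponse and failureResponse values of the given ProducerRecords are set to something other than None. When the Kafka write succeeds, the successResponse value is sent back to the sender; when the Kafka write fails, the failureResponse value is sent back. In the question both values are None, so the actor never replies and the ask times out.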

Example:

val record = ProducerRecords.fromKeyValues[String, String](
    topic = topic, 
    keyValues = Seq((Some("Topics"), "First Message")), 
    successResponse = Some("success"), 
    failureResponse = Some("failure") 
) 

val future = (actorRef ? record).mapTo[String] 
future onComplete { 
    case Success("success") => println("Send succeeded!") 
    case Success("failure") => println("Send failed!") 
    case Success(data) => println(s"Send result: $data") 
    case Failure(ex) => ex.printStackTrace() 
} 
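
To make the reply rule concrete without a broker or actor system, here is a minimal dependency-free model of it. This is only a sketch and not the library's code: `ReplyModel`, `Records`, `fakeSend` and `awaitReply` are hypothetical names, a `Promise` stands in for the ask, and a 200 ms wait stands in for the ask timeout.

```scala
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.concurrent.duration._
import scala.util.Try

object ReplyModel {
  // Hypothetical stand-in for ProducerRecords: only the two response fields matter here.
  final case class Records(successResponse: Option[Any], failureResponse: Option[Any])

  // Hypothetical stand-in for the reply rule described above:
  // reply only when the matching response value is defined, otherwise stay silent.
  def fakeSend(records: Records, writeSucceeds: Boolean): Future[Any] = {
    val reply = Promise[Any]()
    val response = if (writeSucceeds) records.successResponse else records.failureResponse
    response.foreach(value => reply.success(value)) // None: the promise is never completed
    reply.future
  }

  // Wait briefly for a reply; a silent producer surfaces as a timeout, as in the question.
  def awaitReply(reply: Future[Any]): Either[String, Any] =
    Try(Await.result(reply, 200.millis)).toEither.left.map {
      case _: TimeoutException => "timed out"
      case other               => other.toString
    }

  // Both responses None (as in the question): no reply ever arrives.
  val silent: Either[String, Any] =
    awaitReply(fakeSend(Records(None, None), writeSucceeds = true))

  // Responses defined: the matching value comes back.
  val acked: Either[String, Any] =
    awaitReply(fakeSend(Records(Some("success"), Some("failure")), writeSucceeds = true))
  val failed: Either[String, Any] =
    awaitReply(fakeSend(Records(Some("success"), Some("failure")), writeSucceeds = false))
}
```

In this model a failed write still completes the future successfully, carrying the failureResponse value, which is why the example above matches on Success("failure") rather than on Failure.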

OK @Jaakko, that makes sense and it works, but how do I get the 'RecordMetadata' after sending the message?


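Unfortunately, there is no way to get the 'RecordMetadata' with the current version of the producer actor. Patches and suggestions are welcome on the project page. :)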


OK, thanks for the help, @Jaakko.