2017-08-14 73 views
0

我是新的使用aka流kafka(和akka流一般)。我正在嘗試構建一個圖表,以便將消息發佈到不同的主題。 如何將生產者作爲流連接以提交處理後的消息?我試着用Producer.flow但由於您使用的是GraphDSL我不能得到commitScaladsl連接生產者流程圖

object TestFoo { 
    import akka.kafka.ProducerMessage.Message 
    implicit val system = ActorSystem("test-kafka") 
    implicit val materializer = ActorMaterializer() 
    val evenNumbersTopic = "even_numbers" 
    val allNumbersTopic = "all_numbers" 
    lazy val consumerSettings = ConsumerSettings(system, new StringDeserializer(), new JsonDeserializer[Int]) 
    .withBootstrapServers("localhost:9092") 
    .withGroupId("group1") 
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
    lazy val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(evenNumbersTopic, allNumbersTopic))) 
    val producerSettings = ProducerSettings(system, new StringSerializer(), new StringSerializer()) 
    .withBootstrapServers("localhost:9092") 
    val flow: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(GraphDSL.create() { implicit b => 
    import akka.stream.scaladsl.GraphDSL.Implicits._ 
    type TypedMessage = Message[String, Int,CommittableOffset] 
    val bcast = b.add(Broadcast[TypedMessage](2)) 
    val merge = b.add(Merge[TypedMessage](2)) 

    val evenFilter = Flow[TypedMessage].filter ( c => c.record.value() % 2 == 0) 
    val justEven = Flow[TypedMessage].map{ 
     case Message(pr, offset) => 
     val r = new ProducerRecord[String, Int]("general", pr.value()) 
     Message(r, offset) 
    } 
    val allNumbers = Flow[TypedMessage].map{ 
     case Message(pr, offset) => 
     val r = new ProducerRecord[String, Int](allNumbersTopic, pr.value()) 
     Message(r, offset) 
    } 

    val toMsg = Flow[ConsumerMessage.CommittableMessage[String, Int]].map{ msg => 
     val r = new ProducerRecord[String, Int]("general", msg.record.value()) 
     Message(r, msg.committableOffset) 
    } 
    source ~> toMsg ~> bcast 

    bcast ~> evenFilter ~> justEven ~> merge 
    bcast ~> allNumbers ~> merge 
    merge ~> Producer.flow(producerSettings).mapAsync(producerSettings.parallelism) { result => 
     result.message.passThrough.commitScaladsl() //this doesn't compile, cannot get the .commitScaladsl() 
    } 
    ClosedShape 
    })} 
+0

此刻這個例子是操縱着許多其他編譯錯誤。你能修改它,使你的編譯錯誤很容易重現嗎? –

+0

@StefanoBonetti是的,我更新了代碼,但編譯錯誤較少,謝謝 – igx

回答

0

,編譯器不能推斷出從前一階段PassThrough類型。 嘗試並明確地將類型參數傳遞給Producer.flow函數,例如

merge ~> Producer.flow[K, V, CommittableOffset](producerSettings).mapAsync(producerSettings.parallelism) { result => 
    result.message.passThrough.commitScaladsl() 
} 

我已經離開KV非綁定PARAM,請配合有任何鍵/值類型的生產者就必然產生。如果您希望上面的代碼正確連線,則需要將producerSettings類型與合併階段的代碼匹配。你需要這樣的東西:

val producerSettings = ProducerSettings(system, new StringSerializer(), new JsonSerializer[Int]) 
    .withBootstrapServers("localhost:9092") 
+0

謝謝,但它實際上產生了相同的結果: val sink = Producer.flow [String,String,CommittableOffset](producerSettings).mapAsync producerSettings.parallelism){result => result.message.passThrough.commitScaladsl()//仍然無法編譯,無法獲得.commitScaladsl()}'' – igx

+0

對我來說,它看起來像'producerSettings'類型是'[String,String]',但你實際上給它提供了一個'[String,Int]'類型的記錄。我已經修改了答案,提出了一些更改 –

相關問題