2017-07-28 37 views
0

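Kafka - Scala - processing multiple messages

Is it possible to send an array of strings through a Kafka Producer object? I want to receive some messages (lines of text) from 'topic1', split them into single words, and send those words to another topic.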

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

object KafkaConsumer extends App {

  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher
  val log = actorSystem.log

  // PRODUCER config
  val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("auto.create.topics.enable", "true")

  // CONSUMER config
  val consumerSettings = ConsumerSettings(
      system = actorSystem,
      keyDeserializer = new ByteArrayDeserializer,
      valueDeserializer = new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("kafka-sample")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  // -----------------------------------------------------------------------//

  // ROUTE OF THE APP: copy each message from topic1 to topic2 and commit its offset
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map { msg =>
      println(s"topic1 -> topic2: $msg")
      ProducerMessage.Message(
        new ProducerRecord[Array[Byte], String]("topic2", msg.record.value),
        msg.committableOffset)
    }
    .runWith(Producer.commitableSink(producerSettings))
}

Quick reply: yes, you can also send the object as JSON. – MaximeF


My main goal is to send each single element of that array. I mean: the consumer gets "xxxx xxx xx x" from topic1, and I want the Producer to send 4 messages: "xxxx", "xxx", "xx", "x". Can you help me? – METUAN


Maybe I don't understand, but you can split your message and then call send (produce) 4 times. – MaximeF
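A minimal sketch of that suggestion, using only the Scala standard library. The `send` parameter is a hypothetical stand-in for a real producer call (for example `kafkaProducer.send(new ProducerRecord(topic, word))`); it is passed in as a function so the splitting logic can be shown and checked without a running broker.

```scala
object SplitAndSend {
  // Splits one consumed line into words and calls `send` once per word.
  // `send` is a hypothetical stand-in for a Kafka producer call: (topic, value) => Unit.
  def splitAndSend(line: String, topic: String)(send: (String, String) => Unit): Int = {
    // Split on runs of whitespace and drop the empty token a leading space would produce
    val words = line.split("\\s+").filter(_.nonEmpty)
    words.foreach(word => send(topic, word))
    words.length // number of messages handed to `send`
  }

  def main(args: Array[String]): Unit = {
    val sent = scala.collection.mutable.ListBuffer.empty[(String, String)]
    val count = splitAndSend("xxxx xxx xx x", "topic2")((topic, word) => sent += ((topic, word)))
    println(s"$count messages: $sent")
  }
}
```

With a real producer, each call would become one Kafka record, so one input line yields one output message per word.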

Answers

0

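The Akka Streams sample creates a simple stream that reads one message, produces it to Kafka through a sink, and commits the offset of the consumed message. If you need to read one or more messages and produce many words from the consumed batch, you have to do more work with the Akka Streams Graph API.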

This example uses the Graph API to build a Source from Kafka, reads a batch of messages with groupedWithin, and extracts the words they contain.

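It creates two simple flows: one to get the last offset and another to get the words. It then creates a Source stage that broadcasts the messages from Kafka to both flows and zips the results into a tuple (CommittableOffset, Array[String]). The words are produced in the runForeach function. Note that the words are not produced in order, because the futures combined with Future.sequence run concurrently.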
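To make the two branches concrete, here is a plain-Scala sketch of what they compute for one batch emitted by groupedWithin. The case class `Msg` is a hypothetical simplification of `CommittableMessage`, with a `Long` standing in for its `committableOffset`; no Kafka or Akka classes are involved.

```scala
object BatchSketch {
  // Hypothetical simplification of CommittableMessage: an offset plus the record value
  final case class Msg(offset: Long, value: String)

  // Mirrors flowCommit: the offset of the last message in the batch
  // (groupedWithin never emits an empty batch, so `last` is safe there)
  def lastOffset(batch: Seq[Msg]): Long = batch.last.offset

  // Mirrors _flowWords: all words of all messages in the batch, in consumption order
  def words(batch: Seq[Msg]): Seq[String] =
    batch.flatMap(_.value.split(" ").filter(_.nonEmpty).toList)

  // Mirrors the Zip stage: pairs both results computed from the same batch
  def process(batch: Seq[Msg]): (Long, Seq[String]) = (lastOffset(batch), words(batch))

  def main(args: Array[String]): Unit = {
    val batch = Seq(Msg(7L, "xxxx xxx"), Msg(8L, "xx x"))
    println(process(batch))
  }
}
```

Because both branches receive the same batch from the Broadcast, the Zip output always pairs the words of a batch with that batch's last offset.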

Although the sample may look long, it compiles and runs correctly with "com.typesafe.akka" %% "akka-stream-kafka" % "0.14".
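Assuming an sbt build, that dependency would be declared like this in build.sbt; the project's scalaVersion has to be one that this release was published for.

```scala
// build.sbt (assumes an sbt project)
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.14"
```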

import java.util.Properties 

import akka.actor.ActorSystem 
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset} 
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions} 
import akka.kafka.scaladsl.Consumer 
import akka.stream.{ActorMaterializer, SourceShape} 
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Source, Zip} 

import org.apache.kafka.clients.consumer.ConsumerConfig 
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} 
import org.apache.kafka.common.serialization.{ 
    ByteArrayDeserializer, 
    ByteArraySerializer, 
    StringDeserializer, 
    StringSerializer 
} 

import scala.concurrent.Future 
import scala.util.Success 
import scala.concurrent.duration._ 

object SplitSource extends App { 

    implicit val actorSystem = ActorSystem("test-actor-system") 
    implicit val streamMaterializer = ActorMaterializer() 
    implicit val executionContext = actorSystem.dispatcher 
    val log = actorSystem.log 

    // PRODUCER config 
    val producerSettings = ProducerSettings(actorSystem, 
              new ByteArraySerializer, 
              new StringSerializer) 
    .withBootstrapServers("localhost:9092") 
    .withProperty("auto.create.topics.enable", "true") 

    // CONSUMER config 
    val consumerSettings = 
    ConsumerSettings(system = actorSystem, 
        keyDeserializer = new ByteArrayDeserializer, 
        valueDeserializer = new StringDeserializer) 
     .withBootstrapServers("localhost:9092") 
     .withGroupId("kafka-sample4") 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 

    val producerConfig = {
      val props = new Properties()
      props.setProperty("bootstrap.servers", "localhost:9092")
      props.setProperty("key.serializer", classOf[StringSerializer].getName)
      props.setProperty("value.serializer", classOf[StringSerializer].getName)
      props
    }

    lazy val kafkaProducer = new KafkaProducer[String, String](producerConfig)

    // Wrap the Java producer's blocking send(...).get() in a Scala Future.
    // `id` is used as the record key; the topic is fixed to "outTopic".
    private def publishToKafka(id: String, data: String) = {
      Future {
        kafkaProducer
          .send(new ProducerRecord("outTopic", id, data))
          .get()
      }
    }

    def getKafkaSource = 
    Consumer 
     .committableSource(consumerSettings, Subscriptions.topics("inTopic")) 
     // Emits a batch when 10 messages have arrived or 30 seconds have passed, whichever comes first
     .groupedWithin(10, 30.seconds)

    val getStreamSource = GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    val in = getKafkaSource 

    // Broadcast to two flows: one obtains the last offset to commit,
    // the other returns the words to publish
    val br = b.add(Broadcast[Seq[CommittableMessage[Array[Byte], String]]](2)) 
    val zipResult = b.add(Zip[CommittableOffset, Array[String]]()) 
    val flowCommit = Flow[Seq[CommittableMessage[Array[Byte], String]]].map(_.last.committableOffset) 

    // Flow that creates the list of all words in all consumed messages 
    val _flowWords = 
     Flow[Seq[CommittableMessage[Array[Byte], String]]].map(input => { 
     input.map(_.record.value()).mkString(" ").split(" ") 
     }) 


    // build the Stage 
    in ~> br ~> flowCommit ~> zipResult.in0 
      br ~> _flowWords ~> zipResult.in1 

    SourceShape(zipResult.out) 
    } 

    Source.fromGraph(getStreamSource).runForeach { msgs =>
      // Publish all words; once all futures complete, commit the last Kafka offset of the batch
      val futures = msgs._2.map(publishToKafka("outTopic", _)).toList

      // The sends run in parallel, so words may be produced out of order; chain them with flatMap to keep the order
      Future.sequence(futures).onComplete {
        case Success(_) =>
          // All sends succeeded: commit the offset of the last consumed message
          msgs._1.commitScaladsl()
        case scala.util.Failure(ex) =>
          // A send failed: log it and skip the commit for this batch
          log.error(ex, "Publishing to Kafka failed; offset not committed")
      }
    }

} 
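The comment in runForeach says the sends run in parallel and that flatMap can put them in order. A minimal standard-library sketch of that idea follows; `publish` is a hypothetical asynchronous send (in the answer it would be `publishToKafka`), and each call starts only after the previous Future has completed.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object OrderedPublish {
  // Chains the sends with flatMap so word n+1 is published only after word n has completed.
  // `publish` is a hypothetical async send, e.g. the answer's publishToKafka.
  def publishInOrder(words: Seq[String])(publish: String => Future[Unit])(
      implicit ec: ExecutionContext): Future[Unit] =
    words.foldLeft(Future.successful(())) { (previous, word) =>
      previous.flatMap(_ => publish(word))
    }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val published = new java.util.concurrent.ConcurrentLinkedQueue[String]()
    val words = Seq("xxxx", "xxx", "xx", "x")
    // Simulated send whose delay shrinks with word length: started in parallel,
    // the shorter words would finish first
    def fakePublish(word: String): Future[Unit] = Future {
      Thread.sleep(10L * word.length)
      published.add(word)
      ()
    }
    Await.result(publishInOrder(words)(fakePublish), 5.seconds)
    println(published)
  }
}
```

The trade-off is throughput: the sequential chain waits for each acknowledgement, whereas Future.sequence keeps all sends in flight at once.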

The Akka Streams API lets you build excellent processing pipelines.


It works perfectly! Thanks! – METUAN

0

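You should use mapConcat before map, because it

transforms each input element into an Iterable of output elements that is then flattened into the output stream.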

The complete additional lines would look like this:

Subscriptions.topics("topic1")) 
    .mapConcat { msg => msg.record.value().split(" ").toList } 
    .map { ... 

This is a simple and good solution, but note that if you map all messages to a list of strings, you can no longer get committable offsets from them. – METUAN
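One way to keep committing while still using mapConcat (a sketch, not taken from the answers above) is to let every word carry an optional offset, where only the last word of each message carries that message's offset. Downstream, the Option could travel as the pass-through value of each produced message, and only elements carrying Some(offset) would be committed after they are produced. The sketch models the per-message function in plain Scala; `Msg` and `Out` are hypothetical stand-ins, with `Long` in place of CommittableOffset, and `flatMap` flattens the per-message lists the same way mapConcat does inside a stream.

```scala
object MapConcatWithOffset {
  // Hypothetical simplification of CommittableMessage (Long stands in for CommittableOffset)
  final case class Msg(offset: Long, value: String)
  // One element per word; only the last word of a message carries the offset to commit
  final case class Out(word: String, commitAfter: Option[Long])

  // Per-message function in the shape mapConcat expects: one input -> an immutable Iterable of outputs
  def splitKeepingOffset(msg: Msg): List[Out] = {
    val words = msg.value.split(" ").filter(_.nonEmpty).toList
    val lastIndex = words.length - 1
    // A line with no words emits nothing, so its offset is only covered by a later commit
    words.zipWithIndex.map { case (word, i) =>
      Out(word, if (i == lastIndex) Some(msg.offset) else None)
    }
  }

  def main(args: Array[String]): Unit = {
    val consumed = List(Msg(7L, "xxxx xxx xx x"), Msg(8L, "y"))
    // flatMap flattens the per-message lists the same way mapConcat does in a stream
    consumed.flatMap(splitKeepingOffset).foreach(println)
  }
}
```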