The Akka Streams Kafka samples create a simple stream that reads one message and uses a sink that produces to Kafka and commits the offset of the consumed message. If you need to read one or more messages and produce the many words contained in the consumed batch, you have to work a bit more with the Akka Stream Graph API.
This example uses the Graph API to build a Source from Kafka, and groupedWithin to read a batch of messages and extract the words they contain.
Two simple flows are created: one that obtains the last offset to commit and another that extracts the words. Then a Source stage is created that broadcasts the messages consumed from Kafka to both flows and zips the results into a tuple (CommittableOffset, Array[String]). In the runForeach function the messages are produced. Note that the messages are not produced in order, because Future.sequence runs the futures in parallel.
Although the sample may look long, it compiles and runs correctly using "com.typesafe.akka" %% "akka-stream-kafka" % "0.14".
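If you build with sbt, the dependency quoted above would be declared roughly like this in build.sbt (a sketch; adjust the version to whatever release you actually use):

// build.sbt (sketch): the Akka Streams Kafka connector used by this sample
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.14"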
import java.util.Properties
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Source, Zip}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{
  ByteArrayDeserializer,
  ByteArraySerializer,
  StringDeserializer,
  StringSerializer
}
import scala.concurrent.Future
import scala.util.{Failure, Success}
import scala.concurrent.duration._
object SplitSource extends App {
  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher
  val log = actorSystem.log

  // PRODUCER config
  val producerSettings = ProducerSettings(actorSystem,
                                          new ByteArraySerializer,
                                          new StringSerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("auto.create.topics.enable", "true")

  // CONSUMER config
  val consumerSettings =
    ConsumerSettings(system = actorSystem,
                     keyDeserializer = new ByteArrayDeserializer,
                     valueDeserializer = new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("kafka-sample4")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  implicit val producerConfig = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)
    props
  }
  lazy val kafkaProducer = new KafkaProducer[String, String](producerConfig)

  // Wrap the blocking Java producer call in a Scala Future
  private def publishToKafka(id: String, data: String) =
    Future {
      kafkaProducer
        .send(new ProducerRecord("outTopic", id, data))
        .get()
    }

  def getKafkaSource =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("inTopic"))
      // Push downstream once 10 messages are buffered or 30 seconds have passed
      .groupedWithin(10, 30.seconds)
  val getStreamSource = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val in = getKafkaSource

    // Broadcast to two flows: one to obtain the last offset to commit
    // and another to collect the words to publish
    val br = b.add(Broadcast[Seq[CommittableMessage[Array[Byte], String]]](2))
    val zipResult = b.add(Zip[CommittableOffset, Array[String]]())

    // Flow that extracts the last committable offset of the consumed batch
    val flowCommit = Flow[Seq[CommittableMessage[Array[Byte], String]]]
      .map(_.last.committableOffset)

    // Flow that creates the list of all words in all consumed messages
    val flowWords =
      Flow[Seq[CommittableMessage[Array[Byte], String]]].map(input =>
        input.map(_.record.value()).mkString(" ").split(" "))

    // Wire the stage
    in ~> br ~> flowCommit ~> zipResult.in0
          br ~> flowWords  ~> zipResult.in1

    SourceShape(zipResult.out)
  }
  Source.fromGraph(getStreamSource).runForeach { msgs =>
    // Publish all words and, when all futures complete, commit the offset
    // of the last consumed Kafka message
    val futures = msgs._2.map(publishToKafka("outTopic", _)).toList

    // Produces in parallel! See the ordered variant sketched after the sample.
    Future.sequence(futures).onComplete {
      case Success(_) =>
        // Once all futures are done, commit the last consumed message
        msgs._1.commitScaladsl()
      case Failure(ex) =>
        log.error(ex, "Failed to publish the words; the offset is not committed")
    }
  }
}
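As the comment in the sample notes, Future.sequence publishes the words in parallel, so their order in the output topic is not guaranteed. A minimal sketch of a strictly ordered variant, chaining the futures one after another with foldLeft and flatMap; it reuses the publishToKafka helper and implicit execution context from the sample and is only an illustration, not tested against a broker:

// Hypothetical helper: publish words one by one, each send waiting for the previous one
def publishInOrder(words: Seq[String]): Future[Unit] =
  words.foldLeft(Future.successful(())) { (previous, word) =>
    previous.flatMap(_ => publishToKafka("outTopic", word).map(_ => ()))
  }

// Usage inside runForeach instead of Future.sequence:
// publishInOrder(msgs._2).onComplete { ... commit the offset ... }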
The Akka Streams API lets you build excellent processing pipelines.
Quick reply: yes, you can also send objects as JSON. – MaximeF
My main goal is to send each single element of that array. I mean => the consumer gets "xxxx xxx xx x" from topic1, and I want to send 4 messages with the producer: "xxxx", "xxx", "xx", "x". Can you help me? – METUAN
Maybe I don't understand, but you can split your message and then call send (produce) 4 times. – MaximeF
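For completeness, a minimal sketch of the simpler approach suggested in the last comment: split the consumed value and call the producer once per word. It assumes the kafkaProducer instance and the "outTopic" name from the sample above:

// Sketch: split "xxxx xxx xx x" into words and send each one as its own record
def sendEachWord(consumedValue: String): Unit =
  consumedValue.split(" ").foreach { word =>
    kafkaProducer.send(new ProducerRecord("outTopic", word))
  }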