0
使用Kafka發送大型文件時,是否可以將其分配到分區中,然後使用Akka-Stream重新組裝?如何使用Akka-Stream在「反應性卡夫卡」中分塊並重新分配大型郵件
http://www.slideshare.net/JiangjieQin/handle-large-messages-in-apache-kafka-58692297
使用Kafka發送大型文件時,是否可以將其分配到分區中,然後使用Akka-Stream重新組裝?如何使用Akka-Stream在「反應性卡夫卡」中分塊並重新分配大型郵件
http://www.slideshare.net/JiangjieQin/handle-large-messages-in-apache-kafka-58692297
的「分塊」的一面,即製片人,是很容易使用類似reactive kafka寫:
case class LargeMessage(bytes : Seq[Byte], topic : String)
def messageToKafka(message : LargeMessage, maxMessageSize : Int) =
Source.fromIterator(() => message.bytes.toIterator)
.via(Flow[Byte].grouped(maxMessageSize))
.via(Flow[Seq[Byte]].map(seq => new ProducerRecord(message.topic, seq)))
.runWith(Producer.plainSink(producerSettings)
「重新組裝」,即如在此演示文稿描述消費者可以以類似於以下方式實現:the documentation:
val messageFut : Future[LargeMessage] =
for {
bytes <- Consumer.map(_._1).runWith(Sink.seq[Byte])
} yield LargeMessage(bytes, topic)
這是否保證了消息的順序,因爲order acros s分區是不是保證? – Rabzu