2016-09-30 38 views

回答

2

的「分塊」的一面,即製片人,是很容易使用類似reactive kafka寫:

case class LargeMessage(bytes : Seq[Byte], topic : String) 

def messageToKafka(message : LargeMessage, maxMessageSize : Int) = 
    Source.fromIterator(() => message.bytes.toIterator) 
     .via(Flow[Byte].grouped(maxMessageSize)) 
     .via(Flow[Seq[Byte]].map(seq => new ProducerRecord(message.topic, seq))) 
     .runWith(Producer.plainSink(producerSettings) 

「重新組裝」,即如在此演示文稿描述消費者可以以類似於以下方式實現:the documentation

val messageFut : Future[LargeMessage] = 
    for { 
     bytes <- Consumer.map(_._1).runWith(Sink.seq[Byte]) 
    } yield LargeMessage(bytes, topic) 
+0

這是否保證了消息的順序,因爲order acros s分區是不是保證? – Rabzu