2017-07-25 168 views
2

我想實現一個安裝程序,我有多個Web瀏覽器打開一個websocket連接到我的akka​​-http服務器,以便讀取發佈到卡夫卡主題的所有消息。卡夫卡主題websocket

所以消息流應該走這條路

kafka topic -> akka-http -> websocket connection 1 
         -> websocket connection 2 
         -> websocket connection 3 

對於WebSocket的,現在我已經創建了一個路徑:

val route: Route = 
path("ws") { 
    handleWebSocketMessages(notificationWs) 
} 

然後我創建了一個消費者對我的卡夫卡話題:

val consumerSettings = ConsumerSettings(system, 
    new ByteArrayDeserializer, new StringDeserializer) 
    .withBootstrapServers("localhost:9092") 
    .withGroupId("group1") 
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
val source = Consumer 
    .plainSource(consumerSettings, Subscriptions.topics("topic1")) 

然後最後我想把這個源連接到handleWebSocketMessages中的websocket

def handleWebSocketMessages: Flow[Message, Message, Any] = 
    Flow[Message].mapConcat { 
    case tm: TextMessage => 
     TextMessage(source)::Nil 
    case bm: BinaryMessage => 
     // ignore binary messages but drain content to avoid the stream being clogged 
     bm.dataStream.runWith(Sink.ignore) 
     Nil 
    } 

這裏是我的錯誤,當我嘗試在TextMessage中使用source

Error:(77, 9) overloaded method value apply with alternatives: (textStream: akka.stream.scaladsl.Source[String,Any])akka.http.scaladsl.model.ws.TextMessage (text: String)akka.http.scaladsl.model.ws.TextMessage.Strict cannot be applied to (akka.stream.scaladsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],String],akka.kafka.scaladsl.Consumer.Control]) TextMessage(source)::Nil

我想我前進的道路上做出頻頻失誤,但我要說的是,大部分阻塞部分是handleWebSocketMessages

回答

3

第一件事,就是要了解該源的類型:Source[ConsumerRecord[K, V], Control]。 因此,它不是您可以作爲TextMessage參數傳遞的內容。

現在,讓我們來看網頁套接字的點:

  • 傳出消息被內置在卡夫卡源中的每個消息。該消息將是來自Kafka消息的字符串轉換的TextMessage。
  • 對於每個收到的消息,只是println()一樣它

所以,Flow可以被看作是兩個組件:Source &的Sink

val incomingMessages: Sink[Message, NotUsed] = 
    Sink.foreach(println(_)) 

val outgoingMessages: Source[Message, NotUsed] = 
    source 
    .map { consumerRecord => TextMessage(consumerRecord.record.value) } 

val handleWebSocketMessages: Flow[Message, Message, Any] 
    = Flow.fromSinkAndSource(incomingMessages, outgoingMessages) 

希望它有幫助。

+0

非常感謝!您的答案在我使用'Consumer.committableSource'而不是'Consumer.plainSource',和'consumerRecord.record.value()'而不是'consumerRecord.getkey.toString'之後起作用。 –

+0

太棒了!我更新了我的答案。 – n1r3