我想實現一個安裝程序,我有多個Web瀏覽器打開一個websocket連接到我的akka-http服務器,以便讀取發佈到卡夫卡主題的所有消息。卡夫卡主題websocket
所以消息流應該走這條路
kafka topic -> akka-http -> websocket connection 1
-> websocket connection 2
-> websocket connection 3
對於WebSocket的,現在我已經創建了一個路徑:
val route: Route =
path("ws") {
handleWebSocketMessages(notificationWs)
}
然後我創建了一個消費者對我的卡夫卡話題:
val consumerSettings = ConsumerSettings(system,
new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val source = Consumer
.plainSource(consumerSettings, Subscriptions.topics("topic1"))
然後最後我想把這個源連接到handleWebSocketMessages中的websocket
def handleWebSocketMessages: Flow[Message, Message, Any] =
Flow[Message].mapConcat {
case tm: TextMessage =>
TextMessage(source)::Nil
case bm: BinaryMessage =>
// ignore binary messages but drain content to avoid the stream being clogged
bm.dataStream.runWith(Sink.ignore)
Nil
}
這裏是我的錯誤,當我嘗試在TextMessage中使用source
:
Error:(77, 9) overloaded method value apply with alternatives: (textStream: akka.stream.scaladsl.Source[String,Any])akka.http.scaladsl.model.ws.TextMessage (text: String)akka.http.scaladsl.model.ws.TextMessage.Strict cannot be applied to (akka.stream.scaladsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],String],akka.kafka.scaladsl.Consumer.Control]) TextMessage(source)::Nil
我想我前進的道路上做出頻頻失誤,但我要說的是,大部分阻塞部分是handleWebSocketMessages
。
非常感謝!您的答案在我使用'Consumer.committableSource'而不是'Consumer.plainSource',和'consumerRecord.record.value()'而不是'consumerRecord.getkey.toString'之後起作用。 –
太棒了!我更新了我的答案。 – n1r3