我想寫一個卡夫卡消費者使用反應卡夫卡,阿卡-http和akka流websocket流。卡夫卡消息給websocket
val publisherActor = actorSystem.actorOf(CommandPublisher.props)
val publisher = ActorPublisher[String](publisherActor)
val commandSource = Source.fromPublisher(publisher) map toMessage
def toMessage(c: String): Message = TextMessage.Strict(c)
class CommandPublisher extends ActorPublisher[String] {
override def receive = {
case cmd: String =>
if (isActive && totalDemand > 0)
onNext(cmd)
}
}
object CommandPublisher {
def props: Props = Props(new CommandPublisher())
}
// This is the route
def mainFlow(): Route = {
path("ws"/"commands") {
handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
}
}
從卡夫卡的消費者(此處省略),我做了publisherActor ! commandString
動態內容添加到網頁套接字。
不過,我碰到這種異常在後端,當我啓動多個客戶端的網頁套接字:
[ERROR] [03/31/2016 21:17:10.335] [KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)] WebSocket handler failed with can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
java.lang.IllegalStateException: can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
at akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35)
at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295)
...
不能用於所有的WebSocket客戶一個流程?或者應該爲每個客戶創建流量/發佈者角色?
在這裏,我打算向所有websocket客戶端發送「當前」/「實時」通知。通知的歷史不相關,新客戶需要忽略。
有趣的是,我從https://github.com/J-Technologies/akka-http-websocket-activator-template中採用了我的代碼,它令人驚訝地工作。它使用Source.actorPublisher而不是Source.fromPublisher - 這是唯一的區別。無法理解爲什麼這個作品,我的不是 – vishr
我很確定我引用的帖子是準確的......所以這意味着,你必須像普通的演員一樣使用它,而不是掛鉤到符合ReactiveStreams的 –
From ActorPublisher '它說:/ ** *創建一個[[ActorPublisher]] actor支持的[[org.reactivestreams.Publisher]]。它可以是連接到[[org.reactivestreams.Subscriber]]的 *,也可以用作 * [[akka.stream.scaladsl.Flow]]的輸入源。 * / –