2016-04-01 103 views
3

我想寫一個卡夫卡消費者使用反應卡夫卡,阿卡-http和akka流websocket流。卡夫卡消息給websocket

val publisherActor = actorSystem.actorOf(CommandPublisher.props) 
    val publisher = ActorPublisher[String](publisherActor) 
    val commandSource = Source.fromPublisher(publisher) map toMessage 
    def toMessage(c: String): Message = TextMessage.Strict(c) 

    class CommandPublisher extends ActorPublisher[String] { 
    override def receive = { 
     case cmd: String => 
     if (isActive && totalDemand > 0) 
      onNext(cmd) 
    } 
    } 

    object CommandPublisher { 
    def props: Props = Props(new CommandPublisher()) 
    } 

    // This is the route 
    def mainFlow(): Route = { 
    path("ws"/"commands") { 
     handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource)) 
    } 
    } 

從卡夫卡的消費者(此處省略),我做了publisherActor ! commandString動態內容添加到網頁套接字。

不過,我碰到這種異常在後端,當我啓動多個客戶端的網頁套接字:

[ERROR] [03/31/2016 21:17:10.335] [KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)] WebSocket handler failed with can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12) 
java.lang.IllegalStateException: can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12) 
    at akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35) 
    at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295) 
    ... 

不能用於所有的WebSocket客戶一個流程?或者應該爲每個客戶創建流量/發佈者角色?

在這裏,我打算向所有websocket客戶端發送「當前」/「實時」通知。通知的歷史不相關,新客戶需要忽略。

回答

1

對不起,我承受了一個壞消息,但看起來這是akka關於的明確設計。您可以根據需要爲所有客戶端重複使用該流的實例。作爲Rx模型的結果,扇出必須是「明確的」。

例子我所遇到使用routee特定Flow

// The flow from beginning to end to be passed into handleWebsocketMessages 
    def websocketDispatchFlow(sender: String): Flow[Message, Message, Unit] = 
    Flow[Message] 
     // First we convert the TextMessage to a ReceivedMessage 
     .collect { case TextMessage.Strict(msg) => ReceivedMessage(sender, msg) } 
     // Then we send the message to the dispatch actor which fans it out 
     .via(dispatchActorFlow(sender)) 
     // The message is converted back to a TextMessage for serialization across the socket 
     .map { case ReceivedMessage(from, msg) => TextMessage.Strict(s"$from: $msg") } 

    def route = 
    (get & path("chat") & parameter('name)) { name => 
     handleWebsocketMessages(websocketDispatchFlow(sender = name)) 
    } 

這裏是關於它的討論:

而這正是我不阿卡流一樣,這個明確的 扇出。當我從某個地方收到一個數據源時,我想要 進程(例如Observable或Source),我只想訂閱它 ,我不想關心它是冷還是熱,或者它是否已訂閱 由其他訂戶或不是。這是我的河流比喻。 這條河不應該關心誰喝它,飲酒者 不應該關心河的來源或關於有多少其他飲酒者。我的樣本相當於提供的Mathias 確實共享數據源,但它只是參考 計數,您可以擁有2個訂戶,或者您可以有100個,而不是 。在這裏,我已經看中了,但如果您不想丟失活動或者如果您想確保 流始終保持打開狀態,則參考計數不會 有效。但後來你用ConnectableObservable 其中有connect(): Cancelable,這是一個完美的配合說... 一個Play的LifeCycle插件。如果您想爲新訂戶重複以前的 值,則可以使用BehaviorSubject或ReplaySubject。而之後的事情就會起作用,沒有需要繪製該連接圖的手冊 。 ... ...(這是從https://bionicspirit.com/blog/2015/09/06/monifu-vs-akka-streams.html) ... 對於採用Observable並返回Observable的函數,我們確實有 提升,這是與名稱最接近的東西,可以在Monifu中用於Subject或其他 可觀察類型,因爲LiftOperators1 (和2),這是什麼 可以轉換Observables而不會丟失它們的類型 - 這是對RxJava與lift做什麼的OOP改進。

但是,這些功能不等於Processor/Subject。 區別在於Subject同時是消費者和 生產者。這意味着,當數據源啓動並且數據源基本上是 (意味着多個訂戶共享相同的數據源)時,訂戶無法精確控制 。在Rx中,如果你模擬觀測值(意思是 可觀察到每個個體 訂戶啓動一個新的數據源),那就完全沒有問題。另一方面,在Rx(通常情況下),只有一次訂閱的數據源可能不是 ,就是這樣。Monifu中此規則的唯一例外是由GroupBy運算符 產生的觀察對象,但這就像確認 規則的例外情況。

這意味着,特別是再加上兩個Monifu和反應性流協議(あ應 不訂閱用相同的消費者多次)的 合同的另一個限制是,一個 SubjectProcessor實例不是可重複使用的。爲了使這樣一個實例可重用,Rx模型需要一個工廠 Processor。此外,這意味着無論何時您想使用 Subject/Processor,您的數據源必須自動爲hot (可在多個訂戶之間共享)。

+0

有趣的是,我從https://github.com/J-Technologies/akka-http-websocket-activator-template中採用了我的代碼,它令人驚訝地工作。它使用Source.actorPublisher而不是Source.fromPublisher - 這是唯一的區別。無法理解爲什麼這個作品,我的不是 – vishr

+0

我很確定我引用的帖子是準確的......所以這意味着,你必須像普通的演員一樣使用它,而不是掛鉤到符合ReactiveStreams的 –

+0

From ActorPublisher '它說:/ ** *創建一個[[ActorPublisher]] actor支持的[[org.reactivestreams.Publisher]]。它可以是連接到[[org.reactivestreams.Subscriber]]的 *,也可以用作 * [[akka.stream.scaladsl.Flow]]的輸入源。 * / –