2017-02-08 68 views
0

我試圖將反應流訂戶附加到akka源。不能使用具有akka流源的反應流訂戶

我的源似乎很好用一個簡單的水槽(如foreach) - 但如果我把一個真正的水槽,由用戶,我沒有得到任何東西。

我的背景是:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Sink, Source} 
import org.reactivestreams.{Subscriber, Subscription} 

implicit val system = ActorSystem.create("test") 
implicit val materializer = ActorMaterializer.create(system) 

class PrintSubscriber extends Subscriber[String] { 
    override def onError(t: Throwable): Unit = {} 
    override def onSubscribe(s: Subscription): Unit = {} 
    override def onComplete(): Unit = {} 
    override def onNext(t: String): Unit = { 
    println(t) 
    } 
} 

和我的測試情況是:

val subscriber = new PrintSubscriber() 
val sink = Sink.fromSubscriber(subscriber) 

val source2 = Source.fromIterator(() => Iterator("aaa", "bbb", "ccc")) 
val source1 = Source.fromIterator(() => Iterator("xxx", "yyy", "zzz")) 
source1.to(sink).run()(materializer) 
source2.runForeach(println) 

我得到的輸出:

aaa 
bbb 
ccc 

爲什麼我沒有拿到XXX,YYY,和ZZZ?

回答

2

此亭無流規格爲以下Subscriber

將通過用戶的 實例Publisher.subscribe(用戶)後接受一次調用onSubscribe(認購)。不會再收到 通知,直到Subscription.request(long)爲 爲止。

你可以看到流動的一些項目通過對您的用戶的最小變化是

override def onSubscribe(s: Subscription): Unit = { 
    s.request(3) 
} 

但是,請記住,這不會使其完全兼容無流specs。實現起來並不那麼容易,這是Akka-Streams本身的更高級工具包背後的主要原因。