2017-03-05 58 views
1

也似乎並沒有爲我工作。物品不會陷入其中。這是我的。什麼也沒有來定義在也到

val merged: Source[ArticleWithKeywords, _] = ... 
val (ks, fut) = merged 
    .alsoTo(Flow[ArticleWithKeywords].map { a => a.id -> a.ids.toList }.to(queueManager.getIdsForAnsSink)) 
    .map(_.id) 
    .groupedWithin(100, 5 seconds) 
    .mapAsync(4) { ids => runReferenceFetching(ids) } 
    .viaMat(KillSwitches.single)(Keep.right) 
    .toMat(Sink.ignore)(Keep.both) 
    .run() 

但我看到的項目達到runReferenceFetching。我錯過了什麼?

+0

如果你用'Sink.foreach'替換你的'queueManager.getIdsForAnsSink',它是否仍然不能按預期工作? (或者如果這確實解決了問題,那麼有沒有錯誤/意外與那個水槽) – johanandren

+0

@johanandren它與'Sink.foreach'一起工作。但另一方面,簡單的'Sourse(List(..))。runWith(queueManager.getIdsForAnsSink)也適用。所以我很困惑這裏有什麼錯。 – expert

+0

alsoTo實際上是廣播階段反壓,如果任何下游分支背壓,所以我會說元素確實達到那個水槽,但然後以某種方式丟失。併發問題也許? – johanandren

回答

0

原來的問題與alsoTo無關。問題是使用Source.fromPublisher創建的接收器。我錯誤地認爲我可以使用相同的Publisher[T]創建多個接收器。既然已經有另外一個水槽,第二個水槽就沒有工作。