1
也似乎並沒有爲我工作。物品不會陷入其中。這是我的。什麼也沒有來定義在也到
val merged: Source[ArticleWithKeywords, _] = ...
val (ks, fut) = merged
.alsoTo(Flow[ArticleWithKeywords].map { a => a.id -> a.ids.toList }.to(queueManager.getIdsForAnsSink))
.map(_.id)
.groupedWithin(100, 5 seconds)
.mapAsync(4) { ids => runReferenceFetching(ids) }
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.ignore)(Keep.both)
.run()
但我看到的項目達到runReferenceFetching。我錯過了什麼?
如果你用'Sink.foreach'替換你的'queueManager.getIdsForAnsSink',它是否仍然不能按預期工作? (或者如果這確實解決了問題,那麼有沒有錯誤/意外與那個水槽) – johanandren
@johanandren它與'Sink.foreach'一起工作。但另一方面,簡單的'Sourse(List(..))。runWith(queueManager.getIdsForAnsSink)也適用。所以我很困惑這裏有什麼錯。 – expert
alsoTo實際上是廣播階段反壓,如果任何下游分支背壓,所以我會說元素確實達到那個水槽,但然後以某種方式丟失。併發問題也許? – johanandren