0
我有這樣的資料來源:如何從Source中獲取物品的順序?
val queueP : Promise[SourceQueueWithComplete[List[String]]] = Promise()
val source = Source.queue(Constants.CHUNK_SIZE, OverflowStrategy.backpressure).mapMaterializedValue {
q : SourceQueueWithComplete[List[String]] => {
queueP.success(q)
q
}
}.watchTermination() {
case (_,f) => f.recoverWith {
case t : Exception => {
queueP.tryFailure(new Exception)
Future.failed(t)
}
}
}
我提供項目:
queueP.future.map(f => f.offer(someList))
但是,當我沉在另一端:
val sink = Sink.foreach[List[String]](someList => {
...
})
val flow = rowsSource.to(sink)
flow.run
的項目我收到來沒有秩序,哪一種首先擊敗隊列的目的。有沒有辦法迫使這些物品按照他們提供給隊列的順序進入?
你確定'rowsSource'和'source'是一樣的嗎?你能添加更多的代碼來解決這個問題嗎? –
@StefanoBonetti我敢肯定 - 我想我的問題是,他們應該是爲了什麼?根據我發現的信息,我的理解是這些項目可以以任何順序進行(每當期貨完成時)。但是必須有辦法強制原始訂單。 – Erix
從你的代碼看來,你只能向你的Source.queue提供**一個**元素,並且這是一個'List [String]'。我的結論是,無論什麼時候它被Sink接收到,它仍然具有相同順序的元素。或者你是否向該物化隊列提供了多個元素? –