2017-02-21 22 views
0

我有這樣的資料來源:如何從Source中獲取物品的順序?

val queueP : Promise[SourceQueueWithComplete[List[String]]] = Promise() 
val source = Source.queue(Constants.CHUNK_SIZE, OverflowStrategy.backpressure).mapMaterializedValue { 
     q : SourceQueueWithComplete[List[String]] => { 
     queueP.success(q) 
     q 
     } 
    }.watchTermination() { 
     case (_,f) => f.recoverWith { 
     case t : Exception => { 
      queueP.tryFailure(new Exception) 
      Future.failed(t) 
     } 
     } 
    } 

我提供項目:

queueP.future.map(f => f.offer(someList)) 

但是,當我沉在另一端:

val sink = Sink.foreach[List[String]](someList => { 
     ... 
    }) 

    val flow = rowsSource.to(sink) 
    flow.run 

的項目我收到來沒有秩序,哪一種首先擊敗隊列的目的。有沒有辦法迫使這些物品按照他們提供給隊列的順序進入?

+0

你確定'rowsSource'和'source'是一樣的嗎?你能添加更多的代碼來解決這個問題嗎? –

+0

@StefanoBonetti我敢肯定 - 我想我的問題是,他們應該是爲了什麼?根據我發現的信息,我的理解是這些項目可以以任何順序進行(每當期貨完成時)。但是必須有辦法強制原始訂單。 – Erix

+0

從你的代碼看來,你只能向你的Source.queue提供**一個**元素,並且這是一個'List [String]'。我的結論是,無論什麼時候它被Sink接收到,它仍然具有相同順序的元素。或者你是否向該物化隊列提供了多個元素? –

回答

2

因爲您的offer您的元素作爲mapMaterializedValue調用的一部分,所以您每次需要提交元素時都需要實現(即運行)您的Source.queue

作爲一種副作用,您將不按順序獲取元素,因爲每個流實現都是異步發生的。

對您的問題採取更健康的方法將涉及運行單個圖形,獲取單個隊列,並向其中提交多個元素。看下面的代碼示例:

val queue: SourceQueueWithComplete[List[String]] = 
    Source.queue[List[String]](Constants.CHUNK_SIZE, OverflowStrategy.backpressure) 
    .to(Sink.foreach { list ⇒ /* do stuff */ }) 
    .run() 

queue.offer(List("a", "b")) 
queue.offer(List("c", "d")) 
+0

謝謝我會試試這個 – Erix