直接回答您的問題:可以創建一個「將處理應用於每個項目,然後將其合併回單一值」的流。
開發您的例子有一些示例代碼:
case class Item(itemId : String)
case class PurchaseOrder(orderId : String, items : Seq[Item])
val purchaseOrder : PurschaseOrder = ???
如果我們想處理與流,我們可以在項目,雖然減少的確切性質是在問題不明確,所以我不會定義如何摺疊方式獲得:
type ProcessOutput = ???
def processItem(item : Item) : ProcessOutput = ???
val combinedResult : Future[CombinedResult] =
Source.fromIterator(purchaseOrder.items.toIterator)
.via(Flow[Item] map processItem)
.to(Sink.fold[ProcessOutput](???)(???))
.run()
間接回答你的問題,
考慮期貨首先
當背壓是必要的時,阿卡流非常有用。當連接到外部數據源時背壓很常見,因爲bp允許您的應用程序確定數據流式傳輸的速度,因爲您負責連續發送更多數據的需求。
在您提出問題的情況下,不需要廣播需求,and incur the inherent overhead,這種溝通需要。你已經有項目的集合,所以沒有人送需求...
相反,我認爲期貨是去爲您所描述的情況下,最好的辦法:
def futProcess(item : Item)(implicit ec : ExecutionContext) =
Future { processItem(item) }
// same output type as the stream run
val combinedResults : Future[CombinedResult] =
Future.sequence{ purchaseOrder.items map futProcess }
.map{ _ fold[ProcessOutput](???)(???) }
您將得到更好的性能,從一個完整的ActorSystem周圍的複雜性和完全相同的結果無論如何...
謝謝你的詳細答案。我也明白,在某些情況下,您指出流可能不是最理想的。但是我認爲在一般情況下,我會看到很多潛力巨大的潛力,並希望檢查它們。 –
@VagifAbilov歡迎您,愉快的黑客入侵。 –