2016-11-12 55 views
2

我仍然在掌握Akka流概念,並試圖瞭解如何將它們映射到場景中,以便我們收集需要以原子方式處理的項目集合。假設我們有一個由多個項目組成的採購訂單,我們需要對每個項目應用一些處理,然後將其合併回單一值。此類工作流程是否應成爲自己的單獨流(或子流),一旦採購訂單得到全面處理,該流(或子流)就會關閉?即每個採購訂單開始一個新的流?或者我有一系列永不停止的採購訂單?但如果是這樣,我不會有混合來自不同訂單的採購訂單的問題嗎?Akka流和事務邊界

換句話說,我試圖實現的是處理不同工作流程的隔離,並想知道阿卡流是否爲它提供了很好的匹配。

回答

2

直接回答您的問題:可以創建一個「將處理應用於每個項目,然後將其合併回單一值」的流。

開發您的例子有一些示例代碼:

case class Item(itemId : String) 

case class PurchaseOrder(orderId : String, items : Seq[Item]) 

val purchaseOrder : PurschaseOrder = ??? 

如果我們想處理與流,我們可以在項目,雖然減少的確切性質是在問題不明確,所以我不會定義如何摺疊方式獲得:

type ProcessOutput = ??? 

def processItem(item : Item) : ProcessOutput = ??? 

val combinedResult : Future[CombinedResult] = 
    Source.fromIterator(purchaseOrder.items.toIterator) 
     .via(Flow[Item] map processItem) 
     .to(Sink.fold[ProcessOutput](???)(???)) 
     .run() 

間接回答你的問題,

考慮期貨首先

當背壓是必要的時,阿卡流非常有用。當連接到外部數據源時背壓很常見,因爲bp允許您的應用程序確定數據流式傳輸的速度,因爲您負責連續發送更多數據的需求。

在您提出問題的情況下,不需要廣播需求,and incur the inherent overhead,這種溝通需要。你已經有項目的集合,所以沒有人送需求...

相反,我認爲期貨是去爲您所描述的情況下,最好的辦法:

def futProcess(item : Item)(implicit ec : ExecutionContext) = 
    Future { processItem(item) } 

// same output type as the stream run 
val combinedResults : Future[CombinedResult] = 
    Future.sequence{ purchaseOrder.items map futProcess } 
     .map{ _ fold[ProcessOutput](???)(???) } 

您將得到更好的性能,從一個完整的ActorSystem周圍的複雜性和完全相同的結果無論如何...

+0

謝謝你的詳細答案。我也明白,在某些情況下,您指出流可能不是最理想的。但是我認爲在一般情況下,我會看到很多潛力巨大的潛力,並希望檢查它們。 –

+0

@VagifAbilov歡迎您,愉快的黑客入侵。 –