2016-10-18 29 views
2

我想爲包含並行處理流程的Akka流定義圖(我正在使用Akka.NET,但這應該不重要)。想象一下訂單的數據來源,每個訂單都包含訂單ID和產品列表(訂單項)。工作流程如下:爲收集元素的並行處理創建一個Akka流

  1. 接收和順序
  2. 廣播順序給兩個流,流A將處理訂單項目,信道B將處理訂單ID(一些簿記工作)
  3. 流甲:將訂單項目拆分爲單獨的元素,每個單獨處理
  4. 流程A:對於上一步驟中拆分產生的每個訂單項,調用一些外部服務,查找額外信息(價格,可用性等)。 )
  5. 流程B:爲此做一些額外的簿記給定的訂單ID
  6. 合併Ë流A和B
  7. 發送到信宿合併的數據來自前面步驟,其導致富集順序信息

步驟1(Source.From),2(廣播), 4-5(地圖),6(合併),7(水槽)看起來不錯。但是如何在Akka或反應性流條款中實施收集分拆?這不是廣播或扁平化,需要將N個元素的集合拆分成N個獨立的子流,這些子流將稍後被合併回去。這是如何實現的?

回答

3

我建議在一個流程中做到這一點。我知道兩個流動看起來更酷,但相信我在設計的簡單性方面不值得(我試過)。你可以寫這樣的東西

import akka.stream.scaladsl.{Flow, Sink, Source, SubFlow} 

import scala.collection.immutable 
import scala.concurrent.Future 

case class Item() 

case class Order(items: List[Item]) 

val flow = Flow[Order] 
    .mapAsync(4) { order => 
    Future { 
     // Enrich your order here 
     order 
    } 
    } 
    .mapConcat { order => 
    order.items.map(order -> _) 
    } 
    .mapAsync(4) { case (order, item) => 
    Future { 
     // Enrich your item here 
     order -> item 
    } 
    } 
    .groupBy(2, tuple => tuple._1) 
    .fold[Map[Order, List[Item]]](immutable.Map.empty) { case (map, (order, item)) => map.updated(order, map.getOrElse(order, Nil) :+ item) } 
    .mapConcat { _.map { case (order, newItems) => order.copy(items = newItems)} } 

但即使這種方法是不好的。上面的代碼或您的設計有太多東西可能會出錯。如果豐富一項訂單的物品失敗,你會怎麼做?如果訂單對象的富集失敗會怎樣?您的信息流應該發生什麼?

如果我是你,我會有Flow[Order]並在mapAsync處理其子女,所以至少可以保證我沒有部分處理的訂單。

+0

非常感謝您的答覆。是的,mapAsync將提供一個高效且可預測的工作流程。但我不確定我瞭解叉子的危險。是的,有些事情可能會出錯,但是如果在其中一個併發分支中發生錯誤,我不能以類似的方式處理錯誤? –

+0

@VagifAbilov你將能夠處理它,但我認爲這將需要更多的代碼來處理邊緣情況和錯誤。更多的代碼 - >難以維護。在使用具有奇特拓撲的流構建系統之後,我意識到這往往不值得(以我的愚見)。 – expert

+0

我在另一個關於流的討論中收到了類似的解釋。有道理,謝謝你的澄清。 –

相關問題