2017-08-21 83 views
2

假設我有不同水果的來源,並且我想將它們的計數插入數據庫。斯卡拉/阿卡流中元素的分組

我可以做這樣的事情:

Flow[Fruits] 
.map { item => 
    insertItemToDatabase(item) 
} 

但是,這顯然是緩慢的 - 爲什麼插入到數據庫,每一個項目,我可以將它們呢?所以,我想出了一個更好的解決方案:

Flow[Fruits] 
.grouped(10000) 
.map { items => 
    insertItemsToDatabase(items) 
} 

但是,這意味着我必須持有10 000元[banana, orange, orange, orange, banana, ...]在內存中,直到它們被刷新到數據庫中。這不是沒有效率嗎?也許我可以做這樣的事情:

Flow[Fruits] 
.grouped(100) 
.map { items => 
    consolidate(items) // this will return Map[String, Int] 
} 
.grouped(100) 
// here I have Seq[Map[String, Int]] 
.map { mapOfItems=> 
    insertMapToDatabase(mapOfItems) 
} 

從我的理解,這也應該一次處理10 000元,但不應該佔用儘可能多的內存(提供重複的元素通常)。但是每個鍵在內存中仍然會重複100次。當然我可以做.grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map() ...但是沒有更好的方法嗎?也許是這樣的:

Flow[Fruits] 
.map { item => 
    addToMap(item) 
    if(myMap.length == 10000) { 
     insertToDatabase(myMap) 
     clearMyMap() 
    } 
} 

但不會打破的概念阿卡流,即處理階段的獨立性(因此併發)?

+0

看看函數''groupedWithin''。它需要兩個參數:元素的最大邊界和時間速率。例如''.groupedWithnin(5000,1.seconds)''會給5000個元素進行處理,如果你在1秒之前到達它,或者它會給出在1秒內累積的元素數量。 – alifirat

+0

感謝@alifirat您的建議,但這只是一種不同的分組方式。我需要的是一種處理我擁有的數據的不同方式,既有內存友好又有數據庫友好。 –

回答

1

如果Fruit集合的基數很低,那麼您可以保留一個具有所有計數的奇異Map,然後在流過所有Fruit值後將其清除到數據庫。

首先,構建一個流程,將讓運行中計:

type Count = Int 

type FruitCount = Map[Fruit, Count] 

val zeroCount : FruitCount = 
    Map.empty[Fruit, Count] withDefaultValue 0 

val appendFruitToCount : (FruitCount, Fruit) => FruitCount = 
    (fruitCount, fruit) => fruitCount + (fruit -> fruitCount(fruit) + 1) 

val fruitCountFlow : Flow[Fruit, FruitCount, NotUsed] = 
    Flow[Fruit].scan(zeroCount)(appendFruitToCount) 

現在創建將接收最後FruitCount和物化的流水槽:

val lastFruitCountSink : Sink[FruitCount, _] = Sink.lastOption[FruitCount] 

val fruitSource : Source[Fruit, NotUsed] = ??? 

val lastFruitCountFut : Future[Option[FruitCount]] = 
    fruitSource 
    .via(fruitCountFlow) 
    .to(lastFruitCountSink) 
    .run() 

lastFruitCountFut然後可以用於將值發送到數據庫:

lastFruitCountFut foreach (_ foreach (_ foreach { (fruit, count) => 
    insertItemsToDatabase(Iterator.fill(count)(fruit)) 
})) 

使用Iterator是因爲它是用於構建水果項目的TraversableOnce的最高效內存集合。

此解決方案將只保留1 Map在內存中,每個不同水果類型將有1個密鑰& 1每個密鑰的整數。