2016-04-05 29 views
1

設想一個組流元素

val myFlow: Flow[Element] = ... //some flow.. 

給定一個權重函數

val weightFunction: Element => Int 

我想獲得

val transformedFlow: Flow[List[Element]] 

使得每個元素transformationFlow是一個List [Element],使得該列表中元素的權重之和大於給定的con STANT。

我該怎麼做到這一點?

回答

1

如何使用scan創建累計權重流,然後zip將結果與原始元素流一起使用,然後使用splitAfter創建子流?我還沒有嘗試編譯下面的,但我希望你的想法:(你可能要考慮在resultFlow做map(_.reverse)

val broadCast = builder.add(Broadcast[Element](2)) 
val zip = builder.add(Zip[Element, Boolean]) 

myFlow.shape.out ~> broadCast.in 

broadCast.out(0) ~> zip.in0 

broadCast.out(1).scan(0){ (totalWeight, elem) => 
    if(totalWeight > Limit) weightFunction(elem) 
    else totalWeight + weightFunction(elem) 
}.map(_ > Limit) ~> zip.in1 

val resultFlow = 
    zip.out.splitAfter(_._2) 
    .fold(List.empty[Element]){ case (list, (elem, _)) => elem :: list } 
    .concatSubstreams 

編輯:你甚至不需要要做broadcastzip如果你稍微改變scan的返回類型 - 請參閱這裏的可運行代碼示例:https://gist.github.com/MartinHH/a05a87269b1697d5f57a1c77db269767