2017-06-14 83 views
1

有一個整數的一些流:如何結合`count`和`sum`計算爲同一來源

val source = Source(List(1,2,3,4,5))

是否有可能獲得從源頭上(count, sum)結果?對於上面的例子,它將是(5, 15)

我想我應該用流量,並結合他們:

val countFlow = Flow[Int].fold(0)((c, _) => c + 1) 
val sumFlow = Flow[Int].fold(0)((s, e) => s + e) 

如何在上述流量適用於源。還是有另一種方式?

回答

1

最後的總

,你提出的Flow是得到一個最終值後的來源幾乎是正確的用盡:

這個流程然後可以用Sink.head結合起來得到最終結果:

val result : Future[Data] = 
    source 
    .via(countAndSum) 
    .runWith(Sink[Data].head) 

中間值

如果你想要一個「跑反」,例如你希望所有的中間數據值,那麼你可以使用Flow.scan代替折:

val intermediateCountAndSum = 
    Flow[Int].scan(zeroData)(updateData) 

你也可以 「漏」,這些Data值成Sink.seq

val intermediateResult : Future[Seq[Data]] = 
    source 
    .via(intermediateCountAndSum) 
    .runWith(Sink[Data].seq) 
0

你可以簡單地做以下

source.map(list => (list.length, list.reduceLeft(_+_))) 

我希望它有幫助

+0

這不起作用。 'Source.apply'接受一個I​​terable [T]併發出'T'類型的值,因此'source'的類型是'Source [Int,_]'。 –

1
val graph = Source.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 

    val fanOut = builder.add(Broadcast[Int](2)) 
    val merge = builder.add(Zip[Int, Int]) 

    source ~> fanOut ~> countFlow ~> merge.in0 
       fanOut ~> sumFlow ~> merge.in1 

    SourceShape(merge.out) 
    }) 


    graph.runWith(Sink.last) 
0
case class Stats(sum: Int, count: Int) { 
    def add(el: Int): Stats = this.copy(sum = sum += el, count = count +=1) 
} 

object Stats { 
    def empty: Stats = Stats(0, 0) 
} 

val countFlow = Flow[Status].fold(Stats.empty)((stats, e) => stats add e) 
+1

請解釋這個答案如何解決這個問題。原始代碼沒有解釋沒有幫助普通觀察者。 – coderatchet