2014-09-24 46 views
1

我想在Scala中執行一列值的聚合。這裏有一些考慮:使用關聯運算符在Scala中的並行聚合

  • 聚集函數[1]是締合以及可交換:實例是加和乘
  • 這個函數應用於並行列表以利用CPU
  • 的所有核

這是一個實現:

package com.example.reactive 

import scala.concurrent.Future 
import scala.concurrent.Await 
import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext.Implicits.global 

object AggregateParallel { 

    private def pm[T](l: List[Future[T]])(zero: T)(fn: (T, T) => T): Future[T] = { 

    val l1 = l.grouped(2) 
    val l2 = l1.map { sl => 
     sl match { 
     case x :: Nil => x 
     case x :: y :: Nil => 
      for (a <- x; b <- y) yield fn(a, b) 
     case _ => Future(zero) 
     } 
    }.toList 

    l2 match { 
     case x :: Nil => x 
     case x :: xs => pm(l2)(zero)(fn) 
     case Nil => Future(zero) 
    } 
    } 

    def parallelAggregate[T](l: List[T])(zero: T)(fn: (T, T) => T): T = { 
    val n = pm(l.map(Future(_)))(zero)(fn) 
    Await.result(n, 1000 millis) 
    n.value.get.get 
    } 

    def main(args: Array[String]) { 

    // multiply empty list: zero value is 1 
    println(parallelAggregate(List[Int]())(1)((x, y) => x * y)) 

    // multiply a list: zero value is 1 
    println(parallelAggregate(List(1, 2, 3, 4, 5))(1)((x, y) => x * y)) 

    // sum a list: zero value is 0 
    println(parallelAggregate(List(1, 2, 3, 4, 5))(0)((x, y) => x + y)) 

    // sum a list: zero value is 0 
    val bigList1 = List(1, 2, 3, 4, 5).map(BigInt(_)) 
    println(parallelAggregate(bigList1)(0)((x, y) => x + y)) 

    // sum a list of BigInt: zero value is 0 
    val bigList2 = (1 to 100).map(BigInt(_)).toList 
    println(parallelAggregate(bigList2)(0)((x, y) => x + y)) 

    // multiply a list of BigInt: zero value is 1 
    val bigList3 = (1 to 100).map(BigInt(_)).toList 
    println(parallelAggregate(bigList3)(1)((x, y) => x * y)) 
    } 
} 

OUTPUT:

1 
120 
15 
15 
5050 
93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000 

我還可以在Scala中實現相同的目標還是改進此代碼?

EDIT1

我已經實現自下而上的彙總。我認爲我非常接近Scala中的aggregate方法(見下文)。所不同的是,我只拆分成子列表兩個元素:

Scala實現:

def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = { 
    executeAndWaitResult(new Aggregate(z, seqop, combop, splitter)) 
} 

有了這個實現我假定總髮生在平行像這樣:

List(1,2,3,4,5,6) 
-> split parallel -> List(List(1,2), List(3,4), List(5,6)) 
-> execute in parallel -> List(3, 7, 11) 
-> split parallel -> List(List(3,7), List(11)) 
-> execute in parallel -> List(10, 11) 
-> Result is 21 

這是正確的假設斯卡拉aggregate也在做自下而上的並行聚合?

[1] http://www.mathsisfun.com/associative-commutative-distributive.html

+1

scala的並行列表已經有了一個'聚合'方法,您可以根據自己的要求進行操作。 http://markusjais.com/scalas-parallel-collections-and-the-aggregate-method/ – gwenzek 2014-09-24 07:29:58

+0

請在問題中檢查我的EDIT1。 – tuxdna 2014-09-24 07:56:49

+0

我回答了您的編輯 – gwenzek 2014-09-24 10:46:14

回答

3

階的平行名單已經有一個aggregate方法做你,你剛纔問什麼: http://markusjais.com/scalas-parallel-collections-and-the-aggregate-method/

它就像foldLeft但需要一個額外的參數:

def foldLeft[B](z: B)(f: (B, A) ⇒ B): B 
def aggregate[B](z: ⇒ B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B 

當稱爲上的並行集合aggregate分裂在N個部分收集,使用foldLeft parrallelyö n每個部分,並使用combop來計算所有結果。

但是,當在非平行集合上呼叫時aggregate就像foldLeft一樣工作並忽略combop

要獲得一致的結果,您需要關聯和交換操作符,因爲您不控制列表將如何分割。

簡單例子:

List(1, 2, 3, 4, 5).par.aggregate(1)(_ * _, _ * _) 
    res0: Int = 120 

答到EDIT1(根據意見改進):

我不認爲這是正確的做法,對於N項列表,你會創建n個Future秒。這在調度中造成了很大的開銷。除非seqop真的很長,否則每次調用它時都會避免創建Future

+0

創建未來並不意味着創建一個線程。直到調度器(通過ExecutionContext)來控制有多少線程來安排期貨。正如你所提到的,沒有辦法控制分割,所以Scala的「聚合」有點限制了恕我直言。除標準Scala庫之外,還有哪些其他選擇? – tuxdna 2014-09-24 11:37:57

+0

@tuxdna是的,Future和Thread是不同的,但是通過創建太多的Future,你將更多的工作交給Scheduler,因此會減慢你的代碼。恕我直言,未來應該只爲長時間計算而創建。在你的例子中,總結兩個整數當然不會更糟。 – gwenzek 2014-09-24 12:07:21

+0

@tuxdna至於其他替代我不知道。 scala集合庫的設計使您無需擔心在並行化時如何分割計算。你爲什麼想要對此有更多的控制? – gwenzek 2014-09-24 12:09:15