2011-12-11 38 views
14

我有可能輸入的值列表斯卡拉平行集合 - 如何提前回報?

val inputValues = List(1,2,3,4,5) 

我有一個很長的計算功能,這給了我一個結果

def reallyLongFunction(input: Int) : Option[String] = { ..... } 

使用Scala的並行集合,我可以很容易做到

inputValues.par.map(reallyLongFunction(_)) 

獲得所有結果,並行。問題是,我並不是真的想要所有的結果,我只想要第一個結果。只要我的一個輸入是成功的,我想要我的輸出,並希望繼續我的生活。這做了很多額外的工作。

那麼,我該如何獲得兩全其美?我想

  1. 獲取返回從我的長期功能
  2. 停止從無用功我所有的其他線程的東西的第一個結果。

編輯 - 我具有

@volatile var done = false; 

被設置和我reallyLongFunction內檢查解決了這個問題很愚蠢Java程序員。這工作,但不感到非常scala。想更好的辦法來做到這一點....

+1

附註(不是你的問題的答案):這是恕我直言,更簡單:'inputValues.par.map(reallyLongFunction)' –

+1

Similar:http://stackoverflow.com/questions/8073061/filtering-scalas-parallel- collection-with-early-abort-when-desired-number-of-r –

+0

它看起來不像我平行集合或fork-join框架被設計來處理這種情況。如果計算時間長,因爲它是CPU密集型的,那麼想要計算所有結果或拆分內核之間的負載與將所有內核用於計算結果相比似乎是浪費。如果計算時間很長是因爲它正在等待一些IO,那麼看起來未來或演員會更合適。 – huynhjl

回答

3

我以與huynhjl相同的方式解釋了您的問題,但如果您只是想搜索並丟棄None s,則可以執行類似操作以避免在找到合適的結果時重複計算:

class Computation[A,B](value: A, function: A => B) { 
    lazy val result = function(value) 
} 

def f(x: Int) = {   // your function here 
    Thread.sleep(100 - x) 
    if (x > 5) Some(x * 10) 
    else None 
} 

val list = List.range(1, 20) map (i => new Computation(i, f)) 
val found = list.par find (_.result.isDefined) 
    //found is Option[Computation[Int,Option[Int]]] 
val result = found map (_.result.get) 
    //result is Option[Int] 

但並行集合的find似乎做了很多不必要的工作(請參閱this question),所以這可能無法正常工作,至少使用當前版本的Scala。

並行集合中使用了易失性標誌(請參閱find,existsforall的源代碼),所以我認爲您的想法很好。如果你可以在函數本身包含標誌,那實際上會更好。它會破壞你函數的參考透明度(即對於某些輸入,你的函數現在有時返回None而不是Some),但是由於你放棄了停止的計算,所以這應該不重要。

+0

我真的很喜歡儲存的想法因爲我的「f」函數除了我的參數外還有2個其他的參數(不涉及我分裂的東西,並在所有調用常數)..所以需要明白這一點從語法POV。也許我應該咖喱吧... – bwawok

+0

@bwawok'新的計算((ARG1,ARG2,ARG3),(F _)。tupled)'將在不對'Computation'類進行任何修改的情況下工作,假設'f'需要3個參數。或者,您可以使計算類爲不同的性格。 –

4

(更新:不,這是行不通的,沒有做地圖)

將它的工作做這樣的事情:

inputValues.par.find({ v => reallyLongFunction(v); true }) 

實現使用這樣的:

protected[this] class Find[U >: T](pred: T => Boolean, protected[this] val pit: IterableSplitter[T]) extends Accessor[Option[U], Find[U]] { 
    @volatile var result: Option[U] = None 
    def leaf(prev: Option[Option[U]]) = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort } 
    protected[this] def newSubtask(p: IterableSplitter[T]) = new Find(pred, p) 
    override def merge(that: Find[U]) = if (this.result == None) result = that.result 
    } 

看起來精神很相似,你@volatile除了你不必看它;-)

+0

如何取回真正功能的結果?不知道我理解這個語法,humm – bwawok

+1

哦,我搞砸了;當然,找到的不是計算的原始值。不要這個答案! –

+0

@HavocP - 我遇到了這個問題好幾次,太:(爲什麼Scala沒有像findMap [B](FN:A =>(B,布爾))定義在它的收藏 – Rogach

2

如果你願意使用非核心庫,我認爲期貨將是這個任務的一個很好的匹配。例如:

...這兩者的出現,使你要找的功能。

+0

我並不想先完成,我想先用結果 – bwawok

+0

「發現」存在於即將到來的阿卡2.0完成,但在那之前它是相當容易實現:https://github.com/jboner/akka/blob/master /akka-actor/src/main/scala/akka/dispatch/Future.scala#L211 –