2010-03-17 134 views
1

我想使用scala actors來並行化代碼。這是我第一次與演員合作的真實代碼,但我在C中使用Java Mulithreading和MPI方面有一些經驗。但是我完全失去了。scala actors之間的依賴關係

欲實現的工作流是一個圓形的管道,並且可以描述爲以下:

  • 每個工人演員具有到另一個基準,從而形成了一圈
  • 有一個協調演員,其可以通過發送消息StartWork()
  • 當工人接收StartWork()消息觸發的計算,它在本地處理的一些東西,併發送DoWork(...) MESSA ge給它在圈子裏的鄰居。
  • 鄰居做了一些其他的東西,併發送一個DoWork(...)消息到自己的鄰居。
  • 這一直持續到最初的工作人員收到DoWork()消息。
  • 協調員可以發送一個GetResult()消息給初始worker並等待回覆。

問題是協調器應該只在數據準備就緒時收到結果。 工作人員在回覆GetResult()消息之前如何等待作業返回?

爲了加速計算,任何員工可以隨時收到StartWork()

這是我第一次嘗試僞實現工人:

class Worker(neighbor: Worker, numWorkers: Int) { 
    var ready = Foo() 
    def act() { 
    case StartWork() => { 
     val someData = doStuff() 
     neighbor ! DoWork(someData, numWorkers-1) 
     } 
    case DoWork(resultData, remaining) => if(remaining == 0) { 
     ready = resultData 
     } else { 
     val someOtherData = doOtherStuff(resultData) 
     neighbor ! DoWork(someOtherData, remaining-1) 
     } 
    case GetResult() => reply(ready) 
    } 
} 

在協調方面:

worker ! StartWork() 
val result = worker !? GetResult() // should wait 

回答

3

首先,你顯然需要對什麼是單件的一些標識的工作,這樣GetResult可以得到正確的結果。我想最明顯的解決方案是讓你的角色保持結果的Map任何等待干將Map

class Worker(neighbor: Worker, numWorkers: Int) { 
    var res: Map[Long, Result] = Map.empty 
    var gets: Map[Long, OutputChannel[Any]] = Map.empty 
    def act() { 
    ... 
    case DoWork(id, resultData, remaining) if remaining == 0 => 
     res += (id -> resultData) 
     gets.get(id).foreach(_ ! res(id)) //reply to getters when result is ready 
     gets -= id //clear out getter map now? 
    case GetResult(id) if res.isDefinedAt(d) => //result is ready 
     reply (res(id)) 
    case GetResult(id) => //no result ready 
     gets += (id -> sender) 
    } 
} 

注:的匹配條件使用if可以使信息處理有點清晰

+0

謝謝你的回答。我會盡快嘗試。順便說一句,我認爲,如果在'=>'之後在這種情況下是正確的。在匹配參數時,我不會尋找警衛,但我希望根據價值有兩種不同的行爲。也許我應該使用兩個不同的警衛的「案例」條目。 – paradigmatic

+0

哦,是的。所以是 - 我正在讀'=>'在其他地方 –

1

一個替代辦法是這樣的:該作品

class Worker(neighbor: Worker, numWorkers: Int) { 
    var ready = Foo() 
    def act() { 
    case StartWork() => { 
     val someData = doStuff() 
     neighbor ! DoWork(someData, numWorkers-1) 
     } 
    case DoWork(resultData, remaining) => if(remaining == 0) { 
     ready = resultData 
     react { 
      case GetResult() => reply(ready) 
     } 
     } else { 
     val someOtherData = doOtherStuff(resultData) 
     neighbor ! DoWork(someOtherData, remaining-1) 
     } 
    } 
} 

後完成後,此工作人員將被卡住,直到收到消息GetResult。另一方面,協調員可以立即發送GetResult,因爲它將保留在郵箱中,直到工作人員收到它。

+0

真的很好。我沒有意識到可以嵌入反應塊。但是,這不是我的問題的解決方案,因爲(如果我理解正確),工作人員將被困在正在等待'GetResult()'的內部'react'中,並且將無法成爲管道的一部分。 – paradigmatic

+0

@paradigmatic只有當結果準備就緒時,它纔會停下來等待'GetResult',但我的觀點確實表明您可以級聯反應。 –