2
我想使用標準scala.actors包爲Scala設計一個調度員 - 工作者演員模式。斯卡拉調度員 - 工人演員模式
調度員接收來自java.util.concurrent.LinkedBlockingQueue
的工作並將其發送給工作者參與者進行處理。當所有的工作都完成後,調度員應該告訴每個工人要退出,然後它也應該退出。這裏是我想出的代碼,但它掛起,當所有的工作都做完了(我認爲是即將在調度隊列'GiveMeWork
消息):
import java.util.concurrent.LinkedBlockingQueue
import scala.actors.Actor
object Dispatcher
extends Actor {
println("Dispatcher created")
def act() {
val workers = (1 to 4).map(id => (new Worker(id)).start())
loop {
react {
case 'GiveMeWork =>
// println("Worker asked for work")
val (time, i) = workQueue.take()
if (time == 0) {
println("Quitting time")
workers.foreach(_ !? 0L)
} else {
println("Arrival at dispatcher: i: " + i + " dispatch time: " +
time + ", elapsed: " + (System.nanoTime() - time))
sender ! time
}
case 'Quit =>
println("Told to quit")
sender ! 'OffDuty
exit()
}
}
}
}
class Worker(id: Int)
extends Actor {
println("Worker(" + id + ") created")
var jobs = 0
def act() {
Dispatcher ! 'GiveMeWork
loop {
react {
case time: Long =>
if (time == 0) {
println("Worker(" + id + ") completed " + jobs + " jobs")
sender ! 'OffDuty
exit()
} else {
println("Arrival at worker(" + id + "): dispatch time: " +
time + ", elapsed: " + (System.nanoTime() - time))
Thread.sleep(id)
jobs += 1
Dispatcher ! 'GiveMeWork
}
}
}
}
}
val workQueue = new LinkedBlockingQueue[(Long, Int)](1000)
Dispatcher.start()
for (i <- 0 until 5000) {
Thread.sleep(1)
workQueue.put((System.nanoTime(), i))
}
workQueue.put((0L, 0))
println("Telling Dispatcher to quit")
Dispatcher !? 'Quit
我不喜歡使用的LinkedBlockingQueue,但我需要限制的等待執行的最大進程數。當我在實際代碼中使用此模式時,由於生成工作的方法(目錄遞歸下降),我得到了內存不足錯誤,添加的文件比它們可以處理的快。有關如何純粹與演員合作的建議? – Ralph
@Ralph我已經更新了我的答案。 – senia