2012-12-06 101 views
2

我想使用標準scala.actors包爲Scala設計一個調度員 - 工作者演員模式。斯卡拉調度員 - 工人演員模式

調度員接收來自java.util.concurrent.LinkedBlockingQueue的工作並將其發送給工作者參與者進行處理。當所有的工作都完成後,調度員應該告訴每個工人要退出,然後它也應該退出。這裏是我想出的代碼,但它掛起,當所有的工作都做完了(我認爲是即將在調度隊列'GiveMeWork消息):

import java.util.concurrent.LinkedBlockingQueue 
import scala.actors.Actor 

object Dispatcher 
extends Actor { 
    println("Dispatcher created") 

    def act() { 
    val workers = (1 to 4).map(id => (new Worker(id)).start()) 

    loop { 
     react { 
     case 'GiveMeWork => 
      // println("Worker asked for work") 
      val (time, i) = workQueue.take() 
      if (time == 0) { 
      println("Quitting time") 
      workers.foreach(_ !? 0L) 
      } else { 
      println("Arrival at dispatcher: i: " + i + " dispatch time: " + 
        time + ", elapsed: " + (System.nanoTime() - time)) 
      sender ! time 
      } 
     case 'Quit => 
      println("Told to quit") 
      sender ! 'OffDuty 
      exit() 
     } 
    } 
    } 
} 

class Worker(id: Int) 
extends Actor { 
    println("Worker(" + id + ") created") 
    var jobs = 0 

    def act() { 
    Dispatcher ! 'GiveMeWork 

    loop { 
     react { 
     case time: Long => 
      if (time == 0) { 
      println("Worker(" + id + ") completed " + jobs + " jobs") 
      sender ! 'OffDuty 
      exit() 
      } else { 
      println("Arrival at worker(" + id + "): dispatch time: " + 
        time + ", elapsed: " + (System.nanoTime() - time)) 
      Thread.sleep(id) 
      jobs += 1 
      Dispatcher ! 'GiveMeWork 
      } 
     } 
    } 
    } 
} 

val workQueue = new LinkedBlockingQueue[(Long, Int)](1000) 

Dispatcher.start() 

for (i <- 0 until 5000) { 
    Thread.sleep(1) 
    workQueue.put((System.nanoTime(), i)) 
} 

workQueue.put((0L, 0)) 

println("Telling Dispatcher to quit") 
Dispatcher !? 'Quit 

回答

3

有一場比賽:

val (time, i) = workQueue.take() 

所有的工作都完成了,包括workQueue.put((0L, 0)),所以它會一直等下去。

同時使用不同類型的併發是一個壞主意。

調度員可以告知任務源有關任務限制:

import scala.actors.{Actor, OutputChannel} 
import scala.collection.mutable.Queue 

case class Task(time: Long, i: Int) 
case object GiveMeWork 
case object Quit 
case object OffDuty 

object Dispatcher extends Actor { 
    println("Dispatcher created") 

    def act() { 
    val workers = (1 to 4).map(id => (new Worker(id)).start()) 
    val waitingWorkers = Queue[OutputChannel[Any]](workers: _*) 
    val tasks = Queue[Task]() 
    var workSender: Option[OutputChannel[Any]] = None 

    loop { 
     react { 
     case GiveMeWork => 
      if (!tasks.isEmpty) sender ! tasks.dequeue() 
      else waitingWorkers enqueue sender 

      workSender map { _ ! GiveMeWork } 
      workSender = None 
     case t: Task => 
      if (!waitingWorkers.isEmpty) waitingWorkers.dequeue() ! t 
      else tasks enqueue t 

      if (tasks.length < 1000) sender ! GiveMeWork 
      else workSender = Some(sender) 
     case Quit => 
      println("Told to quit") 
      workers.foreach{ _ ! Quit } 
      sender ! OffDuty 
      exit() 
     } 
    } 
    } 
} 

class Worker(id: Int) 
extends Actor { 
    var jobs = 0 

    def act() { 
    loop { 
     react { 
     case t: Task => 
      Thread.sleep(id) 
      jobs += 1 
      Dispatcher ! GiveMeWork 
     case Quit => 
      println("Worker(" + id + ") completed " + jobs + " jobs") 
      sender ! OffDuty 
      exit() 
     } 
    } 
    } 
} 

Dispatcher.start() 

for (i <- 0 until 5000) { 
    Thread.sleep(1) 
    Dispatcher !? Task(System.nanoTime(), i) 
} 

println("Telling Dispatcher to quit") 
Dispatcher !? Quit 
+0

我不喜歡使用的LinkedBlockingQueue,但我需要限制的等待執行的最大進程數。當我在實際代碼中使用此模式時,由於生成工作的方法(目錄遞歸下降),我得到了內存不足錯誤,添加的文件比它們可以處理的快。有關如何純粹與演員合作的建議? – Ralph

+0

@Ralph我已經更新了我的答案。 – senia