2010-11-11 105 views
7

試圖獲得如何根據actor而不是線程思考的句柄。我有點難倒以下用例:從線程模型轉換到演員

考慮一個創建工作(例如,通過讀取數據從文件)一個生產者進程的系統,和一個數字,消費工作的工作進程(例如解析數據並將其寫入數據庫)。工作的產生和消耗速度可能會有所不同,並且系統應該保持穩定。例如,如果工人無法跟上,製片人應該檢測到這一點,並最終放慢速度或等待。

這是很容易使用線程來實現:

val producer:Iterator[Work] = createProducer() 
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE) 
val workers = (0 until NUM_WORKERS) map { i => 
    new Thread() { 
    override def run() = { 
     while (true) { 
     try { 
      // take next unit of work, waiting if necessary 
      val work = queue.take() 
      process(work) 
     } 
     catch { 
      case e:InterruptedException => return 
     } 
     } 
    } 
    } 
} 

// start the workers 
workers.foreach(_.start()) 

while (producer.hasNext) { 
    val work = producer.next() 
    // add new unit of work, waiting if necessary 
    queue.put(work) 
} 

while (!queue.isEmpty) { 
    // wait until queue is drained 
    queue.wait() 
} 

// stop the workers 
workers.foreach(_.interrupt()) 

有沒有什麼不對的模型是,我已經成功地使用過。這個例子可能過於冗長,因爲使用Executor或CompletionService可以很好地適應這個任務。但我喜歡演員的抽象,並且認爲在很多情況下推理是比較容易的。有沒有辦法使用actors重寫這個例子,特別是確保沒有緩衝區溢出(例如完整的郵箱,丟棄的消息等)?

回答

3

因爲參與者處理消息「脫機」(即消息的消耗與他們正在接收的消息不相關),所以很難看到你如何能夠準確地模擬「生產者等待消費者趕上」。

我能想到的唯一的事情是,消費者要求從生產者演員的工作(使用reply):

case object MoreWorkPlease 
class Consumer(prod : Producer) extends Actor { 
    def act = { 
    prod ! MoreWorkPlease 
    loop { 
     react { 
     case Work(payload) => doStuff(payload); reply(MoreWorkPlease) 
     } 
    } 
    } 
} 

class Producer extends Actor { 
    def act = loop { 
    react { 
     case MoreWorkPlease => reply(Work(getNextItem)) 
    } 
    } 
} 

這是不完美的,當然,因爲生產者不「讀轉發「,只有當消費者準備好時才能獲得工作。用法可能類似於:

val prod = new Producer 
(1 to NUM_ACTORS).map(new Consumer(prod)).foreach(_.start()) 
prod.start() 
+0

Hrm,這是我想到的一種解決方案。這可能就足夠了,但我擔心的是,如果工人超過生產者,那麼缺少工作緩衝區會導致性能下降。 – toluju 2010-11-11 23:03:37

+0

@toluju - 開始讓每個消費者都要求工作,並讓生產者對這些消息不作出反應,但要接收它們,並在沒有更多工作要做時將它們放入隊列中。 (然後,一旦有工作,就可以將其劃分到隊列中的項目。) – 2010-11-12 00:08:18