2011-04-21 35 views
4

問題:我需要編寫一個應用程序來處理數百個文件,每個文件需要幾百MB和幾秒鐘才能完成。我已經使用Future[Report]使用Future[Report]創建了對象,但使用Executors.newFixedThreadPool()創建了對象,但因爲由ExecutorService.invokeAll()返回的List[Future[Report]]對象持有到每個進程使用的中間內存而導致內存不足錯誤。我通過在計算Report值(每Report只有幾百行)而不是在call方法(從接口Callable)中進行計算之後,通過在處理器中返回來自處理器中本地方法的對象Report解決了該問題。斯卡拉演員而不是爪哇期貨

我想嘗試用Scala Actors來解決這個問題。我創建了一個採用一系列作業(作業,結果和處理功能的參數化類型)的類,並將每個作業配置爲一個可配置數量的Worker實例(Actor的子類)。代碼如下。

問題

  • 我不知道,我的處理是正確的 。我不喜歡用CountDownLatch來延遲從調度程序返回結果。

  • 我寧願寫的調度員更「實用」的版本,不修改jobsQueue列表或workers HashMap的,也許借用Clojure的尾遞歸loop結構(我在其它使用的方法@tailrec def loop Scala代碼)。

我急切地等待Philipp Haller和Frank Sommers發表的"Actors in Scala"

下面是代碼:

package multi_worker 

import scala.actors.Actor 
import java.util.concurrent.CountDownLatch 

object MultiWorker { 
    private val megabyte = 1024 * 1024 
    private val runtime = Runtime.getRuntime 
} 

class MultiWorker[A, B](jobs: List[A], 
         actorCount: Int)(process: (A) => B) { 
    import MultiWorker._ 

    sealed abstract class Message 

    // Dispatcher -> Worker: Run this job and report results 
    case class Process(job: A) extends Message 

    // Worker -> Dispatcher: Result of processing 
    case class ReportResult(id: Int, result: B) extends Message 

    // Worker -> Dispatcher: I need work -- send me a job 
    case class SendJob(id: Int) extends Message 

    // Worker -> Dispatcher: I have stopped as requested 
    case class Stopped(id: Int) extends Message 

    // Dispatcher -> Worker: Stop working -- all jobs done 
    case class StopWorking extends Message 

    /** 
    * A simple logger that can be sent text messages that will be written to the 
    * console. Used so that messages from the actors do not step on each other. 
    */ 
    object Logger 
    extends Actor { 
    def act() { 
     loop { 
     react { 
      case text: String => println(text) 
      case StopWorking => exit() 
     } 
     } 
    } 
    } 
    Logger.start() 

    /** 
    * A worker actor that will process jobs and return results to the 
    * dispatcher. 
    */ 
    class Worker(id: Int) 
    extends Actor{ 
    def act() { 
     // Ask the dispatcher for an initial job 
     dispatcher ! SendJob(id) 

     loop { 
     react { 
      case Process(job) => 
      val startTime = System.nanoTime 
      dispatcher ! ReportResult(id, process(job)) 

      val endTime = System.nanoTime 
      val totalMemory = (runtime.totalMemory/megabyte) 
      val usedMemory = totalMemory - (runtime.freeMemory/megabyte) 
      val message = "Finished job " + job + " in " + 
       ((endTime - startTime)/1000000000.0) + 
       " seconds using " + usedMemory + 
       "MB out of total " + totalMemory + "MB" 
      Logger ! message 

      dispatcher ! SendJob(id) 

      case StopWorking => 
      dispatcher ! Stopped(id) 
      exit() 
     } 
     } 
    } 
    } 

    val latch = new CountDownLatch(1) 
    var res = List.empty[B] 

    /** 
    * The job dispatcher that sends jobs to the worker until the job queue 
    * (jobs: TraversableOnce[A]) is empty. It then tells the workers to 
    * stop working and returns the List[B] results to the caller. 
    */ 
    val dispatcher = new Actor { 
    def act() { 
     var jobQueue = jobs 
     var workers = (0 until actorCount).map(id => (id, new Worker(id))).toMap 
     workers.values.foreach(_.start()) 

     loop { 
     react { 
      case ReportResult(id, result) => 
      res = result :: res 
      if (jobQueue.isEmpty && workers.isEmpty) { 
       latch.countDown() 
       exit() 
      } 

      case SendJob(id) => 
      if (!jobQueue.isEmpty) { 
       workers(id) ! Process(jobQueue.head) 
       jobQueue = jobQueue.tail 
      } 

      case Stopped(id) => 
      workers = workers - id 
     } 
     } 
    } 
    } 
    dispatcher.start() 

    /** 
    * Get the results of the processing -- wait for the dispatcher to finish 
    * before returning. 
    */ 
    def results: List[B] = { 
    latch.await() 
    res 
    } 
} 

回答

4

快速瀏覽後,我建議以下更新:

val resultsChannel = new Channel[List[B]] // used instead of countdown latch to get the results 

val dispatcher = new Actor { 

    def act = loop(Nil, (0 to actorCount).map(id => 
     (id, new Worker(id).start.asInstanceOf[Worker])).toMap, 
    Nil) 

    @tailrec 
    def loop(jobQueue: List[A], // queue, workers and results are immutable lists, passed recursively through the loop 
      workers: Map[Int, Worker], 
      res: List[B]):Unit = react { 
    case ReportResult(id, result) => 
     val results = result :: res 
     if (results.size == jobs.size) { // when the processing is finished, sends results to the output channel   
     resultsChannel ! results 
     } 
     loop(jobQueue, workers, results) 

    case SendJob(id) => 
     if (!jobQueue.isEmpty) { 
     workers(id) ! Process(jobQueue.head) 
     loop(jobQueue.tail, workers, res) 
     } 

    case Stopped(id) => 
     loop(jobQueue, workers - id, res) 
    } 

} 
dispatcher.start() 

def results: List[B] = { 
    resultsChannel.receive { 
    case results => results // synchronously wait for the data in the channel 
    } 
} 
+0

謝謝!我會仔細看看你的代碼。 – Ralph 2011-04-21 14:35:43

+1

太棒了!我喜歡'Channel'的招數, – 2011-04-21 15:22:42

+0

可愛的代碼。我會分解它 - 提取方法等 - 爲了可讀性,但這個概念非常好。 – 2011-04-22 21:26:50

0

這裏是我想出了(由於瓦西爾Remeniuk最終版本)。該println語句貼上了// DEBUG評論都表明進展和main方法是單元測試:

import scala.actors.Actor 
import scala.actors.Channel 
import scala.actors.Scheduler 
import scala.annotation.tailrec 

object MultiWorker { 
    private val megabyte = 1024 * 1024 
    private val runtime = Runtime.getRuntime 

    def main(args: Array[String]) { 
    val jobs = (0 until 5).map((value: Int) => value).toList 
    val multiWorker = new MultiWorker[Int, Int](jobs, 2, { value => 
     Thread.sleep(100) 
     println(value) 
     value 
     }) 
    println("multiWorker.results: " + multiWorker.results) 
    Scheduler.shutdown 
    } 
} 

class MultiWorker[A, B](jobs: List[A], 
         actorCount: Int, 
         process: (A) => B) { 
    import MultiWorker._ 

    sealed abstract class Message 

    // Dispatcher -> Worker: Run this job and report results 
    case class Process(job: A) extends Message 

    // Worker -> Dispatcher: Result of processing 
    case class ReportResult(id: Int, result: B) extends Message 

    // Worker -> Dispatcher: I need work -- send me a job 
    case class SendJob(id: Int) extends Message 

    // Worker -> Dispatcher: I have stopped as requested 
    case class Stopped(id: Int) extends Message 

    // Dispatcher -> Worker: Stop working -- all jobs done 
    case class StopWorking() extends Message 

    /** 
    * A simple logger that can be sent text messages that will be written to the 
    * console. Used so that messages from the actors do not step on each other. 
    */ 
    object Logger 
    extends Actor { 
    def act() { 
     loop { 
     react { 
      case text: String => println(text) 
      case StopWorking => exit() 
     } 
     } 
    } 
    } 
    Logger.start() 

    /** 
    * A worker actor that will process jobs and return results to the 
    * dispatcher. 
    */ 
    case class Worker(id: Int) 
    extends Actor{ 
    def act() { 
     // Ask the dispatcher for an initial job 
     dispatcher ! SendJob(id) 

     loop { 
     react { 
      case Process(job) => 
      println("Worker(" + id + "): " + Process(job)) // DEBUG 
      val startTime = System.nanoTime 
      dispatcher ! ReportResult(id, process(job)) 

      val endTime = System.nanoTime 
      val totalMemory = (runtime.totalMemory/megabyte) 
      val usedMemory = totalMemory - (runtime.freeMemory/megabyte) 
      val message = "Finished job " + job + " in " + 
      ((endTime - startTime)/1000000000.0) + 
      " seconds using " + usedMemory + 
      "MB out of total " + totalMemory + "MB" 
      Logger ! message 

      dispatcher ! SendJob(id) 

      case StopWorking() => 
      println("Worker(" + id + "): " + StopWorking()) // DEBUG 
      dispatcher ! Stopped(id) 
      exit() 
     } 
     } 
    } 
    } 

    val resultsChannel = new Channel[List[B]] 
    /** 
    * The job dispatcher that sends jobs to the worker until the job queue 
    * (jobs: TraversableOnce[A]) is empty. It then tells the workers to 
    * stop working and returns the List[B] results to the caller. 
    */ 
    val dispatcher = new Actor { 
    def act() { 
     @tailrec 
     def loop(jobs: List[A], 
       workers: Map[Int, Worker], 
       acc: List[B]) { 
     println("dispatcher: loop: jobs: " + jobs + ", workers: " + workers + ", acc: " + acc) // DEBUG 
     if (!workers.isEmpty) { // Stop recursion when there are no more workers 
      react { 
      case ReportResult(id, result) => 
       println("dispatcher: " + ReportResult(id, result)) // DEBUG 
       loop(jobs, workers, result :: acc) 

      case SendJob(id) => 
       println("dispatcher: " + SendJob(id)) // DEBUG 
       if (!jobs.isEmpty) { 
       println("dispatcher: " + "Sending: " + Process(jobs.head) + " to " + id) // DEBUG 
       workers(id) ! Process(jobs.head) 
       loop(jobs.tail, workers, acc) 
       } else { 
       println("dispatcher: " + "Sending: " + StopWorking() + " to " + id) // DEBUG 
       workers(id) ! StopWorking() 
       loop(Nil, workers, acc) 
       } 

      case Stopped(id) => 
       println("dispatcher: " + Stopped(id)) // DEBUG 
       loop(jobs, workers - id, acc) 
      } 
     } else { 
      println("dispatcher: " + "jobs: " + jobs + ", workers: " + workers + ", acc: " + acc) // DEBUG 
      resultsChannel ! acc 
     } 
     } 

     loop(jobs, (0 until actorCount).map(id => (id, new Worker(id).start.asInstanceOf[Worker])).toMap, Nil) 
     exit() 
    } 
    }.start() 

    /** 
    * Get the results of the processing -- wait for the dispatcher to finish 
    * before returning. 
    */ 
    def results: List[B] = { 
    resultsChannel.receive { 
     case results => results 
    } 
    } 
}