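Scala Actors instead of Java Futures

Problem: I need to write an application that processes several hundred files, each of which takes a few hundred MB of memory and a few seconds to complete. I wrote it using Future[Report] objects created with Executors.newFixedThreadPool(), but I got out-of-memory errors because the List[Future[Report]] returned by ExecutorService.invokeAll() was holding on to the intermediate memory used by each process. I solved the problem by having each processor compute its Report value in a local method and return it (each Report is only a few hundred lines), instead of doing the computation in the call method (from the Callable interface).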
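To make the fix described above concrete, here is a minimal sketch of one way to read it: the large working data lives only in a local variable of a helper method, so the task object that the Future refers to never points at it. Report, FileTask, ReportDemo and the 8 MB scratch buffer are hypothetical stand-ins chosen for illustration, not the original code.

```scala
import java.util.concurrent.{Callable, Executors}
// On Scala 2.13+ scala.jdk.CollectionConverters is the non-deprecated equivalent.
import scala.collection.JavaConverters._

// Hypothetical stand-in for the real report type (a few hundred lines of text).
final case class Report(lines: Vector[String])

// Hypothetical processor for one file.
final class FileTask(name: String) extends Callable[Report] {
  // The scratch buffer is local to this method, so it becomes unreachable
  // as soon as the method returns; only the small Report escapes.
  private def summarize(): Report = {
    val scratch = new Array[Byte](8 * 1024 * 1024) // stands in for hundreds of MB
    Report(Vector(name + ": " + scratch.length + " bytes scanned"))
  }

  // call() stores nothing in fields; it only hands back the small result.
  override def call(): Report = summarize()
}

object ReportDemo {
  // Runs one task per name on a fixed pool and collects the reports in order.
  def run(names: List[String], threads: Int): List[Report] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val tasks: List[Callable[Report]] = names.map(n => new FileTask(n))
      pool.invokeAll(tasks.asJava).asScala.toList.map(_.get())
    } finally pool.shutdown()
  }
}
```

Calling ReportDemo.run(List("a", "b"), 2) returns two single-line reports. Whether this matches the exact cause of the original out-of-memory errors depends on what the original tasks kept in their fields, which the question does not show.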
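I would like to try solving this with Scala Actors instead. I created a class that takes a sequence of jobs (with parameterized types for the job, the result, and the processing function) and processes each job in one of a configurable number of Worker instances (a subclass of Actor). The code follows.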
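Problems:
- I am not sure that my processing is correct.
- I do not like having to use a CountDownLatch to delay returning the result from the dispatcher.
- I would prefer to write a more "functional" version of the dispatcher that does not modify the jobQueue list or the workers HashMap, perhaps borrowing the tail-recursive loop structure from Clojure (I have used a @tailrec def loop method in other Scala code).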
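For reference, the @tailrec loop shape mentioned in the last point can be sketched as follows. This is only an illustration of threading immutable state through a loop instead of reassigning vars: it deliberately leaves out the actors and all concurrency, and FunctionalDispatch, State and runAll are hypothetical names, not part of the code in this question.

```scala
import scala.annotation.tailrec

object FunctionalDispatch {
  // Hypothetical immutable dispatcher state: jobs still pending, results so far.
  final case class State[A, B](pending: List[A], results: List[B])

  // Processes every job sequentially. Each iteration passes a new State
  // forward rather than mutating a shared queue.
  def runAll[A, B](jobs: List[A])(process: A => B): List[B] = {
    @tailrec
    def loop(state: State[A, B]): List[B] = state.pending match {
      case Nil         => state.results.reverse // restore job order
      case job :: rest => loop(State(rest, process(job) :: state.results))
    }
    loop(State(jobs, Nil))
  }
}
```

For example, FunctionalDispatch.runAll(List(1, 2, 3))(_ * 2) walks the list once and returns the doubled values in job order.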
I am eagerly awaiting the publication of "Actors in Scala" by Philipp Haller and Frank Sommers.
Here is the code:
package multi_worker

import scala.actors.Actor
import java.util.concurrent.CountDownLatch

object MultiWorker {
  private val megabyte = 1024 * 1024
  private val runtime = Runtime.getRuntime
}

class MultiWorker[A, B](jobs: List[A],
                        actorCount: Int)(process: (A) => B) {
  import MultiWorker._

  sealed abstract class Message
  // Dispatcher -> Worker: Run this job and report results
  case class Process(job: A) extends Message
  // Worker -> Dispatcher: Result of processing
  case class ReportResult(id: Int, result: B) extends Message
  // Worker -> Dispatcher: I need work -- send me a job
  case class SendJob(id: Int) extends Message
  // Worker -> Dispatcher: I have stopped as requested
  case class Stopped(id: Int) extends Message
  // Dispatcher -> Worker: Stop working -- all jobs done
  // (a case object, since the message carries no data)
  case object StopWorking extends Message

  /**
   * A simple logger that can be sent text messages that will be written to the
   * console. Used so that messages from the actors do not step on each other.
   */
  object Logger extends Actor {
    def act() {
      loop {
        react {
          case text: String => println(text)
          case StopWorking => exit()
        }
      }
    }
  }
  Logger.start()

  /**
   * A worker actor that will process jobs and return results to the
   * dispatcher.
   */
  class Worker(id: Int) extends Actor {
    def act() {
      // Ask the dispatcher for an initial job
      dispatcher ! SendJob(id)
      loop {
        react {
          case Process(job) =>
            val startTime = System.nanoTime
            dispatcher ! ReportResult(id, process(job))
            val endTime = System.nanoTime
            val totalMemory = (runtime.totalMemory/megabyte)
            val usedMemory = totalMemory - (runtime.freeMemory/megabyte)
            val message = "Finished job " + job + " in " +
              ((endTime - startTime)/1000000000.0) +
              " seconds using " + usedMemory +
              "MB out of total " + totalMemory + "MB"
            Logger ! message
            dispatcher ! SendJob(id)
          case StopWorking =>
            dispatcher ! Stopped(id)
            exit()
        }
      }
    }
  }

  val latch = new CountDownLatch(1)
  var res = List.empty[B]

  /**
   * The job dispatcher that sends jobs to the workers until the job queue
   * (jobs: List[A]) is empty. It then tells the workers to
   * stop working and returns the List[B] results to the caller.
   */
  val dispatcher = new Actor {
    def act() {
      var jobQueue = jobs
      var workers = (0 until actorCount).map(id => (id, new Worker(id))).toMap
      workers.values.foreach(_.start())
      loop {
        react {
          case ReportResult(id, result) =>
            res = result :: res
          case SendJob(id) =>
            if (!jobQueue.isEmpty) {
              workers(id) ! Process(jobQueue.head)
              jobQueue = jobQueue.tail
            } else {
              // No jobs left: tell this worker to stop
              workers(id) ! StopWorking
            }
          case Stopped(id) =>
            workers = workers - id
            // A worker reports its last result before it stops, so once
            // every worker has stopped, all results have been collected
            if (workers.isEmpty) {
              Logger ! StopWorking
              latch.countDown()
              exit()
            }
        }
      }
    }
  }
  dispatcher.start()

  /**
   * Get the results of the processing -- wait for the dispatcher to finish
   * before returning.
   */
  def results: List[B] = {
    latch.await()
    res
  }
}
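Thanks! I will take a closer look at your code. – Ralph 2011-04-21 14:35:43
Awesome! I like the 'Channel' trick. – 2011-04-21 15:22:42
Lovely code. I would break it up (extract methods, etc.) for readability, but the concept is very good. – 2011-04-22 21:26:50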