假設我必須完成幾個CPU綁定的任務。例如,如果我有4個CPU,我可能會創建一個4-5個工作線程的固定大小的線程池,等待一個隊列並將這些任務放入隊列中。在Java中,我可以使用java.util.concurrent
(也許是ThreadPoolExecutor
)來實現此機制。用Scala執行器執行CPU綁定的任務嗎?
你將如何實現它與斯卡拉演員?
假設我必須完成幾個CPU綁定的任務。例如,如果我有4個CPU,我可能會創建一個4-5個工作線程的固定大小的線程池,等待一個隊列並將這些任務放入隊列中。在Java中,我可以使用java.util.concurrent
(也許是ThreadPoolExecutor
)來實現此機制。用Scala執行器執行CPU綁定的任務嗎?
你將如何實現它與斯卡拉演員?
所有的角色基本上都是由調度程序在引擎蓋下執行的線程。調度程序創建一個線程池來執行與您的內核數量大致相關的actor。這意味着,你可以創建每個任務需要執行,剩下的Scala的演員:
for(i <- 1 to 20) {
actor {
print(i);
Thread.sleep(1000);
}
}
這裏的缺點是取決於任務的數量,每個任務創建線程的成本可能由於線程在Java中並不便宜,所以價格相當昂貴。
一個簡單的方法來創建工人的演員有界池,然後通過短信分發任務,他們會是這樣的:
import scala.actors.Actor._
val numWorkers = 4
val pool = (1 to numWorkers).map { i =>
actor {
loop {
react {
case x: String => println(x)
}
}
}
}
for(i <- 1 to 20) {
val r = (new util.Random).nextInt(numWorkers)
pool(r) ! "task "+i
}
我們之所以要創建多個角色是因爲一個演員的過程每次只有一條消息(即任務),以便爲需要創建多個任務的任務獲得並行性。
備註:當涉及到I/O綁定任務時,默認調度程序變得尤爲重要,因爲在這種情況下您肯定會想要更改線程池的大小。兩篇很好的博客文章詳細介紹了這些內容:Explore the Scheduling of Scala Actors和Scala actors thread pool pitfall。
就是這樣說的,Akka是一個Actor框架,它爲Actors提供了更高級的工作流的工具,而且這是我在任何實際應用中都會用到的。這裏是一個負載平衡(而不是隨機的)任務執行:
import akka.actor.Actor
import Actor._
import akka.routing.{LoadBalancer, CyclicIterator}
class TaskHandler extends Actor {
def receive = {
case t: Task =>
// some computationally expensive thing
t.execute
case _ => println("default case is required in Akka...")
}
}
class TaskRouter(numWorkers: Int) extends Actor with LoadBalancer {
val workerPool = Vector.fill(numWorkers)(actorOf[TaskHandler].start())
val seq = new CyclicIterator(workerPool)
}
val router = actorOf(new TaskRouter(4)).start()
for(i <- 1 to 20) {
router ! Task(..)
}
你可以有不同類型的負載平衡(CyclicIterator是循環分配),所以你可以檢查文檔here以獲得更多信息。
那麼,你通常不會。使用演員的部分吸引力是他們爲你處理這些細節。
但是,如果您堅持要管理它,則需要覆蓋Actor
類上受保護的scheduler
方法以返回相應的IScheduler
。有關調度程序,另請參閱scala.actors.scheduler
package以及有關Actor
trait的註釋。
您是否嘗試過Scala Actor或平行集合?他們已經可以將工作負載分配到不同的CPU上。如果你需要更多的控制,你可以看看阿卡。 – Fabian