我試圖在沒有使用Queue的情況下在scala中實現Producer Consumer程序。因爲我認爲Actor已經實現了「郵件隊列」或其他的東西,所以再次編寫代碼將是多餘的。在scala中實現Producer Consumer的正確方法是什麼
我試圖純粹在Actor中編寫程序。 下面是一個多生產者多個消費者程序。 生產者睡一會兒,模擬做某事。消費者根本不睡覺。
但是我不知道怎麼關閉程序,如果我不添加主管演員監控消費者,以及使用「等待」(代碼中的超級類)
是一種承諾對象無論如何擺脫他們?
import akka.actor.Actor.Receive
import akka.actor._
import akka.routing._;
import akka.util._
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
class Producer(val pool:ActorRef)(val name:String) extends Actor {
def receive = {
case _ =>
while (true) {
val sleepTime = scala.util.Random.nextInt(1000)
Thread.sleep(sleepTime)
println("Producer %s send food" format name)
pool ! name
}
}
}
class Consumer(supervisor : ActorRef)(val name:String) extends Actor {
var counter = 0
def receive = {
case s =>
counter += 1
println("%s eat food produced by %s" format (name,s))
if (counter >= 10) {
println("%s is full" format name)
context.stop(self)
supervisor ! 1
}
}
}
class Supervisor(p:Promise[String]) extends Actor {
var r = 3
def receive = {
case _ =>
r -= 1
if (0 == r) {
println("All consumer stopped")
context.stop(self)
p success ("Good")
}
}
}
object Try3 {
def work(): Unit = {
val system = ActorSystem("sys1")
val nProducer = 5;
val nConsumer = 3;
val p = Promise[String]
val supervisor = system.actorOf(Props(new Supervisor(p)));
val arrConsumer = for (i <- 1 to nConsumer) yield system.actorOf(Props(new Consumer(supervisor)("Consumer %d" format (i))))
val poolConsumer = system.actorOf(Props.empty.withRouter(RoundRobinRouter(arrConsumer)))
val arrProducer = for (i <- 1 to nProducer) yield system.actorOf(Props(new Producer(poolConsumer)("Producer %d" format (i))))
arrProducer foreach (_ ! "start")
Await.result(p.future,Duration.Inf)
println("great!")
system.shutdown
}
def main(args:Array[String]): Unit = {
work()
}
}
接收功能產生類有一個問題,它不會被關閉,因爲它雖然沒有打破的條件。
我能想到的唯一方法是「向製作者本身發送信息」。 我不知道這是實現這種請求的正常方式嗎?
下面是修改代碼:
class Producer(val pool:ActorRef)(val name:String) extends Actor {
// original implementation:
// def receive = {
// case _ =>
// while (true){
// val sleepTime = scala.util.Random.nextInt(1000)
// Thread.sleep(sleepTime)
// println("Producer %s send food" format name)
// pool ! name
// }
// }
case object Loop;
def receive = {
case _ =>
val sleepTime = scala.util.Random.nextInt(1000)
Thread.sleep(sleepTime)
println("Producer %s send food" format name)
pool ! name
self ! Loop //send message to itself
}
}
不管我的實現,什麼是Scala實現生產者消費者程序的正確方法,與演員或未來/無極?
謝謝@Martynas什麼。你解決了我的「循環」問題。我仍在尋求生產者 - 消費者優雅實施的答案。 – worldterminator 2014-10-10 07:26:22