2014-10-10 187 views
5

我試圖在沒有使用Queue的情況下在scala中實現Producer Consumer程序。因爲我認爲Actor已經實現了「郵件隊列」或其他的東西,所以再次編寫代碼將是多餘的。在scala中實現Producer Consumer的正確方法是什麼

我試圖純粹在Actor中編寫程序。 下面是一個多生產者多個消費者程序。 生產者睡一會兒,模擬做某事。消費者根本不睡覺。

但是我不知道怎麼關閉程序,如果我不添加主管演員監控消費者,以及使用「等待」(代碼中的超級類)

是一種承諾對象無論如何擺脫他們?

import akka.actor.Actor.Receive 
import akka.actor._ 
import akka.routing._; 
import akka.util._ 

import scala.concurrent.{Await, Promise} 
import scala.concurrent.duration._ 

class Producer(val pool:ActorRef)(val name:String) extends Actor { 

    def receive = { 
    case _ => 
     while (true) { 
     val sleepTime = scala.util.Random.nextInt(1000) 
     Thread.sleep(sleepTime) 
     println("Producer %s send food" format name) 
     pool ! name 
     } 
    } 
} 

class Consumer(supervisor : ActorRef)(val name:String) extends Actor { 

    var counter = 0 

    def receive = { 
    case s => 
     counter += 1 
     println("%s eat food produced by %s" format (name,s)) 

     if (counter >= 10) { 
     println("%s is full" format name) 

     context.stop(self) 
     supervisor ! 1 
     } 
    } 
} 

class Supervisor(p:Promise[String]) extends Actor { 

    var r = 3 

    def receive = { 
    case _ => 
     r -= 1 
     if (0 == r) { 
     println("All consumer stopped") 
     context.stop(self) 
     p success ("Good") 
     } 
    } 

} 

object Try3 { 

    def work(): Unit = { 
    val system = ActorSystem("sys1") 
    val nProducer = 5; 
    val nConsumer = 3; 
    val p = Promise[String] 
    val supervisor = system.actorOf(Props(new Supervisor(p))); 
    val arrConsumer = for (i <- 1 to nConsumer) yield system.actorOf(Props(new Consumer(supervisor)("Consumer %d" format (i)))) 
    val poolConsumer = system.actorOf(Props.empty.withRouter(RoundRobinRouter(arrConsumer))) 
    val arrProducer = for (i <- 1 to nProducer) yield system.actorOf(Props(new Producer(poolConsumer)("Producer %d" format (i)))) 

    arrProducer foreach (_ ! "start") 

    Await.result(p.future,Duration.Inf) 
    println("great!") 
    system.shutdown 
    } 

    def main(args:Array[String]): Unit = { 
    work() 
    } 
} 

接收功能產生類有一個問題,它不會被關閉,因爲它雖然沒有打破的條件。

我能想到的唯一方法是「向製作者本身發送信息」。 我不知道這是實現這種請求的正常方式嗎?

下面是修改代碼:

class Producer(val pool:ActorRef)(val name:String) extends Actor { 

    // original implementation: 
    // def receive = { 
    // case _ => 
    // while (true){ 
    //  val sleepTime = scala.util.Random.nextInt(1000) 
    //  Thread.sleep(sleepTime) 
    //  println("Producer %s send food" format name) 
    //  pool ! name 
    // } 
    // } 

    case object Loop; 

    def receive = { 
    case _ => 
     val sleepTime = scala.util.Random.nextInt(1000) 
     Thread.sleep(sleepTime) 
     println("Producer %s send food" format name) 
     pool ! name 
     self ! Loop //send message to itself 
    } 
} 

不管我的實現,什麼是Scala實現生產者消費者程序的正確方法,與演員或未來/無極?

回答

2

你不應該在actor中阻塞(在你的情況下是Thread.sleep,while循環)。在演員內部阻止從所有演員中使用的線程池中獲取線程。即使像你這樣少量的製作者也會讓ActorSystem中的所有actor脫離線程並使其無法使用。

取而代之的是使用Scheduler來定期在Producer中定期發送meesage。

override def preStart(): Unit = { 
    import scala.concurrent.duration._ 
    import context.dispatcher 
    context.system.scheduler.schedule(
    initialDelay = 0.seconds, 
    interval = 1.second, 
    receiver = pool, 
    message = name 
) 
} 
+1

謝謝@Martynas什麼。你解決了我的「循環」問題。我仍在尋求生產者 - 消費者優雅實施的答案。 – worldterminator 2014-10-10 07:26:22

0

你想想實現Terminator演員:)

object Terminator { 
    case class WatchMe(ref: ActorRef) 
} 
class Terminator extends Actor { 
    var consumers: Map[ActorRef, ActorRef] = Map() 

    def receive = { 
     case WatchMe(ref) => { 
     consumers += ref -> ref 
     context.watch(ref) 
     } 
     case Terminated(ref) => { 
     context.unwatch(ref) 
     consumers.get(ref).foreach { ref -> ref ! PoisonPill } 
     consumers -= ref 
     //If all consumers are dead stop.self and delegate NoConsumers message higher in hierarchy 
     if(consumers.size == 0) { 
      delegate() 
      context.stop(self) 
     } 
     } 
    } 
} 
相關問題