2011-01-27 64 views
2

在我的模型中有大約8-9斯卡拉演員。 每個演員有RabbitMQ的服務器將在其自己的隊列一些斯卡拉演員進入等待狀態時8-10演員同時運行

在每個演員的動作方法

。它不斷上市隊列 像

def act { 
    this ! 1 
    loop { 
     react { 
     case 1 => processMessage(QManager.getMessage); this ! 1 
     } 
    } 
    } 

我一個RabbitMQ的QManager getMessage方法

def getMessage: MyObject = { 
    getConnection 
    val durable = true 
    channel.exchangeDeclare(EXCHANGE, "direct", durable) 
    channel.queueDeclare(QUEUE, durable, false, false, null) 
    channel queueBind (QUEUE, EXCHANGE, _ROUTING_KEY) 
    consumer = new QueueingConsumer(channel) 
    channel basicConsume (QUEUE, false, consumer) 

    var obj = new MyObject 
    try { 
     val delivery = consumer.nextDelivery 
     val msg = new java.io.ObjectInputStream(
     new java.io.ByteArrayInputStream(delivery.getBody)).readObject() 
     obj = msg.asInstanceOf[MyObject] 
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false) 
    } catch { 
     case e: Exception =>logger.error("error in Get Message", e);endConnection 
    } 
    endConnection 
    obj 
    } 

所有9個Actor都有自己的對象類型和一個自己的對象類型QManager

GetMessage我正在使用Rabbitmq QueueConsu濱海

val delivery = consumer.nextDelivery 

和nextDelivery方法時,在隊列中 這種方法itfounds使演員在等待狀態

當我開始所有演員8只,其中4工作正常,其他都沒有說明返回一個對象。 我有測試每一個演員相互依賴單獨運行

開始時,我開始更多的是4名演員

是therer Scala的演員穿的任何問題,會出現問題時,他們工作得很好。

回答

5

免責聲明:我是阿卡

的寶象雷克斯說,你忙等待,佔用線程,線程上的共享池。

我不知道,如果你要考阿卡的選項,但我們對AMQP消費者(生產者)的支持,演員:Akka-AMQP

生產AMQP的消息:

val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic) 
    val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters), producerId = Some("my_producer")) 
producer ! Message("Some simple sting data".getBytes, "some.routing.key") 

消費AMQP消息:

val exchangeParameters = ExchangeParameters("my_topic_exchange", Topic) 
val myConsumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing.key", actorOf(new Actor { def receive = { 
    case Delivery(payload, _, _, _, _, _) => log.info("Received delivery: %s", new String(payload)) 
}}), None, Some(exchangeParameters))) 

另一種選擇是使用Akka-Camel與演員

0消耗和產生AMQP消息
5

你所有的演員都在運行;他們從不休息。由於演員是通過共同的線索共享的,這意味着幸運的優勝者演員一直運行,不幸的輸家永遠不會有任何時間。如果您希望有一個實體始終佔用整個線程,通常最好使用java Thread,或者至少使用receive而不是react。您還可以增加演員池的大小以匹配演員數量,但通常情況下,如果您有大量演員,而演員中的所有演員都會一直運行,那麼您應該仔細考慮如何構建程序。

+0

我可以知道如何增加演員池的大小 – 2011-01-28 15:10:13

+0

請參閱http://stackoverflow.com/questions/1597899的答案 – 2011-01-28 20:21:59