說我正在發送消息給一個Actor,當它正在處理一條消息時可能會出現更多的消息。現在,當它準備處理下一條消息時,我希望它只處理最新的消息,因爲以前的消息已經過時。我怎樣才能最好地實現這一點?Akka演員如何處理最新消息
使用Scala的演員庫,我能夠通過第一檢查從我的發件人來實現這一如下:
if (myActor.getState != Runnable)
myActor ! message
但我不認爲我可以在阿卡系統做這樣的測試
說我正在發送消息給一個Actor,當它正在處理一條消息時可能會出現更多的消息。現在,當它準備處理下一條消息時,我希望它只處理最新的消息,因爲以前的消息已經過時。我怎樣才能最好地實現這一點?Akka演員如何處理最新消息
使用Scala的演員庫,我能夠通過第一檢查從我的發件人來實現這一如下:
if (myActor.getState != Runnable)
myActor ! message
但我不認爲我可以在阿卡系統做這樣的測試
你可以實現自己的郵箱,這種方式不會影響你的actor的實現。請參閱this answer以獲取解決方案,以更改執行者實現而不是定製郵箱實施。
實現郵箱的上enqueue
下降舊郵件:
package akka.actor.test
import akka.actor.{ ActorRef, ActorSystem }
import com.typesafe.config.Config
import akka.dispatch.{Envelope, MessageQueue}
class SingleMessageMailbox extends akka.dispatch.MailboxType {
// This constructor signature must exist, it will be called by Akka
def this(settings: ActorSystem.Settings, config: Config) = this()
// The create method is called to create the MessageQueue
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new MessageQueue {
val message = new java.util.concurrent.atomic.AtomicReference[Envelope]
final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
Option(message.get) foreach {deadLetters.enqueue(owner, _)}
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
for {e <- Option(message.getAndSet(handle))}
receiver.asInstanceOf[InternalActorRef].
provider.deadLetters.
tell(DeadLetter(e.message, e.sender, receiver), e.sender)
def dequeue(): Envelope = message.getAndSet(null)
def numberOfMessages: Int = Option(message.get).size
def hasMessages: Boolean = message.get != null
}
}
請注意,我有這個類添加到包akka.actor
送舊郵件使用InternalActorRef
像implemented爲BoundedQueueBasedMessageQueue
一紙空文。
如果你想直接跳過你可以實現enqueue
像這樣的舊郵件:
def enqueue(receiver: ActorRef, handle: Envelope): Unit = message.set(handle)
用法:
object Test extends App {
import akka.actor._
import com.typesafe.config.ConfigFactory
// you should use your config file instead of ConfigFactory.parseString
val actorSystem: ActorSystem =
ActorSystem("default", ConfigFactory.parseString(
"""
akka.daemonic=on
myMailbox.mailbox-type = "akka.actor.test.SingleMessageMailbox"
"""))
class EchoActor extends Actor {
def receive = {
case m => println(m); Thread.sleep(500)
}
}
val actor = actorSystem.actorOf(Props[EchoActor].withMailbox("myMailbox"))
for {i <- 1 to 10} {
actor ! i
Thread.sleep(100)
}
Thread.sleep(1000)
}
測試:
$ sbt run
1
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
5
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
10
沒有必要實現自己的郵箱。完全一樣。
去掉大量的文字,讓這段代碼爲自己說話:
// Either implement "equals" so that every job is unique (by default) or do another comparison in the match.
class Work
case class DoWork(work: Work)
class WorkerActor extends Actor {
// Left as an exercise for the reader, it clearly should do some work.
def perform(work: Work): Unit =()
def lookingForWork: Receive = {
case w: Work =>
self forward DoWork(w)
context become prepareToDoWork(w)
}
def prepareToDoWork(work: Work): Receive = {
case DoWork(`work`) =>
// No new work, so perform this one
perform(work)
// Now we're ready to look for new work
context become lookingForWork
case DoWork(_) =>
// Discard work that we don't need to do anymore
case w2: Work =>
// Prepare to do this newer work instead
context become prepareToDoWork(w2)
}
//We start out as looking for work
def receive = lookingForWork
}
這意味着,如果在郵箱中沒有新的工作,工作僅進行。
這是一個好主意,但是在你的實現中存在一個錯誤:你不應該繼續工作的平等:讓我們假設我們在郵箱中有2個工作:a從s1和a從s2 = (a)來自s1','DoWork(a)來自s1' =>'DoWork(a)來自s1','DoWork(a)來自s2'。所以我們將從s1處理'a而不是從s2'處理a。所以你要麼放棄關於發件人的信息,要麼修復你的實現。如果{s來自s1,b來自s1,a來自s2},這可能很重要。 – senia
這是[沒有這個bug的實現](http://pastebin.com/4Awz9VVb)。 – senia
的確,我假定實施等同的人考慮到工作項目是唯一的。這是除了一般意義上的點。我會添加一個免責聲明。 –
什麼消息處理保證你試圖確保? –
我相信阿卡也有優先收件箱。這可能會做你想要的,但這會導致問題:如何處理舊的消息?因此,您可以按照建議使用自定義收件箱,或讓您的演員存儲最後處理的消息(必須存儲在消息中)的時間戳,然後刪除之前的所有消息。 –