2013-01-14 166 views
3

我正在嘗試在akka演員之間建立一個消息處理過程,以表示主給工作人員一份工作,並密切關注它。我的問題是Akka演員工作與期貨交接

  1. 就是我下面提出一個合理的方法,並
  2. 即使它不是 ,我想知道如何才能正確地與期貨的組成 來完成,對於爲了我未來的教育。

我想的過程是這樣的

1)主機發送工作,工人與ask。它期望在5秒內得到答覆,否則認爲工人失去了機會,並且必須再次進入競標。

import context.dispatcher 
implicit val timeout = Timeout(5 seconds) 
val workCompletedFuture = (worker ? WorkTicket(work)).mapTo[Future[WorkCompleted]] 

2A)如果工人沒有在5秒內做出反應,我想師傅將自身發送一個消息,說來重新分配工作。

self ! WorkAllocationFailed(work, worker) 

2B)如果工人沒有響應,那麼它給了我們一個未來[WorkCompleted。我想等待未來完成,例如2分鐘。

3a)的如果未來[WorkCompleted]未能在超時時間內完成,然後重新分配工作

self ! WorkFailed(work, worker) 

3b)的如果未來[WorkCompleted]成功然後收集結果

我我試圖創建這個邏輯,但是我陷入了嵌套的onComplete混亂,我不知道如何處理Future [WorkCompleted]的超時。我嘗試閱讀Akka 2.10 Futures docs,但無法找出解決方案。

回答

2

我恩德雷的答案達成一致 - 都非常好點。

如何:

1)安排一個消息給自己的超時時間(使用system.scheduler.scheduleOnce

2)發送工作郵件給工人使用常規tell

3A)如完成的工作在超時消息之前回來,取消預定的工作並使用步驟1和2重新分配工作

3b)如果完成的工作在超時消息th要麼忽略它,要麼取消重新分配的工作。

一個地方的期貨可能對工人有幫助,尤其是在工作需要很長時間或阻礙工作時。工作人員可以使用未來來完成工作並保持可用於處理更多傳入消息,例如取消工作。

2

一般的想法,即你有一位將工作交給工作人員池的主人是一種健全的模式。

另一方面,當系統的所有部分都已經是演員時,我不建議使用Futures。不要使用提問來提交工作,你可以通過告訴發送它。主人然後可以定期檢查超時工作並重新提交。

另外,在actor的主體中調用onComplete是非常危險的,因爲它在一個潛在的不同線程上執行。與演員溝通的安全方式是通過信息傳遞。如果你有一個未來,並且你希望在未來完成時在演員中完成一些事情,那麼最好使用管道模式。

你的代碼段中還有一個小錯誤。如果你的工人演員與WorkCompleted回覆,那麼這就是你真正想要的行:

val workCompletedFuture = (worker ? WorkTicket(work)).mapTo[WorkCompleted]