2016-05-02 30 views
0

我有一個演員正在編排數據庫更新。我需要確保每個操作只有在前一個操作完成後才能執行。 這是因爲操作B將重新使用操作結果A.需要依次使用Akka執行操作

這裏是我爲演員寫的代碼。

class DbUpdateActor(databaseOperations: DBProvider) extends Actor { 

    implicit val ec:ExecutionContext = context.system.dispatcher 

    def receive: Receive = { 

    case newInfo : UpdateDb => 

     val future = Future { 
      // gets the current situation from DB 
      val status = databaseOperations.getSituation() 
      // do db update 
      databaseOperations.save(something) 
     } 

     future onComplete { 
     case Success(result: List[Int]) => 
      // 
     case Failure(err: Throwable) => 
      // 
     } 
    } 
} 

該代碼適用於單個操作。如果我激發兩次更新,那麼第二次更新將異步執行,因此它在第一次完成之前就開始了。

我正在閱讀有關不同類型的郵箱,不知道是否有不同的郵箱會有所幫助。

有什麼建議嗎?

回答

1

你可以探索的一個選擇是刪除Future並允許在actor中運行阻塞數據庫代碼。然後,使用一個單獨的調度程序(可能是PinnedDispatcher)來阻止來自主actor系統調度程序的這個阻塞代碼,併爲其提供自己的線程來運行。通過阻止身體並移除Future,您將確保演員郵箱的正確順序執行。變化的草圖,使這項工作如下:

object DbUpdateActor{ 
    def props(databaseOperations:DBProvider) = 
    Props(classOf[DbUpdateActor], databaseOperations). 
     withDispatcher("db-update-dispatcher") 
} 

class DbUpdateActor(databaseOperations: DBProvider) extends Actor { 
    def receive: Receive = { 

    case newInfo : UpdateDb => 
     val status = databaseOperations.getSituation() 
     databaseOperations.save(something) 
    } 
} 

然後,只要你有以下調度員在演員系統配置配置:

db-update-dispatcher { 
    executor = "thread-pool-executor" 
    type = PinnedDispatcher 
} 

而且你開始了DB更新的演員,像這樣:

val updater = system.actorOf(DbUpdateActor.props(databaseOperations)) 

那麼你應該是所有設置的設置這個演員多達運行的方式阻止代碼,不會產生負面的主調度程序的吞吐量的影響。

+0

工作得很好。我期待Future塊保持演員忙碌,並且只有在將來的塊完成後才繼續下一條消息。我錯過了什麼? – abx78

+1

在Future上註冊的回調函數(如您的案例中的「onComplete」)異步處理,而不是完全不同的線程。這意味着當前線程上的代碼繼續執行。在你的情況下,在'Future'之後沒有更多的代碼,所以演員完成對該消息的處理並轉到下一個。因爲這個,你需要小心地混合'未來'和演員。 – cmbaxter

+0

真棒,感謝您的解釋。 – abx78

0

這個怎麼樣:開始操作A在一個孩子;當孩子完成時,它會向父母發送一條消息,說明它已完成。然後,您可以在現有的或新的孩子中開始操作B.