2015-07-06 70 views
2

當我嘗試從未來的onComplete中調用updateState時,我使用PersistentActor, 非常新,失敗,沒有任何happanes,試圖調試它,並且我確實接觸到持續調用但未進入在updateStatePersistentActor不能在未來的onComplete中調用persist處理程序

trait Event 
case class Cmd(data: String) 
case class Evt(data: String) extends Event 

class BarActor extends PersistentActor{ 
    implicit val system = context.system 
    implicit val executionContext = system.dispatcher 
    def updateState(event: Evt): Unit ={ 
    println("Updating state") 
    state = state.updated(event) 
    sender() ! state 

    } 
    def timeout(implicit ec: ExecutionContext) = 
    akka.pattern.after(duration = 2 seconds, using = system.scheduler)(Future.failed(new TimeoutException("Got timed out!"))) 

    val receiveCommand: Receive = { 
    case Cmd(data) => 

     def anotherFuture(i: Int)(implicit system: ActorSystem) = { 
     val realF = Future { 
      i % 2 match { 
      case 0 => 
       Thread.sleep(100) 
      case _ => 
       Thread.sleep(500) 
      } 
      i 
     } 
     Future.firstCompletedOf(Seq(realF, timeout)) 
      .recover { 
      case _ => -1 
     } 
     } 
     val res = (1 to 10).map(anotherFuture(_)) 
     val list = Future.sequence(res) 
     list.onComplete{ 
     case _ => 
      persist(Evt("testing"))(updateState) 
     } 
    } 
} 

回答

2

你可以試試這個:

list.onComplete { 
    case _ => self ! Evt("testing") 
    } 

這增加receiveCommand

case evt: Evt => 
    persist(Evt("testing"))(updateStates) 
+0

感謝@Pim,這是一種解決方法,它可以工作。在這種情況下,我寧願通過使用自我轉發Evt(「測試」)來維護髮件人,但如果我可以避免它,我不尋找解決方法 – igx

+0

問題的根源是調用persist而不是receiveCommand方法。你也可以通過將處理和持久性分離成不同的角色來解決這個問題。 –

+1

@igx在您的情況下,您無法使用'轉發'來維護髮件人,因爲您在未來完成後嘗試持續發送! Sender()不保證是相同的參考,因爲演員可能在等待未來完成時收到其他消息。更好地捕獲'val'中的sender()值並使用'self.tell(Evt(..),originalSender)'。 –

相關問題