2013-11-23 177 views
0

我需要幫助的東西是粗體。阿卡匹配失敗,並恢復

我有一個演員,飛多個噴HttpRequests,請求分頁和演員確保它將結果順序寫入數據庫(序列是重要的,以恢復爬網)。我解釋這一點是因爲我現在不想探索其他併發模式。演員需要從超時恢復而不重新啓動。

在我的演員

我有以下幾點:

  case f : Failure => { 
       system.log.error("faiure") 
       system.log.error(s"$f") 
       system.shutdown() 
      } 
      case f : AskTimeoutException => { 
       system.log.error("faiure") 
       system.log.error(s"$f") 
       system.shutdown() 
      } 
      case msg @ _ => { 

       system.log.error("Unexpected message in harvest") 
       system.log.error(s"${msg}") 
       system.shutdown() 
      } 

,但我不能正確地匹配:

[ERROR] [11/23/2013 14:58:10.694] [Crawler-akka.actor.default-dispatcher-3] [ActorSystem(Crawler)] Unexpected message in harvest 
[ERROR] [11/23/2013 14:58:10.694] [Crawler-akka.actor.default-dispatcher-3] [ActorSystem(Crawler)] Failure(akka.pattern.AskTimeoutException: Timed out) 

我分派如下所示:

abstract class CrawlerActor extends Actor { 
    private implicit val timeout: Timeout = 20.seconds 
    import context._ 
    def dispatchRequest(node: CNode) { 
    val reqFut = (System.requester ? CrawlerRequest(node,Get(node.url))).map(r=> CrawlerResponse(node,r.asInstanceOf[HttpResponse])) 
    reqFut pipeTo self 
    } 


class CrawlerRequester extends Actor { 
    import context._ 
    val throttler = context.actorOf(Props(classOf[TimerBasedThrottler],System.Config.request_rate),"throttler") 
    throttler ! SetTarget(Some(IO(Http).actorRef)) 

    def receive : Receive = { 
    case CrawlerRequest(type_,request) => { 
     throttler forward request 
    } 
    } 
} 

一旦找到正確的匹配方式,就是無論如何,我可以得到我的手在超時發生的CrawlerRequest?它包含一些我需要弄清楚如何恢復的狀態。

回答

1

需要鍵入失敗案例類的完整路徑(或者我猜想導入它)。

case f: akka.actor.Status.Failure => { 
       system.log.error("faiure") 
       system.log.error(s"${f.cause}") 
       system.shutdown() 
      } 

只是留下了與超時相關的請求。在點請求調度中似乎需要具有自定義失敗處理程序的地圖和管道。現在看看它。

以下將超時時間拖入角色。

case class CrawlerRequestTimeout(request: CrawlerRequest) 
abstract class CrawlerActor extends Actor { 
    private implicit val timeout: Timeout = 20.seconds 
    import context._ 
    def dispatchRequest(node: CNode) { 
    val req = CrawlerRequest(node,Get(node.url)) 
    val reqFut = (System.requester ? req).map(r=> CrawlerResponse(node,r.asInstanceOf[HttpResponse])) 

    reqFut onFailure { 
     case te: akka.pattern.AskTimeoutException => self ! CrawlerRequestTimeout(req) 
    } 
    reqFut pipeTo self 
    } 
} 

火柴的:

case timeout : CrawlerRequestTimeout => { 
       println("boom") 
       system.shutdown() 
      } 

需要尋找抑制例外,雖然的一種方式,它仍然射擊。也許壓制並不是真正的問題,需要驗證。

不,抑制是一個問題,或者異常流入msg @ _,需要放入一個case類以吸收冗餘失敗消息。

好的,所以擺脫pipeto擺脫進入客戶端演員的異常。這也是很多更容易閱讀:d

abstract class CrawlerActor extends Actor { 
    private implicit val timeout: Timeout = 20.seconds 
    import context._ 
    def dispatchRequest(node: CNode) { 
    val req = CrawlerRequest(node,Get(node.url)) 
    val reqFut = (System.requester ? req) 

    reqFut onFailure { 
     case te: akka.pattern.AskTimeoutException => self ! CrawlerRequestTimeout(req) 
    } 
    reqFut onSuccess { 
     case r: HttpResponse => self ! CrawlerResponse(node,r) 
    } 
    } 
} 
3

如果使用pipeTo向通過tell發送消息作出響應會發生這種情況。

例如:

in actorA: actorB ! message 
in actorB: message => doStuff pipeTo sender 
in actorA: receives not 'scala.util.Failure', but 'akka.actor.Status.Failure' 

的附加邏輯在pipeTo是轉變TryFailure成阿卡的演員Failureakka.actor.Status.Failure)。當你使用ask模式時,這可以正常工作,因爲臨時詢問演員的句柄akka.actor.Status.Failure,但不適用於tell

希望這個簡短的回答幫助:)

祝你好運!

0

如果我理解正確,您目前沒有成功匹配AskTimeoutException

如果是這樣,你應該匹配case Failure(AskTimeoutException) => ...而不是case f : AskTimeoutException => ...