2016-04-21 90 views
3

請求我有以下流是非常有效:阿卡流+阿卡的Http - 獲取上的錯誤

source 
    .map(x => HttpRequest(uri = x.rawRequest)) 
    .via(Http().outgoingConnection(host, port)) 
    .to(Sink.actorRef(myActor, IsDone)) 
    .run() 

和一個簡單的演員流完成時處理響應狀態,並最終消息:

/** 
    * A simple actor to count how many rows have been processed 
    * in the complete process given a http status 
    * 
    * It also finish the main thread upon a message of type [[IsDone]] is received 
    */ 
class MyActor extends Actor with ActorLogging { 

    var totalProcessed = 0 

    def receive = LoggingReceive { 

    case response: HttpResponse => 

     if(response.status.isSuccess()) { 
     totalProcessed = totalProcessed + 1 
     } else if(response.status.isFailure()) { 
     log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}") 
     } else { 
     log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}") 
     } 

    case IsDone => 
     println(s"total processed: $totalProcessed") 
     sys.exit() 
    } 
} 

case object IsDone 

我不知道這是否是處理事件並處理響應狀態的最佳方法,但目前爲止還是有效的。

問題是如何將原始請求傳遞給演員,以我能夠知道什麼請求導致了特定錯誤的方式。

我的演員可以期待以下代替:

case (request: String, response: HttpResponse) => 

但如何傳遞,我有我的管道開始的信息?

我想對map這樣的:

source 
    .map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest)) 

但我對如何激發中的HTTP流不知道。

有什麼建議嗎?

+2

使用主機連接池,而不是嘗試每個請求明確打開傳出連接。該模型需要每個請求的任意標識符,然後返回響應,以便您可以正確地關聯請求和響應 – cmbaxter

+0

Hi @cmbaxter您是否打算使用此示例? http://doc.akka.io/docs/akka/2.4.4/scala/http/client-side/host-level.html#Example但用String代替? –

回答

1

隨着@cmbaxter的幫助下,我可以使用下面的代碼解決我的問題:

val poolClientFlow = Http().cachedHostConnectionPool[String](host, port) 

source 
    .map(url => HttpRequest(uri = url) -> url) 
    .via(poolClientFlow) 
    .to(Sink.actorRef(myActor, IsDone)) 
    .run() 

現在我的演員是能夠接收這樣的:

case (respTry: Try[HttpResponse], request: String) =>