2016-09-11 217 views
6

我想使用akka-http-client將鏈接http請求作爲Stream。鏈中的每個http請求都取決於先前請求的成功/響應,並使用它來構建新的請求。如果請求不成功,Stream應返回不成功請求的響應。鏈中的Akka-http客戶端請求

我該如何在akka-http中構建這樣的流? 我應該使用哪個akka-http客戶端級API?

+0

如果所有請求均成功,那麼該流應實現哪些內容?想必你想從他們那裏得到一些數據? – Mikesname

+0

是的,我想獲得類HTTPResponse在流的末尾;當實現時,我想知道它是失敗的成功和失敗的原因。 – Rabzu

回答

8

如果您正在製作網絡爬蟲,請查看this post。此答案解決了一個更簡單的情況,例如下載分頁資源,其中到下一頁的鏈接位於當前頁面響應的標題中。

您可以使用Source.unfoldAsync方法創建鏈接源 - 其中一個項目導向下一個項目。這需要一個函數,它使用一個元素S並返回Future[Option[(S, E)]]來確定流是否應該繼續發射類型爲E的元素,並將該狀態傳遞給下一個調用。

在你的情況,這是一種像:

  1. 服用初始HttpRequest
  2. 生產Future[HttpResponse]
  3. 如果響應點到另一個URL,返回Some(request -> response),否則None

但是,有一個皺紋,這就是這個wi如果它不包含指向下一個請求的指針,則不會發送來自流的響應。

爲了解決這個問題,可以使函數傳遞給unfoldAsync返回Future[Option[(Option[HttpRequest], HttpResponse)]]。這使您可以處理以下情況:

  • 當前響應是一個錯誤
  • 當前響應指向另一個請求
  • 電流響應不指向另一個請求

接下來是一些註釋的代碼,它概述了這種方法,但首先是初步的:

當HTTP請求流式傳輸到響應Akka流,你需要確保響應體被消耗,否則會發生不好的事情(死鎖等)。如果你不需要body,你可以忽略它,但是在這裏我們使用一個函數來將HttpEntity從(潛在的)流分成嚴格實體:

import scala.concurrent.duration._ 

def convertToStrict(r: HttpResponse): Future[HttpResponse] = 
    r.entity.toStrict(10.minutes).map(e => r.withEntity(e)) 

接着,一對夫婦的功能,以從HttpResponse創建Option[HttpRequest]。這個例子使用像GitHub的分頁鏈接,其中Links頭包含e.g方案:<https://api.github.com/...> rel="next"

def nextUri(r: HttpResponse): Seq[Uri] = for { 
    linkHeader <- r.header[Link].toSeq 
    value <- linkHeader.values 
    params <- value.params if params.key == "rel" && params.value() == "next" 
} yield value.uri 

def getNextRequest(r: HttpResponse): Option[HttpRequest] = 
    nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next)) 

接下來,我們將傳遞到unfoldAsync的真正功能。它使用HTTP阿卡API Http().singleRequest()採取的HttpRequest併產生Future[HttpResponse]

def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = 
    reqOption match { 
    case Some(req) => Http().singleRequest(req).flatMap { response => 
     // handle the error case. Here we just return the errored response 
     // with no next item. 
     if (response.status.isFailure()) Future.successful(Some(None -> response)) 

     // Otherwise, convert the response to a strict response by 
     // taking up the body and looking for a next request. 
     else convertToStrict(response).map { strictResponse => 
     getNextRequest(strictResponse) match { 
      // If we have no next request, return Some containing an 
      // empty state, but the current value 
      case None => Some(None -> strictResponse) 

      // Otherwise, pass on the request... 
      case next => Some(next -> strictResponse) 
     } 
     } 
    } 
    // Finally, there's no next request, end the stream by 
    // returning none as the state. 
    case None => Future.successful(None) 
    } 

注意,如果我們得到一個差錯響應,流將不會繼續,因爲我們在接下來的狀態返回None

您可以調用它來獲取HttpResponse物體流的,像這樣:

val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com") 
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
    Some(initialRequest)(chainRequests) 

至於回國的最後一個(或差錯)響應的值,你只需要使用Sink.last,因爲流將在成功完成或第一次出現錯誤響應時結束。例如:

def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
     Some(initialRequest))(chainRequests) 
    .map(_.status) 
    .runWith(Sink.last) 
+0

謝謝。非常鼓舞人心的答案 – Rabzu