我想使用akka-http-client將鏈接http請求作爲Stream。鏈中的每個http請求都取決於先前請求的成功/響應,並使用它來構建新的請求。如果請求不成功,Stream應返回不成功請求的響應。鏈中的Akka-http客戶端請求
我該如何在akka-http中構建這樣的流? 我應該使用哪個akka-http客戶端級API?
我想使用akka-http-client將鏈接http請求作爲Stream。鏈中的每個http請求都取決於先前請求的成功/響應,並使用它來構建新的請求。如果請求不成功,Stream應返回不成功請求的響應。鏈中的Akka-http客戶端請求
我該如何在akka-http中構建這樣的流? 我應該使用哪個akka-http客戶端級API?
如果您正在製作網絡爬蟲,請查看this post。此答案解決了一個更簡單的情況,例如下載分頁資源,其中到下一頁的鏈接位於當前頁面響應的標題中。
您可以使用Source.unfoldAsync
方法創建鏈接源 - 其中一個項目導向下一個項目。這需要一個函數,它使用一個元素S
並返回Future[Option[(S, E)]]
來確定流是否應該繼續發射類型爲E
的元素,並將該狀態傳遞給下一個調用。
在你的情況,這是一種像:
HttpRequest
Future[HttpResponse]
Some(request -> response)
,否則None
但是,有一個皺紋,這就是這個wi如果它不包含指向下一個請求的指針,則不會發送來自流的響應。
爲了解決這個問題,可以使函數傳遞給unfoldAsync
返回Future[Option[(Option[HttpRequest], HttpResponse)]]
。這使您可以處理以下情況:
接下來是一些註釋的代碼,它概述了這種方法,但首先是初步的:
當HTTP請求流式傳輸到響應Akka流,你需要確保響應體被消耗,否則會發生不好的事情(死鎖等)。如果你不需要body,你可以忽略它,但是在這裏我們使用一個函數來將HttpEntity
從(潛在的)流分成嚴格實體:
import scala.concurrent.duration._
def convertToStrict(r: HttpResponse): Future[HttpResponse] =
r.entity.toStrict(10.minutes).map(e => r.withEntity(e))
接着,一對夫婦的功能,以從HttpResponse
創建Option[HttpRequest]
。這個例子使用像GitHub的分頁鏈接,其中Links
頭包含e.g方案:<https://api.github.com/...> rel="next"
:
def nextUri(r: HttpResponse): Seq[Uri] = for {
linkHeader <- r.header[Link].toSeq
value <- linkHeader.values
params <- value.params if params.key == "rel" && params.value() == "next"
} yield value.uri
def getNextRequest(r: HttpResponse): Option[HttpRequest] =
nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))
接下來,我們將傳遞到unfoldAsync
的真正功能。它使用HTTP阿卡API Http().singleRequest()
採取的HttpRequest
併產生Future[HttpResponse]
:
def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
reqOption match {
case Some(req) => Http().singleRequest(req).flatMap { response =>
// handle the error case. Here we just return the errored response
// with no next item.
if (response.status.isFailure()) Future.successful(Some(None -> response))
// Otherwise, convert the response to a strict response by
// taking up the body and looking for a next request.
else convertToStrict(response).map { strictResponse =>
getNextRequest(strictResponse) match {
// If we have no next request, return Some containing an
// empty state, but the current value
case None => Some(None -> strictResponse)
// Otherwise, pass on the request...
case next => Some(next -> strictResponse)
}
}
}
// Finally, there's no next request, end the stream by
// returning none as the state.
case None => Future.successful(None)
}
注意,如果我們得到一個差錯響應,流將不會繼續,因爲我們在接下來的狀態返回None
。
您可以調用它來獲取HttpResponse
物體流的,像這樣:
val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
Some(initialRequest)(chainRequests)
至於回國的最後一個(或差錯)響應的值,你只需要使用Sink.last
,因爲流將在成功完成或第一次出現錯誤響應時結束。例如:
def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
Some(initialRequest))(chainRequests)
.map(_.status)
.runWith(Sink.last)
謝謝。非常鼓舞人心的答案 – Rabzu
如果所有請求均成功,那麼該流應實現哪些內容?想必你想從他們那裏得到一些數據? – Mikesname
是的,我想獲得類HTTPResponse在流的末尾;當實現時,我想知道它是失敗的成功和失敗的原因。 – Rabzu