2017-02-27 77 views
3

的夫婦後,我有一對夫婦的運行小時後掛起的HTTP連接池:阿卡HTTP連接池掛起時間

private def createHttpPool(host: String): SourceQueue[(HttpRequest, Promise[HttpResponse])] = { 
    val pool = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host) 
    Source.queue[(HttpRequest, Promise[HttpResponse])](config.poolBuffer, OverflowStrategy.dropNew) 
     .via(pool).toMat(Sink.foreach { 
     case ((Success(res), p)) => p.success(res) 
     case ((Failure(e), p)) => p.failure(e) 
     })(Keep.left).run 
    } 

我排隊的項目:

private def enqueue(uri: Uri): Future[HttpResponse] = { 
    val promise = Promise[HttpResponse] 
    val request = HttpRequest(uri = uri) -> promise 

    queue.offer(request).flatMap { 
     case Enqueued => promise.future 
     case _ => Future.failed(ConnectionPoolDroppedRequest) 
    } 
} 

和解決這樣的響應:

private def request(uri: Uri): Future[HttpResponse] = { 
    def retry = { 
     Thread.sleep(config.dispatcherRetryInterval) 
     logger.info(s"retrying") 
     request(uri) 
    } 

    logger.info("req-start") 
    for { 
     response <- enqueue(uri) 

     _ = logger.info("req-end") 

     finalResponse <- response.status match { 
     case TooManyRequests => retry 
     case OK => Future.successful(response) 
     case _ => response.entity.toStrict(10.seconds).map(s => throw Error(s.toString, uri.toString)) 
     } 
    } yield finalResponse 
} 

如果未來成功,該函數的結果將始終轉換:

def get(uri: Uri): Future[Try[JValue]] = { 
    for { 
    response <- request(uri) 
    json <- Unmarshal(response.entity).to[Try[JValue]] 
    } yield json 
} 

一切都正常工作了一段時間,然後我在日誌中看到的所有內容都是req-start和no req-end。

我阿卡的配置是這樣的:

akka { 
    actor.deployment.default { 
    dispatcher = "my-dispatcher" 
    } 
} 

my-dispatcher { 
    type = Dispatcher 
    executor = "fork-join-executor" 

    fork-join-executor { 
    parallelism-min = 256 
    parallelism-factor = 128.0 
    parallelism-max = 1024 
    } 
} 

akka.http { 
    host-connection-pool { 
    max-connections = 512 
    max-retries = 5 
    max-open-requests = 16384 
    pipelining-limit = 1 
    } 
} 

我不知道這是否是一個配置問題或一個代碼問題。我有我的並行和連接數量如此之高,因爲沒有它,我得到很低的請求率(我想盡可能請求 - 我有其他速率限制代碼來保護服務器)。

回答

3

您沒有使用從服務器返回的響應實體。引用以下文檔:

消費(或放棄)請求的實體是強制性的!如果 意外地沒有被使用或丟棄Akka HTTP將假定 傳入的數據應該保持反壓,並且將通過TCP反壓機制阻塞傳入數據。無論HttpResponse的狀態如何,客戶端應該使用實體( )。

該實體的形式爲Source[ByteString, _]需要運行以避免資源匱乏。

如果您不需要讀取實體,以消耗實體最簡單的方法字節丟棄它們,通過使用

res.discardEntityBytes() 

(您可以通過添加附加回調 - 例如 - .future().map(...)) 。

This page in the docs描述了所有這些替代方法,包括如何在需要時讀取字節。

---編輯

經過代碼/信息被提供的,很顯然,資源消耗是沒有問題的。在這個實現中還有另一個大紅旗,即重試方法中的Thread.sleep。 這是一個阻止調用,很可能會讓您的底層actor系統的線程基礎架構捱餓。

docs中提供了爲什麼這是危險的完整解釋。

嘗試更改並使用akka.pattern.afterdocs)。示例如下:

def retry = akka.pattern.after(200 millis, using = system.scheduler)(request(uri)) 
+0

我實際上在獲得響應後會消耗實體。我會用更多信息更新帖子。 – asuna

+0

只是將我的代碼更改爲使用akka.pattern.after,如果問題再次出現,將發佈更新。 雖然我確實介紹了較早的Thread.sleep代碼,並且分析器顯示沒有任何線程在停止工作時正在休眠。每當我得到429時,jvisualvm顯示其中一個線程正在休眠約500ms,然後該線程再次開始運行,所以我有點懷疑使用調度程序將修復它。不過,使用Thread.sleep非常糟糕 - 感謝給我一個很好的解決方案來解決這個問題。 – asuna

+0

我遇到了同樣的問題。這是線程轉儲:https://gist.github.com/pradyuman/bf83a8f3a293d8c679fcb6dc5f566a80 – asuna