2016-03-03 25 views
3

我正在嘗試使用cachedHostConnectionPoolHttps發出出站https請求,但似乎在Non 2XX響應的一段時間後,池流會停止發出元素的導致整個流量停止。這種特殊行爲的發生時間非常隨機,但發生並可重現。Akka-http:Http.cachedHostConnectionPoolHttps在某些時候停止處理以用於非2XX響應

下面是一個簡單的測試中,它的一些時間Expected OnNext(_), yet no element signaled during 10 seconds後停止發射結果被拋出

import java.util.UUID 

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model._ 
import akka.http.scaladsl.model.headers.RawHeader 
import akka.stream._ 
import akka.stream.scaladsl.Keep 
import akka.stream.testkit.scaladsl.{TestSink, TestSource} 
import org.scalatest._ 

import scala.util.Random 

class HttpPoolTest extends FlatSpec with Matchers with OptionValues with Inside with Inspectors { 

    "HttpPoolTest" should "run without exception" in { 
    implicit val httpSystem = ActorSystem("http-out") 
    implicit val httpMat = ActorMaterializer() 
    implicit val ec = httpSystem.dispatcher 

    val helloFlow = Http().cachedHostConnectionPoolHttps[Int]("android.googleapis.com",443)(httpMat) 

    val requestHeaders = scala.collection.immutable.Seq[HttpHeader](RawHeader("Authorization", "key=random"), RawHeader("Content-Type", "application/json;charset=utf-8")) 
    val httpRequest = new HttpRequest(HttpMethods.POST, "/gcm/send", requestHeaders, 
     """ 
     |{ 
     | "registration_ids": ["rnadomdata"], 
     | "delay_while_idle": true, 
     | "data": { 
     |  "message": "Hello World", 
     |  "title": "Interstellar", 
     |  "id": "1231222i3211232228w2t1xx6wxq", 
     |  "notificationType": "Text" 
     | } 
     |} 
     """. 
     stripMargin) 

    val (upstream , downstream) = TestSource.probe[(HttpRequest, Int)].via(helloFlow).toMat(TestSink.probe)(Keep.both).run() 


    while (true) { 
     val randWait = math.max(5000, Random.nextInt(20000)) //atleast 5 seconds to process the http request 
     val randRequest = math.max(1, Random.nextInt(5)) 
     downstream.request(randRequest) 

     (1 to randRequest).foreach(i => upstream.sendNext(httpRequest -> UUID.randomUUID().hashCode())) 
     println (s"Waiting for $randWait millis to process") 
     Thread.sleep(randWait) 
     (1 to randRequest) foreach (i => noException should be thrownBy downstream.expectNext()) 

    } 
    } 

} 

任何指針到底是怎麼回事錯在這裏?它可以工作一段時間,所以我懷疑處理非2xx錯誤時出現了問題。

我試過關掉akka.http.host-connection-pool.max-retries = 0akka.http.host-connection-pool.idle-timeout = infinite,但沒有結果。

回答

0

您需要消耗實體數據,以確保httpFlow繼續運行。如果你對解釋httpResponse的主體不感興趣,只需閱讀它即可下沉。

downstream.expectNext().entity.dataBytes.to(Sink.ignore)

相關問題