2016-03-21 62 views
0

我試圖使用akka-http爲了向單個主機發送http請求(例如「akka.io」)。問題在於創建的流(Http()。cachedHostConnectionPool)僅在發出N個http請求後纔開始發送響應,其中N等於max-connections。爲什麼akka http不會針對前N個請求發出響應?

import scala.util.Failure 
import scala.util.Success 
import com.typesafe.config.ConfigFactory 
import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.HttpRequest 
import akka.http.scaladsl.model.Uri.apply 
import akka.http.scaladsl.settings.ConnectionPoolSettings 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Sink 
import akka.stream.scaladsl.Source 

object ConnectionPoolExample extends App { 

    implicit val system = ActorSystem() 
    implicit val executor = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    val config = ConfigFactory.load() 

    val connectionPoolSettings = ConnectionPoolSettings(config).withMaxConnections(10) 
    lazy val poolClientFlow = Http().cachedHostConnectionPool[Unit]("akka.io", 80, connectionPoolSettings) 

    val fakeSource = Source.fromIterator[Unit] {() => Iterator.continually { Thread.sleep(1000);() } } 
    val requests = fakeSource.map { _ => println("Creating request"); HttpRequest(uri = "/") -> (()) } 

    val responses = requests.via(poolClientFlow) 

    responses.runForeach { 
    case (tryResponse, jsonData) => 
     tryResponse match { 
     case Success(httpResponse) => 
      httpResponse.entity.dataBytes.runWith(Sink.ignore) 
      println(s"status: ${httpResponse.status}") 
     case Failure(e) => { 
      println(e) 
     } 
     } 
    } 
} 

輸出看起來是這樣的:

Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
status: 200 OK 
Creating request 
status: 200 OK 
Creating request 
status: 200 OK 
... 

我不能儘快找到任何配置參數,這將允許發射的反應,因爲他們已經準備好,而不是當池超出免費連接。

謝謝!

回答

0

原因是您通過調用Thread.sleep來阻止客戶端從事其他工作 - 這種方法在被動程序中是被禁止的。正確和簡單的方法是使用Source.tick

+0

謝謝羅蘭。這個特殊的例子通過使用Source.tick來解決。在這個fakeSource中使用Thread.sleep(1000)是不幸的。真正的來源是從卡夫卡讀取,它通過擴展GraphStage [SourceShape [A]] ... 'val stream = consumerMap.getOrElse(topicName,List())來實現。重寫DEF onPull():單位= {。 VAL jsonData = JsonParser(stream.head.message())的ConvertTo [A] 推(下,jsonData) } })' ... 是它也阻斷客戶端? – uladzimir

+0

是的,您需要在該源上添加'.async'以將其與其餘流相分離。我們也在研究適當的卡夫卡整合,參見反應卡夫卡。 –