2017-04-12 23 views
0

比方說,我們有方法返回Future[T]或java CompletableFuture[T]或自定義AsyncCompletionHandler[T]org.asynchttpclient。我想節流全部調用這種方法。如何實現對支持異步回調的方法的調用的節流

你會怎麼做?目前我使用MergeHub.source爲基礎的Sink通過它漏斗的所有請求。問題我有

  1. 有沒有更好的辦法?
  2. 在我的日誌輸出中,我看到花費在所有請求上的時間少於我的預期。爲什麼?

下面是代碼

import java.time.ZonedDateTime 

import akka.actor.ActorSystem 
import akka.stream.scaladsl.{MergeHub, Sink, Source} 
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ThrottleMode} 
import org.asynchttpclient.{DefaultAsyncHttpClient, _} 

import scala.concurrent.duration._ 
import scala.concurrent.{Await, Future, Promise} 
import scala.language.postfixOps 
import scala.util.{Failure, Success, Try} 

object Main { 

    private implicit val system = ActorSystem("root") 
    private implicit val executor = system.dispatcher 
    private implicit val mat = ActorMaterializer(ActorMaterializerSettings(system)) 

    type PendingRequest =() => Future[Try[Response]] 

    private val throttlingSink = 
    MergeHub.source[PendingRequest] 
     .throttle(1, FiniteDuration(2000, MILLISECONDS), 1, ThrottleMode.Shaping) 
     .mapAsync(4)(_.apply()) 
     .to(Sink.ignore) 
     .run() 

    def wrap(p: Promise[Try[Response]]): AsyncCompletionHandler[Response] = new AsyncCompletionHandler[Response] { 
    override def onThrowable(t: Throwable): Unit = 
     p.success(Failure(t)) 

    override def onCompleted(response: Response): Response = { 
     p.success(Success(response)) 
     response 
    } 
    } 

    def makeRequest(url: String): Future[Response] = { 

    val p = Promise[Try[Response]] 

    Source.single[PendingRequest](() => { 
     asyncHttpClient 
     .prepareGet(url) 
     .execute(wrap(p)) 

     p.future 
    }) 
     .runWith(throttlingSink) 

    p.future.flatMap { 
     case Success(r) => Future.successful(r) 
     case Failure(ex) => Future.failed(ex) 
    } 
    } 

    val asyncHttpClient = new DefaultAsyncHttpClient() 

    def main(args: Array[String]): Unit = { 

    val start = ZonedDateTime.now() 
    println("Start!") 
    Source(1 to 20) 
     .mapAsync(4) { index => 
     println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Requesting $index") 
     makeRequest(s"https://httpbin.org/get?param=$index").map { r => 
      println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Got $index - Code ${r.getStatusCode}") 
     } 
     } 
     .runWith(Sink.ignore) 
     .onComplete { 
     case Success(_) => 
      println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} Done!") 
      asyncHttpClient.close() 
      system.terminate() 
     case Failure(ex) => 
      ex.printStackTrace() 
      asyncHttpClient.close() 
      system.terminate() 
     } 

    Await.result(system.whenTerminated, Duration.Inf) 
    } 
} 

換句話說也有多處類似的主要內容。所有這些都應該作爲調用的總和來節制。

回答

1

作爲一般性評論,您可能可以不使用MergeHub步驟並簡化您的管道。見下面的例子

object Main { 

    private implicit val system = ActorSystem("root") 
    private implicit val executor = system.dispatcher 
    private implicit val mat = ActorMaterializer(ActorMaterializerSettings(system)) 

    def makeRequest(url: String): Future[Response] = { 
    val promise = Promise[Response]() 
    asyncHttpClient.prepareGet(url).execute(new AsyncCompletionHandler[Response] { 
     def onCompleted(response: Response) = { 
     promise.success(response) 
     response 
     } 
     override def onThrowable(t: Throwable) { 
     promise.failure(t) 
     super.onThrowable(t) 
     } 
    }) 
    promise.future 
    } 

    val asyncHttpClient = new DefaultAsyncHttpClient() 

    def main(args: Array[String]): Unit = { 

    val start = ZonedDateTime.now() 
    println("Start!") 
    Source(1 to 20) 
     .throttle(1, FiniteDuration(2000, MILLISECONDS), 1, ThrottleMode.Shaping) 
     .mapAsync(4) { index => 
     println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Requesting $index") 
     makeRequest(s"http://httpbin.org/get?param=$index").map { r => 
      println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Got $index - Code ${r.getStatusCode}") 
     } 
     } 
     .runWith(Sink.ignore) 
     .onComplete { 
     case Success(_) => 
      println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} Done!") 
      asyncHttpClient.close() 
      system.terminate() 
     case Failure(ex) => 
      ex.printStackTrace() 
      asyncHttpClient.close() 
      system.terminate() 
     } 

    Await.result(system.whenTerminated, Duration.Inf) 
    } 
} 

然而,在兩種實施方式中我看到請求正確節流 - 一個每2秒,從第二〜0大致開始第二〜38。

您能詳細說明您在這裏的期望嗎?

+0

謝謝你,斯特凡諾! 'MergeHub'的目標是限制所有對REST API的調用。換句話說,像'main'的內容有多個地方。所有這些都應該作爲調用的總和來節制。 – expert

+0

好的,但即使使用'MergeHub',我可以看到節流大致起作用,每2秒打一次電話。你期望看到什麼? –

+0

它或多或少的作用,但我想知道是否有可能更精確地解決節流。正如你在給定的例子中正確地注意到的那樣,它花費了38秒,而不是我預期的40秒。 – expert