2013-08-21 57 views
8

我瞭解如何在akka中創建基於消息的非阻塞應用程序,並且可以輕鬆模擬執行併發操作並將彙總結果傳遞迴消息的示例。在我遇到困難的時候,我的理解是什麼,當我的應用程序需要響應HTTP請求時,我的 非阻塞選項是什麼。我們的目標是收到一個請求,並立即將它交給本地或遠程的演員來完成這項工作,然後將這個請求交給 可能需要一些時間的結果。不幸的是,在這種模式下,我不明白我怎麼可以用一個非阻塞的 系列的「講述」來表達,而不是阻止「詢問」。如果在鏈中的任何一點我使用一個tell,我不再有將來作爲最終響應內容(在這種情況下是finagle所需的http框架接口 - 但這不是 重要)。我理解這個請求是在它自己的線程中,我的例子是非常人爲的,但只是試圖 瞭解我的設計選項。當需要HTTP響應時,Akka非阻塞選項

綜上所述,如果我下面的設計實例可以重新設計,以減少阻塞,我非常喜歡瞭解如何實現。這是我的 第一次使用阿卡,因爲一年前+一些光線探索,並在我看過的每篇文章,文件和談話說 不要阻止服務。

概念性答案可能會有所幫助,但也可能與我已閱讀的內容相同。工作/編輯我的示例 可能是我理解我試圖解決的確切問題的關鍵。如果當前的例子通常是 需要做的事情確認也有幫助,所以我不搜索不存在的魔法。

注意以下別名:進口com.twitter.util {未來=> TwitterFuture,等待=> TwitterAwait}

object Server { 

    val system = ActorSystem("Example-System") 

    implicit val timeout = Timeout(1 seconds) 

    implicit def scalaFuture2twitterFuture[T](scFuture: Future[T]): TwitterFuture[T] = { 
    val promise = TwitterPromise[T] 
    scFuture onComplete { 
     case Success(result) ⇒ promise.setValue(result) 
     case Failure(failure) ⇒ promise.setException(failure) 
    } 
    promise 
    } 

    val service = new Service[HttpRequest, HttpResponse] { 
    def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match { 
     case "https://stackoverflow.com/a/b/c" => 
     val w1 = system.actorOf(Props(new Worker1)) 

     val r = w1 ? "take work" 

     val response: Future[HttpResponse] = r.mapTo[String].map { c => 
      val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) 
      resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) 
      resp 
     } 

     response 
    } 
    } 

//val server = Http.serve(":8080", service); TwitterAwait.ready(server) 

    class Worker1 extends Actor with ActorLogging { 
    def receive = { 
     case "take work" => 
     val w2 = context.actorOf(Props(new Worker2)) 
     pipe (w2 ? "do work") to sender 
    } 
    } 

    class Worker2 extends Actor with ActorLogging { 
    def receive = { 
     case "do work" => 
     //Long operation... 
     sender ! "The Work" 
    } 
    } 

    def main(args: Array[String]) { 
    val r = service.apply(
     com.twitter.finagle.http.Request("https://stackoverflow.com/a/b/c") 
    ) 
    println(TwitterAwait.result(r).getContent.toString(CharsetUtil.UTF_8)) // prints The Work 
    } 
} 

預先感謝提供的任何指導!

回答

5

你能避免使用pipe pattern - 即發送一個未來的消息,在Worker1你會寫:

pipe(w2 ? "do work") to sender 

相反的:

sender ! (w2 ? "do work") 

現在r將是一個Future[String]而不是一個Future[Future[String]]


更新:上面的pipe的解決辦法是避免你的演員有應對未來的通用方法。正如尤在下面評論指出,在這種情況下,你可以把你的Worker1圈外完全告訴Worker2直接向它(Worker1)得到的消息從演員迴應:

w2.tell("do work", sender) 

這億韓元如果Worker1負責以某種方式處理來自Worker2的響應(通過在w2 ? "do work"上使用map,將多個期貨與flatMapfor-綜合等相結合),但如果沒有必要,則此版本爲更清潔,更高效。


殺死一個Await.result。您可以通過編寫類似下面擺脫對方的:

val response: Future[HttpResponse] = r.mapTo[String].map { c => 
    val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) 
    resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) 
    resp 
} 

現在你只需要打開這個FutureTwitterFuture。我不能告訴你我的頭頂上如何做到這一點,但它應該是fairly trivial,並且絕對不需要阻止。

+0

非常感謝您的快速反應特拉維斯。這確實清理了期貨並等待了一下。所以你認爲這兩個問題的使用實際上是必需的(顯然,我只能這樣看 - 但要確定)?我將更新我的代碼以包含您的改進,幷包含從未來的未來到未來的隱含轉換。不太熟悉堆棧溢出禮儀,所以我給+1改進。有關問題的任何其他信息將會有所幫助。謝謝! – Eric

+0

很難說是否需要詢問而不知道這些演員要負責什麼,但重要的部分是詢問不需要阻塞(有一些額外的簿記涉及,但它仍然是異步的)。我還建議保持Twitter和標準庫期貨之間明顯的轉換 - 不得不調用轉換方法通常是一個小的代價,以避免在這種情況下可能出現混淆。 –

+0

非常感謝您對本Travis的幫助和見解。這完美地解決了我的顧慮。 – Eric

0

你絕對不必在這裏阻止。首先,更新你的進口對Twitter的東西:

import com.twitter.util.{Future => TwitterFuture, Await => TwitterAwait, Promise => TwitterPromise} 

您將需要嘰嘰喳喳Promise因爲這是的Future你會從apply方法返回IMPL。然後,按照特拉維斯布朗在他的回答中所說的話,讓你的演員以這樣的方式迴應,以至於你沒有嵌套的期貨。一旦你這樣做,你應該能夠改變你的apply方法是這樣的:

def apply(req: HttpRequest): TwitterFuture[HttpResponse] = req.getUri match { 
    case "https://stackoverflow.com/a/b/c" => 
    val w1 = system.actorOf(Props(new Worker1)) 

    val r = (w1 ? "take work").mapTo[String] 
    val prom = new TwitterPromise[HttpResponse] 
    r.map(toResponse) onComplete{ 
     case Success(resp) => prom.setValue(resp) 
     case Failure(ex) => prom.setException(ex)    
    } 

    prom 
} 

def toResponse(c:String):HttpResponse = { 
    val resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) 
    resp.setContent(ChannelBuffers.copiedBuffer(c, CharsetUtil.UTF_8)) 
    resp 
} 

這可能需要一些更多的工作。我沒有在我的IDE中設置它,所以我不能保證它能夠編譯,但我相信這個想法很健全。您從apply方法返回的內容是尚未完成的TwitterFuture。當演員詢問(?)的未來完成並且通過非阻塞onComplete回調進行時,它將完成。

+0

我已經用隱式和更改更新了我的答案,刷新了並且看到了你的答案,它在功能上基本上是相同的。感謝您抽出寶貴的時間。我通過你的答案都假設問題確實沒問題。這大部分是通過你的方法或Travis提出的方法來殺死等待的練習,我認爲這些方法是相當正確的?再次感謝。 – Eric

+0

使用這樣的響應構建邏輯來滾動未來的轉換對我來說似乎並不理想 - 尤其是如果轉換在其他地方也是必需的(這很可能)。是否有理由建議這種方法通過映射未來然後轉換? –

+0

@TravisBrown,只是爲了簡單的例子,我將在一秒內更新顯示'map'首先在執行'onComplete'之前進行轉換。這就是你說的對嗎? – cmbaxter