2015-05-27 28 views
0

我已設計有阿卡-IO ACKING工作的演員, 使得上游發送消息(到 網絡)時,它會等待ACK。該角色是 後端的異步應用程序的接口。集成與阿卡流基於ACK的演員

我想有一層包裝,讓我這個 演員轉換成阿卡流Flow[Incoming, Outgoing, ???],使 可以與期望這樣的 簽名更新的圖書館集成。

(從上游收到的消息是罕見的,所以我們不關心太多 太多關於backpressuring有,但它不會是一件壞事 擁有它。)

sealed trait Incoming //... with implementations 
sealed trait Outgoing //... with implementations 
object Ack 

// `upstream` is an akka-io connection actor that will send Ack 
// when it writes an Outgoing message to the socket 
class SimpleActor(upstream: Actor) extends Actor { 
    def receive = { 
    case in: Incoming if sender() == upstream => 
     // does some work in response to upstream 
    case other => 
     // does some work in response to downstream 
     // including sending messages to upstream and 
     // `becoming` a stashing state waiting for Ack 
     // to `unbecome`, then sending Ack downstream 
     // (which will respect the backpressure). 
    } 
} 

我有它來自akka用戶郵件列表的良好權限 在沒有代碼的AKK流中集成了演員和 流,並且爲了將演員插入到流並保留 基於Ack的背壓,必須實現 PushPullStage

看來,我們會真正需要兩個PushPullStage這兒......一個 爲upstream => SimpleActor,一個用於SimpleActor => upstream

我的問題是:

  1. 是否有提供集成的庫,例如該演員和流之間?
  2. 有沒有比從零開始實施雙向PushPullStage更簡單的方法?
  3. 是否有任何現有的測試框架允許這樣的實現進行壓力測試?
+0

您是否嘗試過使用actor ask和'mapAsync'?如果不是,那麼編寫'PushPullStage'仍然比編寫ActorProcessor更容易。 – jrudolph

+0

@jrudolph,但我沒有一個查詢的單一回應(如果是這樣的話,我會用REST而不是WebSockets)。發佈者可以隨時發送消息。 – fommil

+0

我想用「在任何時候」你是指反壓會允許它嗎?這是如何工作的?如果您爲每個輸入元素生成一個輸出流,則可以對其進行建模,然後將流平坦化。 – jrudolph

回答

1

是的,你可以整合演員與流。
有這方面的特殊角色:演員發佈者和演員訂閱者。

這一切都在這裏:http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

當然,你必須寫演員以這樣一種方式,它的工作原理與流背壓。但是你不需要推拉階段。

+0

這聽起來比引入新的階段和實現其方法容易得多。我希望避免編寫一箇中間角色,但它看起來像是唯一的出路 - 實現發佈者和訂閱者。這將是一個很大的樣板: - / – fommil

5

我認爲akka-stream的理念是提供低級別磚塊,並在其上構建更高級別的工具。 如果你看看我們最近發佈的開源庫https://github.com/MfgLabs/akka-stream-extensions,你會發現我們已經完成了。我們提供了一些有用的結構,使它更容易管理限速器,有狀態的處理器,懶惰&發電機等... 對於演員集成,我認爲應該有可能創建一些幫助器,以便更容易地將演員與akka試圖傳播背壓的流。 Akka-Stream仍然年輕,生態系統不斷髮展;)