我已設計有阿卡-IO ACKING工作的演員, 使得上游發送消息(到 網絡)時,它會等待ACK。該角色是 後端的異步應用程序的接口。集成與阿卡流基於ACK的演員
我想有一層包裝,讓我這個 演員轉換成阿卡流Flow[Incoming, Outgoing, ???]
,使 可以與期望這樣的 簽名更新的圖書館集成。
(從上游收到的消息是罕見的,所以我們不關心太多 太多關於backpressuring有,但它不會是一件壞事 擁有它。)
sealed trait Incoming //... with implementations
sealed trait Outgoing //... with implementations
object Ack
// `upstream` is an akka-io connection actor that will send Ack
// when it writes an Outgoing message to the socket
class SimpleActor(upstream: Actor) extends Actor {
def receive = {
case in: Incoming if sender() == upstream =>
// does some work in response to upstream
case other =>
// does some work in response to downstream
// including sending messages to upstream and
// `becoming` a stashing state waiting for Ack
// to `unbecome`, then sending Ack downstream
// (which will respect the backpressure).
}
}
我有它來自akka用戶郵件列表的良好權限 在沒有代碼的AKK流中集成了演員和 流,並且爲了將演員插入到流並保留 基於Ack的背壓,必須實現 PushPullStage。
看來,我們會真正需要兩個PushPullStage
這兒......一個 爲upstream => SimpleActor
,一個用於SimpleActor => upstream
。
我的問題是:
- 是否有提供集成的庫,例如該演員和流之間?
- 有沒有比從零開始實施雙向
PushPullStage
更簡單的方法? - 是否有任何現有的測試框架允許這樣的實現進行壓力測試?
您是否嘗試過使用actor ask和'mapAsync'?如果不是,那麼編寫'PushPullStage'仍然比編寫ActorProcessor更容易。 – jrudolph
@jrudolph,但我沒有一個查詢的單一回應(如果是這樣的話,我會用REST而不是WebSockets)。發佈者可以隨時發送消息。 – fommil
我想用「在任何時候」你是指反壓會允許它嗎?這是如何工作的?如果您爲每個輸入元素生成一個輸出流,則可以對其進行建模,然後將流平坦化。 – jrudolph