我正在使用Netty庫(版本4來自GitHub)。它在Scala中很好用,但我希望我的庫能夠使用continuation傳遞樣式來進行異步等待。在netty/NIO監聽器中使用scala延續
傳統與Netty的,你會做這樣的事情(一個例子異步連接操作):
//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
def operationComplete (f:ChannelFuture) = {
//here goes the code that happens when the connection is made
}
})
如果要實現一個庫(我是),那麼你基本上有三個簡單的選項,允許用戶該庫做的東西連接後:
- 剛剛從你的連接方法返回的ChannelFuture,讓用戶處理它 - 這並不網狀提供太多的抽象。
- 將ChannelFutureListener作爲connect方法的參數,並將其作爲偵聽器添加到ChannelFuture。
- 以一個回調函數對象作爲你的連接方法的參數,並調用從您創建的ChannelFutureListener內(這將使用於有點像node.js的回調驅動的風格)
我是什麼試圖做的是第四種選擇;我沒有把它列入上面的計數中,因爲它並不簡單。
我想用Scala的分隔延續,使利用圖書館是有點像堵庫,但將非阻塞幕後:
class MyLibraryClient {
def connect(remoteAddr:SocketAddress) = {
shift { retrn: (Unit => Unit) => {
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
def operationComplete(f:ChannelFuture) = {
retrn();
}
});
}
}
}
}
想象中所實現的其他讀/寫操作同樣的時尚。這之中的目標用戶的代碼可以看起來更象這樣:
reset {
val conn = new MyLibraryClient();
conn.connect(new InetSocketAddress("127.0.0.1", 1337));
println("This will happen after the connection is finished");
}
換句話說,該程序會看起來像一個簡單的阻塞式的計劃,但在幕後也不會有任何阻攔或線程。
我遇到的麻煩是我不完全理解分隔連續的輸入是如何工作的。當我嘗試以上述方式實現它時,編譯器會抱怨我的operationComplete
實現實際上返回Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit]
而不是Unit
。我知道scala的CPS中有一個「陷阱」,那就是你必須用@suspendable
註釋一個shift
方法的返回類型,它會被調用堆棧傳遞到reset
,但似乎沒有任何方法可以協調即使用沒有分隔延續概念的預先存在的Java庫。
我覺得實際上一定有辦法解決這個問題 - 如果Swarm可以序列化連續性,並將它們在網絡上阻塞以便在其他地方計算,那麼就可以簡單地從一個預先存在的Java類中調用延續。但我無法弄清楚它是如何完成的。爲了實現這一點,我需要在Scala中重寫整個netty部分嗎?
我不知道HOWTO修復斯卡拉的東西,但我建議對你的想法。讓我來告訴你爲什麼。但是讓用戶「不知道」你的libary的異步特性,你會告訴他這就是它在監聽器代碼中的「阻塞」調用。事實上,他不知道他甚至把他的代碼寫在一個聽衆身上。在監聽器中進行阻塞調用可能會導致所有類型的問題。大多數時候你會看到的問題是,它會「減緩」其他io任務,並因此限制吞吐量。 – 2012-02-08 06:27:26
你有一個好點,但我不同意。我認爲我的圖書館的用戶,如果除了我之外還有其他用戶,我們可能不得不理解什麼是「重置」開始,從而理解這些調用是非阻塞的。 這實際上只是一種方法:A)更深入地理解分隔延續,B)嘗試以更清晰的方式編寫基本回調驅動的代碼。 – Jeremy 2012-02-08 16:22:42