2012-02-07 45 views
9

我正在使用Netty庫(版本4來自GitHub)。它在Scala中很好用,但我希望我的庫能夠使用continuation傳遞樣式來進行異步等待。在netty/NIO監聽器中使用scala延續

傳統與Netty的,你會做這樣的事情(一個例子異步連接操作):

//client is a ClientBootstrap 
val future:ChannelFuture = client.connect(remoteAddr); 
future.addListener(new ChannelFutureListener { 
    def operationComplete (f:ChannelFuture) = { 
     //here goes the code that happens when the connection is made 
    } 
}) 

如果要實現一個庫(我是),那麼你基本上有三個簡單的選項,允許用戶該庫做的東西連接後:

  1. 剛剛從你的連接方法返回的ChannelFuture,讓用戶處理它 - 這並不網狀提供太多的抽象。
  2. 將ChannelFutureListener作爲connect方法的參數,並將其作爲偵聽器添加到ChannelFuture。
  3. 以一個回調函數對象作爲你的連接方法的參數,並調用從您創建的ChannelFutureListener內(這將使用於有點像node.js的回調驅動的風格)

我是什麼試圖做的是第四種選擇;我沒有把它列入上面的計數中,因爲它並不簡單。

我想用Scala的分隔延續,使利用圖書館是有點像堵庫,但將非阻塞幕後:

class MyLibraryClient { 
    def connect(remoteAddr:SocketAddress) = { 
     shift { retrn: (Unit => Unit) => { 
       val future:ChannelFuture = client.connect(remoteAddr); 
       future.addListener(new ChannelFutureListener { 
        def operationComplete(f:ChannelFuture) = { 
         retrn(); 
        } 
       }); 
      } 
     } 
    } 
} 

想象中所實現的其他讀/寫操作同樣的時尚。這之中的目標用戶的代碼可以看起來更象這樣:

reset { 
    val conn = new MyLibraryClient(); 
    conn.connect(new InetSocketAddress("127.0.0.1", 1337)); 
    println("This will happen after the connection is finished"); 
} 

換句話說,該程序會看起來像一個簡單的阻塞式的計劃,但在幕後也不會有任何阻攔或線程。

我遇到的麻煩是我不完全理解分隔連續的輸入是如何工作的。當我嘗試以上述方式實現它時,編譯器會抱怨我的operationComplete實現實際上返回Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit]而不是Unit。我知道scala的CPS中有一個「陷阱」,那就是你必須用@suspendable註釋一個shift方法的返回類型,它會被調用堆棧傳遞到reset,但似乎沒有任何方法可以協調即使用沒有分隔延續概念的預先存在的Java庫。

我覺得實際上一定有辦法解決這個問題 - 如果Swarm可以序列化連續性,並將它們在網絡上阻塞以便在其他地方計算,那麼就可以簡單地從一個預先存在的Java類中調用延續。但我無法弄清楚它是如何完成的。爲了實現這一點,我需要在Scala中重寫整個netty部分嗎?

+0

我不知道HOWTO修復斯卡拉的東西,但我建議對你的想法。讓我來告訴你爲什麼。但是讓用戶「不知道」你的libary的異步特性,你會告訴他這就是它在監聽器代碼中的「阻塞」調用。事實上,他不知道他甚至把他的代碼寫在一個聽衆身上。在監聽器中進行阻塞調用可能會導致所有類型的問題。大多數時候你會看到的問題是,它會「減緩」其他io任務,並因此限制吞吐量。 – 2012-02-08 06:27:26

+1

你有一個好點,但我不同意。我認爲我的圖書館的用戶,如果除了我之外還有其他用戶,我們可能不得不理解什麼是「重置」開始,從而理解這些調用是非阻塞的。 這實際上只是一種方法:A)更深入地理解分隔延續,B)嘗試以更清晰的方式編寫基本回調驅動的代碼。 – Jeremy 2012-02-08 16:22:42

回答

4

我發現這個解釋Scala's continuations當我開始時非常有幫助。特別要注意他解釋的部分shift[A, B, C]reset[B, C]。添加虛擬null作爲operationComplete的最後聲明應該有所幫助。

順便說一句,如果它可能嵌套在shift之內,則需要在另一個reset內調用retrn()

編輯:這是一個工作示例

import scala.util.continuations._ 
import java.util.concurrent.Executors 

object Test { 

    val execService = Executors.newFixedThreadPool(2) 

    def main(args: Array[String]): Unit = { 
    reset { 
     val conn = new MyLibraryClient(); 
     conn.connect("127.0.0.1"); 
     println("This will happen after the connection is finished"); 
    } 
    println("Outside reset"); 
    } 
} 

class ChannelFuture { 
    def addListener(listener: ChannelFutureListener): Unit = { 
    val future = this 
    Test.execService.submit(new Runnable { 
     def run(): Unit = { 
     listener.operationComplete(future) 
     } 
    }) 
    } 
} 

trait ChannelFutureListener { 
    def operationComplete(f: ChannelFuture): Unit 
} 

class MyLibraryClient { 
    def connect(remoteAddr: String): [email protected][Unit] = { 
    shift { 
     retrn: (Unit => Unit) => { 
     val future: ChannelFuture = new ChannelFuture() 
     future.addListener(new ChannelFutureListener { 
      def operationComplete(f: ChannelFuture): Unit = { 
      println("operationComplete starts") 
      retrn(); 
      null 
      } 
     }); 
     } 
    } 
    } 
} 

有可能輸出:

Outside reset 
operationComplete starts 
This will happen after the connection is finished 
+0

這實際上確實使編譯器感到高興,甚至似乎正常工作。 我猜想的關鍵是你將匿名'ChannelFutureListener'外的'shift'移到了'operationComplete'中,並用閉包調用continuation。我不確定我明白爲什麼會這樣,而另一種方式不行,但我會接受。謝謝! – Jeremy 2012-02-08 16:27:57

+0

這是關於scala延續的一個很好的閱讀。他們應該從scala-lang.org頁面中刪除有關延續的毫無價值的例子,並將它們替換爲您鏈接的文章。 – Jeremy 2012-02-08 16:31:04

+0

@Jeremy是啊,那篇文章是非常好的:) – shams 2012-02-08 17:48:48