2011-09-03 60 views
12

假設我有一個睡眠功能:轉換斯卡拉@suspendable方法爲未來

def sleep(delay:Int) : Unit @suspendable = { 
    .... 
} 

是有可能有一個功能,未來的創建可同步等待的睡眠功能的異步版本。

def future(targetFunc: (Int => Unit @suspendable)) : (Int => Future) = { 
    .... 
} 

class Future { 
    def await : Unit @suspendable = { 
    .... 
    } 
} 

,你應該能夠做這樣的事情:

reset { 
    val sleepAsync = future(sleep) 
    val future1 = sleepAsync(2000) 
    val future2 = sleepAsync(3000) 
    future1.await 
    future2.await 
    /* finishes after a delay of 3000 */ 
} 

兩個調用sleepAsync應該出現馬上返回,並且兩次調用未來#伺機應該出現阻塞。當然,它們都會在復位結束時完全消失,並且之後的代碼負責在延遲之後調用延續。

否則是否存在一種替代方法來並行運行兩個@suspendable函數並等待它們完成?

我有什麼,我想要做的骨架的編譯要點:https://gist.github.com/1191381

+0

我寫道:https://gist.github.com/1191571這似乎工作,但它似乎很複雜。我覺得我可能會錯過一個簡單的做法。 – benmmurphy

+0

也發現這個:http://days2011.scala-lang.org/node/138/288這似乎做得更好。 – benmmurphy

+0

你對「獲勝」答案有偏好嗎?我需要頒發賞金獎勵。 –

回答

1

我不知道,我完全理解這個問題,但這裏的一個嘗試:

import scala.util.continuations._ 

class Future(thread: Thread) { 
    def await = thread.join 
} 

object Future { 

    def sleep(delay: Long) = Thread.sleep(delay) 

    def future[A,B](f: A => B) = (a: A) => shift { k: (Future => Unit) => 
    val thread = new Thread { override def run() { f(a) } } 
    thread.start() 

    k(new Future(thread)) 
    } 

    def main(args:Array[String]) = reset { 
    val sleepAsync = future(sleep) 
    val future1 = sleepAsync(2000) // returns right away 
    val future2 = sleepAsync(3000) // returns right away 
    future1.await // returns after two seconds 
    future2.await // returns after an additional one second 
    // finished after a total delay of three seconds 
    } 
} 

這裏, Future實例只不過是Thread上的句柄,因此您可以使用其join方法進行阻止,直至完成。

future函數採用A => B類型的函數,並返回一個函數,當與一個A供給將揭開一個線程來運行「futured」功能,並且在Future,其被注射回包起來進入延續,從而將其分配給val future1

這是否接近您要去的地方?

+0

我想使用continuations而不是線程 – benmmurphy

+0

不管怎麼樣,您都可以運行延續,但不知何故它們需要從當前線程運行(否則總運行時間將爲5000 ms而不是3000 ms)。在實踐中,你可能會使用線程池而不是自己創建'Thread'實例。你想如何運行'future1'和'future2'? – earldouglas

+0

啊。我希望未來能夠採用暫停功能而非正常功能 – benmmurphy

2
object Forks { 

    import scala.util.continuations._ 

    case class Forker(forks: Vector[() => Unit @suspendable]) { 
    def ~(block: => Unit @suspendable): Forker = Forker(forks :+ (() => block)) 
    def joinIf(pred: Int => Boolean): Unit @suspendable = shift { k: (Unit => Unit) => 
     val counter = new java.util.concurrent.atomic.AtomicInteger(forks.size) 
     forks foreach { f => 
     reset { 
      f() 
      if (pred(counter.decrementAndGet)) k() 
     } 
     } 
    } 
    def joinAll() = joinIf(_ == 0) 
    def joinAny() = joinIf(_ == forks.size - 1) 
    } 

    def fork(block: => Unit @suspendable): Forker = Forker(Vector(() => block)) 
} 

使用fork(),我們現在可以等待很多「暫停」。使用〜()來連接暫掛。使用joinAll()等待所有可掛起和joinAny()等待一個。使用joinIf()來定製連接策略。

object Tests extends App { 

    import java.util.{Timer, TimerTask} 
    import scala.util.continuations._ 

    implicit val timer = new Timer 

    def sleep(ms: Int)(implicit timer: Timer): Unit @suspendable = { 
    shift { k: (Unit => Unit) => 
     timer.schedule(new TimerTask { 
     def run = k() 
     }, ms) 
    } 
    } 

    import Forks._ 

    reset { 
    fork { 
     println("sleeping for 2000 ms") 
     sleep(2000) 
     println("slept for 2000 ms") 
    } ~ { 
     println("sleeping for 4000 ms") 
     sleep(4000) 
     println("slept for 4000 ms") 
    } joinAll() 
    println("and we are done") 
    } 
    println("outside reset") 
    readLine 
    timer.cancel 
} 

並且這是輸出。程序開始時間T:

sleeping for 2000 ms 
sleeping for 4000 ms 
outside reset   <<<<<< T + 0 second 
slept for 2000 ms  <<<<<< T + 2 seconds 
slept for 4000 ms  <<<<<< T + 4 seconds 
and we are done  <<<<<< T + 4 seconds