2017-02-12 80 views
0

我想跟隨在一些差距https://github.com/functional-streams-for-scala/fs2/wiki/Binding-to-asynchronous-processes從隊列中scalaz

灌裝的第一個例子創建過程,並增加了一些調試打印我到了下面的代碼:

import java.util.concurrent.ScheduledExecutorService 
import scala.concurrent.{Await, Future} 
import scala.concurrent.duration._ 
import scalaz.concurrent.Task 
import scalaz.stream.async.mutable.Queue 
import scalaz.stream.{Process, Sink} 

object ProcessTest { 

    def main(args: Array[String]): Unit = { 
    import scala.concurrent.ExecutionContext.Implicits.global 
    import scalaz.stream.async 

    val q: Queue[Int] = async.unboundedQueue[Int] 
    val src: Process[Task, Int] = q.dequeue 

    // Thread 1 
    val f1 = Future { 
     for (i <- 0 to 10) { 
     println(s"enqueueOne $i") 
     Thread.sleep(100) 
     q.enqueueOne(i) 
     } 
     println("closing") 
     q.close 
     println("closed") 
    } 

    // Thread 2 
    val f2 = Future { 
     val buf = new collection.mutable.ArrayBuffer[Int] 
     val snk: Sink[Task, Int] = scalaz.stream.io.fillBuffer(buf) 
     val run: Task[Unit] = src.map(x => { 
     println(s"map $x") 
     x 
     }).to(snk).run 
     println("running") 
     run.get.runFor(3.seconds) 
     println(s"result = ${buf.toList}") 
    } 

    Await.result(f1, 10.seconds) 
    Await.result(f2, 10.seconds) 
    } 
} 

當我嘗試運行此,線程2中沒有收到任何內容:

enqueueOne 0 
running 
enqueueOne 1 
enqueueOne 2 
enqueueOne 3 
enqueueOne 4 
enqueueOne 5 
enqueueOne 6 
enqueueOne 7 
enqueueOne 8 
enqueueOne 9 
enqueueOne 10 
closing 
closed 
[error] (run-main-9) java.util.concurrent.TimeoutException 

我做錯了什麼? 這個阻塞在哪裏?

(我用scalaz流0.8.6)

回答

1

好吧,我發現這個問題:enqueueOneclose回報任務,必須運行:

// Thread 1 
val f1 = Future { 
    for (i <- 0 to 10) { 
    println(s"enqueueOne $i") 
    Thread.sleep(100) 
    q.enqueueOne(i).run 
    } 
    println("closing") 
    q.close.run 
    println("closed") 
}