2014-09-11 40 views
4

我有一個函數get: T => scala.concurrent.Future[T]- 再EVAL T => scala.concurrent.Future [T]至過程[1,T]

欲迭代它喜歡:

val futs: Iterator[Future[T]] = Iterator.iterate(get(init)){ 
     _.flatMap(prev => get(prev)) 
    } 

但類型迭代器是Future[T],處理這個迭代器並不容易。

我怎麼能轉移,爲Process[?, T]

(也許T => Future[T]如上下文類型F)。

+0

這是一個有點不清楚(至少對我來說)你問。 'Process.eval'可以讓你將'Future [A]'變成'Process [Future,A]' - 這可能是你想要開始的地方。你需要'Future'的'Catchable'實例來運行它,但是這不應該太難寫。 – 2014-09-11 14:37:55

+0

@TravisBrown我已經讓我的問題更加詳細,可以看看它。 – jilen 2014-09-12 00:11:33

回答

2

不是超級很好的解決方案,但工程

import scala.concurrent.ExecutionContext.Implicits.global 
    import scala.concurrent.{Future => SFuture} 
    import scala.language.implicitConversions 
    import scalaz.concurrent.Task 
    import scalaz.stream._ 

    implicit class Transformer[+T](fut: => SFuture[T]) { 
    def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = { 
     import scala.util.{Success, Failure} 
     import scalaz.syntax.either._ 
     Task.async { 
     register => 
      fut.onComplete { 
      case Success(v) => register(v.right) 
      case Failure(ex) => register(ex.left) 
      } 
     } 
    } 
    } 

    val init: Int = 0 

    def f(i: Int): SFuture[Int] = SFuture(i + 1) 

    val p = Process.repeatEval[Task, Int] { 
    var prev = init 
    f(prev).toTask.map(next => {prev = next; next}) 
    } 

    println(p.take(10).runLog.run) 
2

假設你知道如何轉換的未來 - >任務(無論是通過隱性或通過Process.transform)本應工作:

def get(t:T): Task[T] = ??? 
val initial : T = ??? 

val signal = scalaz.stream.async.signal[T] 

// emit initial value, and follow by any change of `T` within the signal 
val source:Process[Task,T] = eval_(signal.set(t)) fby signal.discrete 

// sink to update `T` within the signal 
val signalSink:Sink[Task,T] = constant((t:T) => signal.set(t)) 

// result, that esentially converts T => Task[T] into Process[Task,T] 
val result: Process[Task,T] = source.observe(signalSink) 
+0

我看不到'get [T]:Task [T]'應用在哪裏,有什麼缺失? – jilen 2014-09-12 23:21:29

2

最後我得到了Pavel Chlupacek想說的話。信號看起來很酷,但對於初學者來說有點神祕。

import scala.concurrent.{Future => SFuture} 
    import scala.language.implicitConversions 
    import scalaz.concurrent.Task 
    import scalaz.stream._ 
    import scala.concurrent.ExecutionContext.Implicits.global 

    implicit class Transformer[+T](fut: => SFuture[T]) { 
    def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = { 
     import scala.util.{Failure, Success} 
     import scalaz.syntax.either._ 
     Task.async { 
     register => 
      fut.onComplete { 
      case Success(v) => register(v.right) 
      case Failure(ex) => register(ex.left) 
      } 
     } 
    } 
    } 

    val init: Int = 0 

    def f(i: Int): SFuture[Int] = SFuture(i + 1) 

    val signal = scalaz.stream.async.signal[Int] 

    // Observe value and push them to signal 
    val signalSink: Process[Task, Int => Task[Unit]] = // =:= Sink[Task, Int] 
    Process.constant((input: Int) => signal.set(input)) 

    // Start from init and then consume from signal 
    val result = (Process.eval(f(init).toTask) ++ signal.discrete.evalMap(i => f(i).toTask)) observe signalSink 

    println(result.take(10).runLog.run) 
+0

感謝這個可運行的代碼 – jilen 2014-09-13 03:50:38

1

我提出的另一個解決方案

def iterate[F[_],A](init: A)(f: A => F[A]): Process[F, A] = { 
    Process.emit(init) ++ Process.await(f(init)) { next => iterate(next)(f)} 
    } 

這已經是scalaz流0.6的特徵,請參閱本pr詳細

中序使用scala.concurrent.Future作爲上下文類型F

我們需要import scalaz.std.scalaFuture._Catchable例如

implicit def futureCatchable(implicit ctx: ExecCtx): Catchable[Future] = { 
    new Catchable[Future] { 
     def attempt[A](f: Future[A]) = f.map(\/-(_)).recover { case e => -\/(e)} 
     def fail[A](err: Throwable) = Future.failed(err) 
    } 
    } 

最後我得到這個:

package stream 
import scala.concurrent._ 
import scalaz._ 
import scalaz.stream._ 

package object future { 
    type ExecCtx = ExecutionContext 

    def iterate[F[_],A](init: A)(f: A => F[A]): Process[F, A] = { 
    Process.emit(init) ++ Process.await(f(init)) { next => iterate(next)(f)} 
    } 

    implicit def futureCatchable(implicit ctx: ExecCtx): Catchable[Future] = { 
    new Catchable[Future] { 
     def attempt[A](f: Future[A]) = f.map(\/-(_)).recover { case e => -\/(e)} 
     def fail[A](err: Throwable) = Future.failed(err) 
    } 
    } 
} 

object futureApp extends App { 
    import scalaz.Scalaz._ 
    import future._ 
    import scala.concurrent.ExecutionContext.Implicits.global 
    def get(i: Int) = Future { 
    println(i + 1) 
    i + 1 
    } 
    iterate(0)(get).takeWhile(_ < 100000).run 
}