Puzzling behavior with chunk and zipWithIndex in scalaz-stream
I'm trying to use scalaz-stream to process a stream of data with an expensive operation※:
scala> :paste
// Entering paste mode (ctrl-D to finish)
def expensive[T](x:T): T = {
println(s"EXPENSIVE! $x")
x
}
^D
// Exiting paste mode, now interpreting.
expensive: [T](x: T)T
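※Yes, yes, I know that mixing side effects into code is bad functional programming style. The print statement is only there to track how many times expensive() gets called.
Before passing the data to the expensive operation, I first need to split it into chunks.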
scala> val chunked: Process[Task,Vector[Int]] = Process.range(0,4).chunk(2)
chunked: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await(scalaz.concurrent.Task@…,<function1>,Emit(SeqView(...),Halt(scalaz.stream.Process$End$)),Emit(SeqView(...),Halt(scalaz.stream.Process$End$)))
scala> chunked.runLog.run
res1: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector())
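For comparison, a minimal sketch using plain Scala collections (the standard library's `grouped`, not scalaz-stream) splits the same range into the same two chunks, but emits no trailing empty chunk. In the scalaz-stream output above, that extra `Vector()` is the third chunk, so it also gets passed to expensive() later on. `ChunkComparison` is a hypothetical name used only for this illustration.

```scala
// Plain Scala collections, not scalaz-stream: group 0 until 4 into chunks of 2.
object ChunkComparison {
  def grouped: Vector[Vector[Int]] = (0 until 4).toVector.grouped(2).toVector

  def main(args: Array[String]): Unit =
    println(grouped) // prints Vector(Vector(0, 1), Vector(2, 3))
}
```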
Then I map the expensive operation over the stream of chunks.
scala> val processed = chunked.map(expensive)
processed: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await(scalaz.concurrent.Task@…,<function1>,Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$)),Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$)))
When I run this, it calls expensive() the expected number of times:
scala> processed.runLog.run
EXPENSIVE! Vector(0, 1)
EXPENSIVE! Vector(2, 3)
EXPENSIVE! Vector()
res2: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector())
However, if I chain a call to zipWithIndex, expensive() gets called many more times:
scala> processed.zipWithIndex.runLog.run
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0, 1)
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2, 3)
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
res3: scala.collection.immutable.IndexedSeq[(Vector[Int], Int)] = Vector((Vector(0, 1),0), (Vector(2, 3),1), (Vector(),2))
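Is this a bug? If this is the intended behavior, can anyone explain why? If expensive() takes a long time, you can see why I would prefer a result with fewer calls.
Here is a gist with more examples: https://gist.github.com/underspecified/11279251
Thanks for the explanation and the references. Wrapping my expensive process in a delay eliminated the extra calls. Essentially, the problem is how scalaz-stream mixes strict and lazy evaluation, right? What I still don't understand is why the result of chunk isn't fully determined before map is called. Is that impossible with streaming data? – underspecified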
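The `SeqView(...)` and `SeqViewM(...)` values in the REPL output above suggest that the chunks are emitted as lazy collection views. The following is a minimal sketch using only the Scala standard library (not scalaz-stream) of how that can multiply calls: `map` over a view stores the function instead of applying it, so every traversal of the view runs `expensive` again, while forcing the view once (here with `toVector`) limits it to one call per element. That this is exactly what `zipWithIndex` triggers inside scalaz-stream is an assumption; `ViewReevaluation`, `lazyCalls`, and `strictCalls` are hypothetical names for illustration.

```scala
// Standard library only: shows that mapping over a view re-runs the function on each traversal.
object ViewReevaluation {
  var calls = 0

  // Stand-in for the question's expensive(), counting invocations instead of only printing.
  def expensive(x: Vector[Int]): Vector[Int] = {
    calls += 1
    println(s"EXPENSIVE! $x")
    x
  }

  // Map lazily over a view, then traverse it `times` times; returns how often expensive ran.
  def lazyCalls(times: Int): Int = {
    calls = 0
    val mapped = Vector(Vector(0, 1), Vector(2, 3)).view.map(expensive)
    (1 to times).foreach(_ => mapped.foreach(_ => ()))
    calls
  }

  // Force the mapped view into a Vector once, then traverse the strict result `times` times.
  def strictCalls(times: Int): Int = {
    calls = 0
    val mapped = Vector(Vector(0, 1), Vector(2, 3)).view.map(expensive).toVector
    (1 to times).foreach(_ => mapped.foreach(_ => ()))
    calls
  }

  def main(args: Array[String]): Unit = {
    println(lazyCalls(3))   // 6: two elements, re-mapped on each of three traversals
    println(strictCalls(3)) // 2: each element mapped once, when the view is forced
  }
}
```

Building the lazy view alone (`lazyCalls(0)`) calls expensive zero times, which fits with the observation that the cost appears only when something downstream walks the chunks.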