2014-04-25 75 views
2

我想用一個昂貴的操作※使用scalaz-stream處理數據流。用chunk和zipWithIndex在scalaz-stream中令人費解的行爲

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

    def expensive[T](x:T): T = { 
     println(s"EXPENSIVE! $x") 
     x 
    } 
    ^D 
// Exiting paste mode, now interpreting. 

expensive: [T](x: T)T 

※是的,是的,我知道混合代碼與副作用是不好的函數式編程風格。打印語句僅用於跟蹤昂貴()被調用的次數。)

在將數據傳遞給昂貴的操作之前,我首先需要將它分成塊。

scala> val chunked: Process[Task,Vector[Int]] = Process.range(0,4).chunk(2) 
chunked: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await([email protected],<function1>,Emit(SeqView(...),Halt(scalaz.stream.Process$End$)),Emit(SeqView(...),Halt(scalaz.stream.Process$End$))) 

scala> chunked.runLog.run 
res1: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector()) 

然後,我將昂貴的操作映射到塊的流上。

scala> val processed = chunked.map(expensive) 
processed: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await([email protected],<function1>,Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$)),Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$))) 

當我執行此,它調用昂貴的()預期的次數:

scala> processed.runLog.run 
EXPENSIVE! Vector(0, 1) 
EXPENSIVE! Vector(2, 3) 
EXPENSIVE! Vector() 
res2: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector()) 

但是,如果我鏈zipWithIndex一個電話,昂貴的()被調用很多次:

>scala processed.zipWithIndex.runLog.run 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector(0) 
EXPENSIVE! Vector(0) 
EXPENSIVE! Vector(0) 
EXPENSIVE! Vector(0) 
EXPENSIVE! Vector(0, 1) 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector(2) 
EXPENSIVE! Vector(2) 
EXPENSIVE! Vector(2) 
EXPENSIVE! Vector(2) 
EXPENSIVE! Vector(2, 3) 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
EXPENSIVE! Vector() 
res3: scala.collection.immutable.IndexedSeq[(Vector[Int], Int)] = Vector((Vector(0, 1),0), (Vector(2, 3),1), (Vector(),2)) 

這是一個錯誤?如果這是所需的行爲,任何人都可以解釋爲什麼?如果昂貴()需要很長時間,您可以看到爲什麼我更喜歡使用較少的調用結果。

下面是更多的例子一個要點:https://gist.github.com/underspecified/11279251

回答

2

你看到這個issue,可以採取一些different forms。問題實質上是,map可以看到(並做些什麼)chunk正在建立其結果的中間步驟。

此行爲may change in the future,但在此期間有一些可能的解決方法。最簡單的一種是包裝您昂貴的功能在處理和使用的flatMap代替map

chunked.flatMap(a => 
    Process.eval(Task.delay(expensive(a))) 
).zipWithIndex.runLog.run 

另一種解決方案是包裝您昂貴的功能在effectful道:

def expensiveChannel[A] = Process.constant((a: A) => Task.delay(expensive(a))) 

現在你可以使用through

chunked.through(expensiveChannel).zipWithIndex.runLog.run 

雖然目前的行爲可能有點令人驚訝,但它也是一個很好的提醒t您應該使用類型系統來幫助您跟蹤全部您關心的效果(以及長時間運行的計算可以是其中之一)。

+0

感謝您的解釋和參考。在延遲中包裝我昂貴的流程消除了多餘的呼叫。從本質上講,問題在於scala-stream如何混合嚴格和懶惰的評估,對吧?我仍然不明白的是,爲什麼chunk的結果在調用map之前沒有完全確定。流數據是不可能的? – underspecified