2015-01-05 46 views
8

是否有一些代碼示例使用org.reactivestreams庫來處理使用Java NIO(用於高性能)的大型數據流?我瞄準分佈式處理,所以使用Akka的例子是最好的,但我可以弄清楚。如何使用Reactive Streams進行NIO二進制處理?

它似乎仍然是這種情況,大多數(我希望不是全部)的讀取階度假村文件Source(非二進制)或直接的Java NIO的例子(甚至之類的東西Files.readAllBytes!)

也許我錯過了一個激活模板? (Akka Streams with Scala!是接近處理我需要的一切,除了二進制/ NIO端)

回答

4

我們實際上使用akka流來處理二進制文件。這是一個有點棘手得到的東西去,因爲不解決這個任何文件,但是這是我們想出了:

val binFile = new File(filePath) 
val inputStream = new BufferedInputStream(new FileInputStream(binFile)) 
val binStream = Stream.continually(inputStream.read).takeWhile(-1 != _).map(_.toByte) 
val binSource = Source(binStream) 

一旦你有了binSource,這是一個阿卡Source[Byte]你可以繼續開始應用您想要的任何流轉換(map,flatMap,transform等)。此功能利用Source伴隨對象的apply,該對象需要Iterable,傳入一個應該在數據中緩慢讀入數據的scala Stream,並使其可用於轉換。

編輯

正如康拉德在評論部分中指出,可以把流與較大的文件的問題,由於它執行它遇到因爲它懶洋洋地建立了流元素的記憶化的事實。如果你不小心,這可能會導致內存不足。但是,如果你看一下文檔的Stream有避免記憶化在內存中建立了一個提示:

一個必須謹慎記憶化的;如果你不小心,你可以很快吃掉大量的記憶。原因是流的記憶創建了一個非常類似於 scala.collection.immutable.List的結構。只要頭部保持 ,頭部保持在尾部,所以它遞歸地繼續 。另一方面,如果頭部沒有任何東西(例如我們用def來定義流),那麼一旦它不再是直接使用的 ,它就會消失。

所以考慮到這一點,你可以修改我原來的例子如下:

val binFile = new File(filePath) 
val inputStream = new BufferedInputStream(new FileInputStream(binFile))  
val binSource = Source(() => binStream(inputStream).iterator) 

def binStream(in:BufferedInputStream) = Stream.continually(in.read).takeWhile(-1 != _).map(_.toByte) 

所以這裏的想法是通過def打造Stream,而不是分配給val,然後立即獲得從它的iterator並使用它來初始化Akka Source。以這種方式設置應避免與momoization有關的問題。我對一個大文件運行舊代碼,並通過在Source上執行foreach來產生OutOfMemory的情況。當我將它切換到新代碼時,我能夠避免這個問題。

+2

scala.collection.immutable.Stream的使用在這裏相當危險 - 它使用memoization(!)(請參閱文檔http://www.scala-lang.org/api/current/index.html#scala.collection .immutable.Stream),所以你最終會把整個文件放在內存中,而不是通過(!)來流式傳輸。 –

+0

@ Konrad'ktoso'Malawski,優秀的點。我將發佈更新以解決memoization問題的解決方法。 – cmbaxter

+1

好的更新,公開輸入流的迭代器工作正常。記得在流完成時關閉資源。 –

7

不要使用scala.collection.immutable.Stream來使用這樣的文件,原因是它執行記憶 - 也就是說,雖然它是懶惰的,它會保持整個流緩衝(記憶)在內存中!

這絕對是不是當你想到「流處理文件」時你想要什麼。 Scala Stream的工作原理是因爲在功能設置中它是完全有意義的 - 您可以輕鬆避免計算fibbonachi數字,例如,有關更多詳細信息,請參見ScalaDoc

Akka Streams提供了Reactive Streams實現,並提供了一個FileIO類,您可以在此處使用它(它將正確地反壓並僅在需要時將數據從文件中提取出來,並且其餘流將準備好使用它) :

import java.io._ 
import akka.actor.ActorSystem 
import akka.stream.scaladsl.{ Sink, Source } 

object ExampleApp extends App { 


    implicit val sys = ActorSystem() 
    implicit val mat = FlowMaterializer() 

    FileIO.fromPath(Paths.get("/example/file.txt")) 
    .map(c ⇒ { print(c); c }) 
    .runWith(Sink.onComplete(_ ⇒ { f.close(); sys.shutdown() })) 
} 

以下是有關與IO with Akka Streams 注意,這是當前-AS-的寫作阿卡的版本,所以2.5.X系列的詳細文檔。

希望這會有所幫助!

+0

非常感謝你的回答 - 我必須再次找到我自己的問題才能知道我在找什麼:http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/java /stream-io.html#Streaming_File_IO - 而akka 2.4已經不在了(大概意思是說它是NIO 2)! (我會接受一旦你或我更新/使用API​​創建代碼答案) – Stephen

+0

它會真的始終保持整個流內存?或者是否依賴於您持有對流開頭的引用?我的(一廂情願的)印象是,如果你反覆處理尾部並忘記頭部,「流」項最終會被釋放。 – dividebyzero

+0

請閱讀文檔,我已經將它們鏈接到下面; http://www.scala-lang.org/api/current/scala/collection/immutable/Stream.html –