2015-05-04 27 views
3

在以下scalaz-stream(取自documentation)的使用示例中,如果輸入和/或輸出是gzip文件,需要更改哪些內容?換句話說,我該如何使用compress斯卡拉斯流充氣的使用示例

import scalaz.stream._ 
import scalaz.concurrent.Task 

val converter: Task[Unit] = 
    io.linesR("testdata/fahrenheit.txt") 
    .filter(s => !s.trim.isEmpty && !s.startsWith("//")) 
    .map(line => fahrenheitToCelsius(line.toDouble).toString) 
    .intersperse("\n") 
    .pipe(text.utf8Encode) 
    .to(io.fileChunkW("testdata/celsius.txt")) 
    .run 

// at the end of the universe... 
val u: Unit = converter.run 

回答

5

壓縮輸出很容易。由於compress.deflate()Process1[ByteVector, ByteVector]你需要插入您的管道,你都散發ByteVector秒(即text.utf8Encode之後這是一個Process1[String, ByteVector]):

val converter: Task[Unit] = 
    io.linesR("testdata/fahrenheit.txt") 
    .filter(s => !s.trim.isEmpty && !s.startsWith("//")) 
    .map(line => fahrenheitToCelsius(line.toDouble).toString) 
    .intersperse("\n") 
    .pipe(text.utf8Encode) 
    .pipe(compress.deflate()) 
    .to(io.fileChunkW("testdata/celsius.zip")) 
    .run 

對於inflate您不能使用io.linesR讀取壓縮文件。您需要一個生產ByteVector s而不是String s的流程,以便將它們流入inflate。 (您可以使用io.fileChunkR)。下一步是將解壓縮的數據解碼爲String(例如text.utf8Decode),然後使用text.lines()逐行發送文本。像這樣的應該做的伎倆:

val converter: Task[Unit] = 
    Process.constant(4096).toSource 
    .through(io.fileChunkR("testdata/fahrenheit.zip")) 
    .pipe(compress.inflate()) 
    .pipe(text.utf8Decode) 
    .pipe(text.lines()) 
    .filter(s => !s.trim.isEmpty && !s.startsWith("//")) 
    .map(line => fahrenheitToCelsius(line.toDouble).toString) 
    .intersperse("\n") 
    .pipe(text.utf8Encode) 
    .to(io.fileChunkW("testdata/celsius.txt")) 
    .run 
+0

感謝您的答覆。如果我在gzip文件中使用上述內容,則會得到'java.util.zip.DataFormatException:不正確的頭檢查',如果我使用'inflate(true)'而不是'java.util.zip.DataFormatException:invalid block type' – mitchus

+0

gzip文件包含額外的頭文件,其中'infalte'(或'inflate'使用的'java.zip.Inflater')不理解,請參閱http://en.wikipedia.org/wiki/Gzip。 'inflate'只能處理DEFLATE壓縮的有效負載 –

+0

如果你想從gzip文件讀取,你最好使用'io.linesR(in:=> InputStream)'和Java的'GZIPInputStream' –