2017-02-25 69 views
1

這是我剛纔的問題:Send big file over reactive stream發送數據

我設法用發送文件在阿卡流FileIO.fromPath(Paths.get(file.toURI())),它工作正常。但是,我想在發送文件之前對其進行壓縮和加密。我創建了方法,打開FileInputStream,通過壓縮流,然後通過加密流路由它,現在我想使用Akka流將其引導到套接字中。

文件 - >的FileInputStream - > CompressedInputStream - > EncryptedInputStream? - >春/阿卡流

的一點是,我可以即時而讀它由一塊塊壓縮/加密文件(我不是在磁盤上創建額外的文件),並且我不知道如何通過Akka/Spring流發送InputStream(壓縮和加密)(Spring流,我的意思是項目Reactor API下的Akka流)。

問題是:如何壓縮,加密和發送文件,而無需將整個壓縮/加密文件保存到磁盤上?

回答

2

實際上,有一個專門用於處理輸入流等資源的源代碼。這就是所謂的Source.unfoldResource

Source<ByteString, NotUsed> source = Source.unfoldResource(
    () -> prepareEncryptedStream(), 
    is -> readChunk(is, 4096), 
    InputStream::close 
); 

Optional<ByteString> readChunk(InputStream is, int size) throws IOException { 
    byte[] data = new byte[size]; 
    int read = is.read(data); 
    if (read < 0) { 
     return Optional.empty(); 
    } 
    return Optional.of(ByteString.fromArray(data, 0, read)); 
} 

InputStream prepareEncryptedStream() { ... } 

這裏prepareCompressedFile()是應該回到你想創建一個反應流加密流的方法,並readChunk()是一個便捷方法讀取指定大小的InputStream一個ByteString

如果您可以將壓縮和加密例程表示爲ByteString -> ByteString函數,那麼您並不需要這些;所有你需要做的就是將這些程序傳遞給map()流量:

Source<ByteString, CompletionStage<IOResult>> source = 
    FileIO.fromPath(Paths.get("somewhere")) 
     .map(bs -> compress(bs)) 
     .map(bs -> encrypt(bs)); 

ByteString encrypt(ByteString bs) { ... } 

ByteString compress(ByteString bs) { ... }