2017-08-31 54 views
1

如何將 Stream<Object>轉換爲InputStream?目前,我得到的迭代器,並遍歷所有的數據將其轉換爲ByteArray並將其添加到一個InputStream的:流<Object>到InputStream

ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
ObjectOutputStream oos = new ObjectOutputStream(bos); 

Iterator<MyType> myItr = MyObject.getStream().iterator(); 

while (myItr.hasNext()) { 

     oos.writeObject(myItr.next().toString() 
     .getBytes(StandardCharsets.UTF_8)); 
    } 
    oos.flush(); 
    oos.close(); 

    InputStream is = new ByteArrayInputStream(bao.toByteArray()); 

是什麼,雖然這樣做的開銷?如果我的數據流包含1TB的數據,我不會將1TB的數據吸入內存嗎?有沒有更好的方法來實現這一目標?

+0

您確保Stream這種InputStream是你需要的嗎?您將對象轉換爲字符串,將它們的UTF-8表示形式作爲字節數組使用,並對這些數組對象使用對象序列化。完全不清楚你想在另一端接收什麼,目前它既不是,對象也不是字符串。您可以直接編寫字符串,也可以使用'Writer'編寫簡單的文本表示,而不用使用對象序列化協議的開銷,但都不會重新創建原始對象。 – Holger

回答

2

你應該能夠在OutputStream轉換成使用管道的InputStream

PipedOutputStream pos = new PipedOutputStream(); 
InputStream is = new PipedInputStream(pos); 

new Thread(() -> { 
    try (ObjectOutputStream oos = new ObjectOutputStream(pos)) { 
     Iterator<MyType> myItr = MyObject.getStream().iterator(); 
     while (myItr.hasNext()) { 
      oos.writeObject(myItr.next().toString() 
       .getBytes(StandardCharsets.UTF_8)); 
     } 
    } catch (IOException e) { 
     // handle closed pipe etc. 
    } 
}).start(); 

靈感來自this answer

+0

嗯......我不認爲我很明白這段代碼應該做什麼? pos永遠不會被分配任何數據:S,看起來我們正在循環並將myItr中的內容無緣無故添加到oos中,因爲它不會與輸入流包含的內容關聯起來?我在這裏錯過了什麼嗎? – BigBug

+1

'is'和'oos'都綁定到'pos',它將數據從輸出流傳輸到輸入流。但現在我想到了它,它仍然需要將它全部緩衝在內存中,所以這並不能真正解決您的問題。 – shmosel

+2

@BigBug好吧,我把它全部緩衝到內存中是錯誤的。它實際上會阻塞一旦達到緩衝區大小,這意味着您必須在單獨的線程上提供輸出流。看到我更新的答案。 – shmosel

0

這對你有用嗎?

https://gist.github.com/stephenhand/292cdd8bba7a452d83c51c00d9ef113c

這是一個InputStream實現,需要一個Stream<byte[]>作爲輸入數據。你只需要.map()你的abitrary對象到字節數組,但是你希望每個對象被表示爲字節。

它只是呼籲時InputStream在讀取Stream終端操作,拉動對象關爲消費者讀更多InputStream,因此它永遠不會加載整套到內存