2008-12-13 88 views
9

有沒有人有任何建議用Java創建管道對象,都是InputStream和OutputStream,因爲Java沒有多重繼承,並且這兩個流都是抽象類而不是接口?Java中的輸入和輸出流管道

底層需求是將一個對象傳遞給需要InputStream或OutputStream的對象,以便將來自一個線程的輸出管道輸入另一個線程。

回答

8

看來這個問題的重點正在被忽略。如果我理解正確,您需要一個在一個線程中像InputStream一樣運行的對象,另一個在另一個線程中使用OutputStream來創建兩個線程之間的通信方式。

也許一個答案是使用組合而不是繼承(無論如何,這是推薦的做法)。使用getInputStream()和getOutputStream()方法創建一個包含PipedInputStream和PipedOutputStream的管道,並相互連接。

您不能直接將Pipe對象傳遞給需要流的對象,但可以傳遞它的get方法的返回值。

這是否適合您?

3

這是一件很常見的事情,我想。看到這個問題。

Easy way to write contents of a Java InputStream to an OutputStream

+0

I瞭解PipedXxxStream ...但我只想創建一個Pipe對象,這個對象可以作爲InputStream給予一個線程,並將OutputStream給予另一個。希望我可能錯過了一些東西。 – Baginsss 2008-12-13 07:35:19

+0

編寫一些可以爲輸出流提供管道到輸出流的東西並不難,但是你將無法擴展InputStream和OutputStream。繼承,不好。組成,很好。 – Apocalisp 2008-12-13 08:14:43

5

java.io.PipedOutputStream中和java.io.PipedInputStream中看起來是使用了此方案的課程。它們被設計成一起用於在線程之間管理數據。

如果你真的想要傳遞一個單獨的對象,它需要包含其中的每一個,並通過getter公開它們。

1

不能創造出既從InputStreamOutputStream派生,因爲這些都不是接口和他們有共同的方法和Java不允許多重繼承(編譯器不知道是否調用類InputStream.close()OutputStream.close()如果您在新對象上調用close())。

另一個問題是緩衝區。 Java想爲數據分配一個靜態緩衝區(不會改變)。這意味着當你使用`java.io.PipedXxxStream'時,寫入數據將最終阻塞,除非你使用兩個不同的線程。

所以Apocalisp的答案是正確的:你必須寫一個複製循環。

我建議你在你的項目中包含Apache的commons-io,它包含許多隻用於這樣的任務的輔助例程(在流,文件,字符串及其所有組合之間複製數據)。

+0

您可以使用內部類來實現與共享緩衝區時同時公開輸入流和輸出流的類相似的內容。 – Charlie 2015-10-08 06:08:08

0

我不得不實現用於低速連接到的Servlet所以基本上我包裹servlet的輸出流分成QueueOutputStream這將增加每一個字節(在小的緩衝區)的過濾器,進入隊列,然後輸出那些小的緩衝區到第二個輸出流,所以這種方式作爲輸入/輸出流,恕我直言,這比JDK管道更好,它不會很好地擴展,基本上,標準JDK實現中的上下文切換太多讀/寫),阻塞隊列僅適用於單個生產者/消費者場景:

import java.io.IOException; 
import java.io.OutputStream; 
import java.util.concurrent.*; 

public class QueueOutputStream extends OutputStream 
{ 
    private static final int DEFAULT_BUFFER_SIZE=1024; 
    private static final byte[] END_SIGNAL=new byte[]{}; 

    private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>(); 
    private final byte[] buffer; 

    private boolean closed=false; 
    private int count=0; 

    public QueueOutputStream() 
    { 
    this(DEFAULT_BUFFER_SIZE); 
    } 

    public QueueOutputStream(final int bufferSize) 
    { 
    if(bufferSize<=0){ 
     throw new IllegalArgumentException("Buffer size <= 0"); 
    } 
    this.buffer=new byte[bufferSize]; 
    } 

    private synchronized void flushBuffer() 
    { 
    if(count>0){ 
     final byte[] copy=new byte[count]; 
     System.arraycopy(buffer,0,copy,0,count); 
     queue.offer(copy); 
     count=0; 
    } 
    } 

    @Override 
    public synchronized void write(final int b) throws IOException 
    { 
    if(closed){ 
     throw new IllegalStateException("Stream is closed"); 
    } 
    if(count>=buffer.length){ 
     flushBuffer(); 
    } 
    buffer[count++]=(byte)b; 
    } 

    @Override 
    public synchronized void write(final byte[] b, final int off, final int len) throws IOException 
    { 
    super.write(b,off,len); 
    } 

    @Override 
    public synchronized void close() throws IOException 
    { 
    flushBuffer(); 
    queue.offer(END_SIGNAL); 
    closed=true; 
    } 

    public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) 
    { 
    return executor.submit(
      new Callable<Void>() 
      { 
       @Override 
       public Void call() throws Exception 
       { 
       try{ 
        byte[] buffer=queue.take(); 
        while(buffer!=END_SIGNAL){ 
        outputStream.write(buffer); 
        buffer=queue.take(); 
        } 
        outputStream.flush(); 
       } catch(Exception e){ 
        close(); 
        throw e; 
       } finally{ 
        outputStream.close(); 
       } 
       return null; 
       } 
      } 
    ); 
    }