2011-09-23 106 views
4

我正在嘗試編寫一段解除阻塞代碼以從PipedInputStream中讀取。它主要檢查是否有任何調用阻塞前閱讀閱讀API:Java PipedInputStream available()方法返回值

int n = 0; 
if ((n = pipedInputStream_.available()) > 0) { 
    pipedInputStream_.read(...) 
} 

通過java API doc我不能肯定地告訴該檢查應該是什麼閱讀,因爲可能的值爲零(意味着沒有數據,或關閉/破碎流)或大於零。那麼來電者怎麼知道是否有什麼可讀的東西呢? 「

」返回可以從此輸入流中讀取的無阻塞字節數,或者如果通過調用close()方法關閉了此輸入流,或者管道未連接或斷開,則返回0。 「

查看來源,它似乎是唯一的值是零或大於零。

public synchronized int available() throws IOException { 
    if(in < 0) 
     return 0; 
    else if(in == out) 
     return buffer.length; 
    else if (in > out) 
     return in - out; 
    else 
     return in + buffer.length - out; 
} 
+1

零並不意味着封閉或斷開的流。每個規格僅允許零或大於零。 – EJP

+0

你能澄清爲什麼你說零不暗示關閉或破流?請看看我同意的那個解釋。 Java文檔還指出: 「返回: 可以從此輸入流讀取的無阻塞字節數,如果此輸入流已通過調用close()方法關閉,或者管道是未連接或斷開。「 – USQ

+0

你誤會了。零意味着*要麼*沒有數據要被讀取而沒有阻塞*或*流被關閉*或*管道未連接*或*管道損壞。這並不意味着只有這些條件中的一個。 – EJP

回答

2

如果available()返回零,則目前沒有字節可供讀取。根據您引用的文檔,可能有以下幾種原因:

  • 管道已關閉。
  • 管道破裂。
  • 所有先前可用的輸入(如果有)已被使用。

available()威力返回值爲零,意味着已經發生了錯誤,這意味着你將永遠無法通過將來管道讀取任何數據,但你不能肯定地告訴這裏,因爲零可能表示上面的第三個條件,其中在InputStream#read()上的阻塞可能最終產生更多數據,相應的OutputStream側將通過管道推進。

我看不出有可能輪詢PipedInputStreamavailable(),直到有更多數據可用爲止,因爲您永遠無法區分上面的終端情況(第一個和第二個)與讀者是否更加飢餓比作家。像很多流接口一樣,這裏也必須嘗試使用​​並準備好失敗。那是陷阱; InputStream#read()將會阻止,但直到您承諾阻止嘗試閱讀時,您才能夠辨別出沒有更多輸入即將到來。

將您的消費行爲建立在available()上並不可行。如果它返回一個正數,則有需要閱讀,但當然,即使現在有什麼可能不足以滿足您的消費者。如果您提交一個線程以阻塞方式使用InputStream並跳過available()的輪詢,則會發現您的應用程序更易於管理。讓InputStream#read()成爲你在這裏的唯一預言。

+0

謝謝。是的,在查看PipedInputStream代碼的其餘部分,尤其是循環緩衝區的實現/使用之後,我看到第三個終端條件也是零的原因之一。 – USQ

0

我需要一個過濾器來攔截速度較慢的連接,因爲我需要儘快關閉數據庫連接,所以我最初使用Java管道,但是當它們的實現看起來更接近時,它們都是同步的,因此我最終使用一個小緩衝區創建了自己的QueueInputStream和阻塞隊列將緩衝區放入隊列中一旦被填滿,除非在LinkedBlockingQueue中使用的鎖定條件,在小緩衝區的幫助下它應該便宜,否則它是無鎖的,該類僅用於單生產者和消費者每個實例:

import java.io.IOException; 
import java.io.OutputStream; 
import java.util.concurrent.*; 

public class QueueOutputStream extends OutputStream 
{ 
    private static final int DEFAULT_BUFFER_SIZE=1024; 
    private static final byte[] END_SIGNAL=new byte[]{-1}; 

    private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>(); 
    private final byte[] buffer; 

    private boolean closed=false; 
    private int count=0; 

    public QueueOutputStream() 
    { 
    this(DEFAULT_BUFFER_SIZE); 
    } 

    public QueueOutputStream(final int bufferSize) 
    { 
    if(bufferSize<=0){ 
     throw new IllegalArgumentException("Buffer size <= 0"); 
    } 
    this.buffer=new byte[bufferSize]; 
    } 

    private synchronized void flushBuffer() 
    { 
    if(count>0){ 
     final byte[] copy=new byte[count]; 
     System.arraycopy(buffer,0,copy,0,count); 
     queue.offer(copy); 
     count=0; 
    } 
    } 

    @Override 
    public synchronized void write(final int b) throws IOException 
    { 
    if(closed){ 
     throw new IllegalStateException("Stream is closed"); 
    } 
    if(count>=buffer.length){ 
     flushBuffer(); 
    } 
    buffer[count++]=(byte)b; 
    } 

    @Override 
    public synchronized void close() throws IOException 
    { 
    flushBuffer(); 
    queue.offer(END_SIGNAL); 
    closed=true; 
    } 

    public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) 
    { 
    return executor.submit(
      new Callable<Void>() 
      { 
       @Override 
       public Void call() throws Exception 
       { 
       try{ 
        byte[] buffer=queue.take(); 
        while(buffer!=END_SIGNAL){ 
        outputStream.write(buffer); 
        buffer=queue.take(); 
        } 
        outputStream.flush(); 
       } catch(Exception e){ 
        close(); 
        throw e; 
       } finally{ 
        outputStream.close(); 
       } 
       return null; 
       } 
      } 
    ); 
    } 

}