2012-02-07 96 views
1

我正在嘗試使用管道輸入流寫入數據。但是從線程轉儲看起來好像管道輸入流上存在鎖定。管道輸入流被鎖定

PipedOutputStream pos = new PipedOutputStream(); 
PipedInputStream pis = new PipedInputStream(pos); 
FileInputStream fis = null; 
GZIPOutputStream gos = null; 
byte[] buffer = new byte[1024]; 
try { 
    fis = new FileInputStream(file); 
    gos = new GZIPOutputStream(pos); 
    int length; 
    while ((length = fis.read(buffer, 0, 1024)) != -1) 
     gos.write(buffer, 0, length); 
    } catch(Exception e){ 
     print("Could not read the file"); 
    } 
    finally { 
     try { 
      fis.close(); 
      gos.close(); 
     }catch (Exception ie){ 
      printException(ie); 
     } 
    } 
writeObject(pis); 
pos.close(); 

writeobj方法將簡單地從流中讀取,但read方法被鎖定。 線程轉儲指示一些等待管道輸入流。

main" prio=10 tid=0x08066000 nid=0x48d2 in Object.wait() [0xb7fd2000..0xb7fd31e8] 
    java.lang.Thread.State: TIMED_WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    - waiting on <0xa5c28be8> (a java.io.PipedInputStream) 
    at java.io.PipedInputStream.awaitSpace(PipedInputStream.java:257) 
    at java.io.PipedInputStream.receive(PipedInputStream.java:215) 
    - locked <0xa5c28be8> (a java.io.PipedInputStream) 
    at java.io.PipedOutputStream.write(PipedOutputStream.java:132) 
    at java.util.zip.GZIPOutputStream.finish(GZIPOutputStream.java:95) 
    at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:146) 

    Locked ownable synchronizers: 
    - None 

我不確定是誰鎖定了它。閱讀文檔以確定鎖定呼叫。但無法弄清楚什麼是錯誤的以及如何克服它。

回答

4

使用PipedInputStream和PipedOutputStream必須位於不同的線程中。

仔細閱讀的Javadoc: http://docs.oracle.com/javase/6/docs/api/java/io/PipedInputStream.html

典型地,數據被從一個對象的PipedInputStream由一個線程讀取,並且數據被從其他線程寫入相應的PipedOutputStream。不建議嘗試從單個線程使用這兩個對象,因爲它可能使線程死鎖。

0

我需要一個過濾器攔截,我需要儘快關閉數據庫連接,所以我最初使用Java的管道,但是當他們實現更緊密看着連接速度慢,這是所有同步,所以我結束了使用創造我自己的QueueInputStream小緩衝區和阻塞隊列將緩衝區放入隊列中,一旦被填滿,除非在LinkedBlockingQueue中使用的鎖定條件,在小緩衝區的幫助下它應該便宜,否則它是無鎖的,該類僅用於使用單個生產者和消費者的每個實例:

import java.io.IOException; 
import java.io.OutputStream; 
import java.util.concurrent.*; 

public class QueueOutputStream extends OutputStream 
{ 
    private static final int DEFAULT_BUFFER_SIZE=1024; 
    private static final byte[] END_SIGNAL=new byte[]{}; 

    private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>(); 
    private final byte[] buffer; 

    private boolean closed=false; 
    private int count=0; 

    public QueueOutputStream() 
    { 
    this(DEFAULT_BUFFER_SIZE); 
    } 

    public QueueOutputStream(final int bufferSize) 
    { 
    if(bufferSize<=0){ 
     throw new IllegalArgumentException("Buffer size <= 0"); 
    } 
    this.buffer=new byte[bufferSize]; 
    } 

    private synchronized void flushBuffer() 
    { 
    if(count>0){ 
     final byte[] copy=new byte[count]; 
     System.arraycopy(buffer,0,copy,0,count); 
     queue.offer(copy); 
     count=0; 
    } 
    } 

    @Override 
    public synchronized void write(final int b) throws IOException 
    { 
    if(closed){ 
     throw new IllegalStateException("Stream is closed"); 
    } 
    if(count>=buffer.length){ 
     flushBuffer(); 
    } 
    buffer[count++]=(byte)b; 
    } 

    @Override 
    public synchronized void write(final byte[] b, final int off, final int len) throws IOException 
    { 
    super.write(b,off,len); 
    } 

    @Override 
    public synchronized void close() throws IOException 
    { 
    flushBuffer(); 
    queue.offer(END_SIGNAL); 
    closed=true; 
    } 

    public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) 
    { 
    return executor.submit(
      new Callable<Void>() 
      { 
       @Override 
       public Void call() throws Exception 
       { 
       try{ 
        byte[] buffer=queue.take(); 
        while(buffer!=END_SIGNAL){ 
        outputStream.write(buffer); 
        buffer=queue.take(); 
        } 
        outputStream.flush(); 
       } catch(Exception e){ 
        close(); 
        throw e; 
       } finally{ 
        outputStream.close(); 
       } 
       return null; 
       } 
      } 
    ); 
    } 
3

的PipedInputStream有一個小的非膨脹緩衝。一旦緩衝區滿了,寫入PipedOutputStream塊,直到被另一個線程讀取緩衝輸入。您不能在同一個線程中使用這兩個線程,因爲寫入操作將等待不會發生的讀取。

在你的情況,你是不是讀的任何數據,直到你已經寫了這一切,因此該解決方案是使用ByteArrayOutputStreamByteArrayInputStream代替:

  1. 所有數據都寫入到一個ByteArrayOutputStream。
  2. 完成後,調用流上的ByteArray()以檢索字節數據。
  3. (可選)使用字節數據創建一個ByteArrayInputStream,將其作爲InputStream讀取。