2012-11-09
2

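I found a related question, but it is not particularly useful because it does not provide a complete example. How should AsynchronousSocketChannel#read be used: in a loop or recursively?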

The problem: how to use AsynchronousSocketChannel to read data of unknown length with a fixed-size buffer.

First attempt (reads once):

final int bufferSize = 1024; 
final SocketAddress address = /*ip:port*/; 
final ThreadFactory threadFactory = Executors.defaultThreadFactory(); 
final ExecutorService executor = Executors.newCachedThreadPool(threadFactory); 
final AsynchronousChannelGroup asyncChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(executor, 5); 
final AsynchronousSocketChannel client = AsynchronousSocketChannel.open(asyncChannelGroup); 
client.connect(address).get(5, TimeUnit.SECONDS);//block until the connection is established 

//write the request 
Integer bytesWritten = client.write(StandardCharsets.US_ASCII.encode("a custom request in a binary format")).get(); 

//read the response 
final ByteBuffer readTo = ByteBuffer.allocate(bufferSize); 
final StringBuilder responseBuilder = new StringBuilder(); 
client.read(readTo, readTo, new CompletionHandler<Integer, ByteBuffer>() {
    public void completed(Integer bytesRead, ByteBuffer buffer) {
        buffer.flip();
        responseBuilder.append(StandardCharsets.US_ASCII.decode(buffer).toString());
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
});
asyncChannelGroup.awaitTermination(5, TimeUnit.SECONDS); 
asyncChannelGroup.shutdown(); 
System.out.println(responseBuilder.toString()); 

What changes do I need to make to cleanly keep reading into the buffer while bytesRead != -1 (i.e. until the end of the stream is reached)?

Answers

1

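Here is a simplified version of what I ended up doing (using Guava's ListenableFuture):

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

class SomeUtilClass {
    public interface Processor<T> {
        // Return true to request another read, false once the result is complete.
        boolean process(Integer byteCount, ByteBuffer buffer);
        T result();
    }

    public static <T> ListenableFuture<T> read(
        final AsynchronousSocketChannel delegate,
        final Processor<T> processor,
        ByteBuffer buffer
    ) {
        final SettableFuture<T> resultFuture = SettableFuture.create();
        // The original snippet used a Handler helper class that was not shown;
        // a plain CompletionHandler that fails the future is assumed here instead.
        delegate.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            public void completed(Integer bytesRead, ByteBuffer buffer) {
                buffer.flip();
                if (processor.process(bytesRead, buffer)) {
                    buffer.clear();
                    delegate.read(buffer, buffer, this); // reuse this handler for the next chunk
                } else {
                    resultFuture.set(processor.result());
                }
            }

            public void failed(Throwable exc, ByteBuffer buffer) {
                resultFuture.setException(exc);
            }
        });
        return resultFuture;
    }
}

Further improvements include using a Commons Pool of ByteBuffers.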
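The answer above leaves the Processor implementation to the reader. As a rough sketch of what one might look like, the class below collects ASCII text until the stream ends or a terminator appears. The names AsciiCollector and untilTerminator are hypothetical, and the Processor interface is copied locally only so that the snippet compiles without Guava or SomeUtilClass.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class AsciiCollector {
    /** Local copy of the answer's Processor interface, so this sketch compiles on its own. */
    public interface Processor<T> {
        boolean process(Integer byteCount, ByteBuffer buffer);
        T result();
    }

    /** Hypothetical factory: collects ASCII text until end of stream or until the terminator is seen. */
    public static Processor<String> untilTerminator(final String terminator) {
        return new Processor<String>() {
            private final StringBuilder text = new StringBuilder();

            @Override
            public boolean process(Integer byteCount, ByteBuffer buffer) {
                if (byteCount == -1) {
                    return false; // end of stream: stop reading
                }
                // The buffer is expected to arrive already flipped by the caller.
                text.append(StandardCharsets.US_ASCII.decode(buffer));
                return text.indexOf(terminator) == -1; // true means "keep reading"
            }

            @Override
            public String result() {
                return text.toString();
            }
        };
    }
}
```

With the real interface, such an object would be passed as the processor argument of SomeUtilClass.read, and the returned future would complete with the collected text.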

0

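In my opinion, the simplest way is to split this code out into its own method and have the CompletionHandler call that method recursively while bytesRead != -1. That way you separate the responsibilities of the code and avoid both busy-waiting and the need for Thread.sleep() while the asynchronous read is running. You can, of course, also add a case for bytesRead == -1 that does something with the data that has been read.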
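A minimal sketch of this recursive approach, under some assumptions: it uses CompletableFuture (Java 8 or later) to hand the result back, decodes the data as US-ASCII like the question does, and the names RecursiveReader, readAll and readNext are made up for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

public class RecursiveReader {
    /** Reads until end of stream; the future completes with everything that was received. */
    public static CompletableFuture<String> readAll(AsynchronousSocketChannel client, int bufferSize) {
        CompletableFuture<String> result = new CompletableFuture<>();
        readNext(client, ByteBuffer.allocate(bufferSize), new StringBuilder(), result);
        return result;
    }

    /** Issues one read; the handler calls this method again until bytesRead == -1. */
    private static void readNext(AsynchronousSocketChannel client, ByteBuffer buffer,
                                 StringBuilder response, CompletableFuture<String> result) {
        client.read(buffer, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer bytesRead, Void unused) {
                if (bytesRead == -1) {
                    // End of stream: do something with the data that has been read.
                    closeQuietly(client);
                    result.complete(response.toString());
                    return;
                }
                buffer.flip();
                response.append(StandardCharsets.US_ASCII.decode(buffer));
                buffer.clear();
                readNext(client, buffer, response, result); // the "recursive" call
            }

            @Override
            public void failed(Throwable exc, Void unused) {
                closeQuietly(client);
                result.completeExceptionally(exc);
            }
        });
    }

    private static void closeQuietly(AsynchronousSocketChannel client) {
        try {
            client.close();
        } catch (IOException ignored) {
            // nothing useful can be done at this point
        }
    }
}
```

The caller can then wait with readAll(client, 1024).get(5, TimeUnit.SECONDS) or attach further stages to the future. Only one read is outstanding at any time, because the next read is issued from inside the previous read's completion handler.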

0

I wouldn't do anything prolonged from the callback methods failed and completed, because they run on threads that are not under your control.
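One way to keep the callbacks short, sketched here under assumptions: the handler only copies the received bytes into a queue, and a thread the application owns takes them out and does the slow work. The class name HandOffHandler and the END sentinel are hypothetical, and a failed read is treated like end of stream for brevity.

```java
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class HandOffHandler implements CompletionHandler<Integer, ByteBuffer> {
    /** Sentinel queued at end of stream or on failure; compare by identity (hypothetical convention). */
    public static final byte[] END = new byte[0];

    private final BlockingQueue<byte[]> chunks = new LinkedBlockingQueue<>();

    @Override
    public void completed(Integer bytesRead, ByteBuffer buffer) {
        if (bytesRead == -1) {
            chunks.add(END);
            return;
        }
        // Copy the bytes out and return quickly; no decoding or parsing on the callback thread.
        buffer.flip();
        byte[] copy = new byte[buffer.remaining()];
        buffer.get(copy);
        buffer.clear(); // make the buffer reusable for the next read
        chunks.add(copy);
    }

    @Override
    public void failed(Throwable exc, ByteBuffer buffer) {
        chunks.add(END);
    }

    /** Call this from a thread you control; it blocks until a chunk is available. */
    public byte[] take() throws InterruptedException {
        return chunks.take();
    }
}
```

The handler would be passed to client.read(buffer, buffer, handler), and the consuming thread loops on take() until it receives END.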

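I understand that you want to keep listening on the socket for new bytes even after the stream has reached its end (bytesRead == -1). Put the read method in a while(true) loop. Inside the loop, watch a synchronized field that is set by the failed and completed methods. Let's call it myBytesRead.

To be able to stop the endless reading, replace while(true) with some other synchronized condition.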

private static final int BYTES_READ_INIT_VALUE = Integer.MIN_VALUE;
private static final int BYTES_READ_COMPLETED_VALUE = -1;
private static final int BYTES_READ_FAILED_VALUE = -2;
// Dedicated lock object: synchronizing on the reassigned Integer field itself
// would lock a different object after every assignment.
private final Object myBytesReadLock = new Object();
private int myBytesRead = BYTES_READ_INIT_VALUE;

private void setMyBytesRead(final int bytesRead) {
    synchronized (myBytesReadLock) {
        this.myBytesRead = bytesRead;
    }
}

private int getMyBytesRead() {
    synchronized (myBytesReadLock) {
        return myBytesRead;
    }
}

...

// in your method
while (true) {
    final int lastBytesRead = getMyBytesRead();
    if (lastBytesRead == BYTES_READ_FAILED_VALUE) {
        // log failure and retry?
    } else if (lastBytesRead != BYTES_READ_COMPLETED_VALUE) {
        // Thread.sleep(a while); to avoid exhausting CPU
        continue;
    }

    // else lastBytesRead == BYTES_READ_COMPLETED_VALUE and you can start a new read operation
    client.read(readTo, readTo, new CompletionHandler<Integer, ByteBuffer>() {
        public void completed(Integer bytesRead, ByteBuffer buffer) {
            setMyBytesRead(bytesRead);
            buffer.flip();
            responseBuilder.append(StandardCharsets.US_ASCII.decode(buffer).toString());
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                setMyBytesRead(BYTES_READ_FAILED_VALUE);
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    });
}
0

My initial attempt:

package com.example; 

import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.nio.charset.StandardCharsets; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 

public class LoopingReader implements Callable<String> {
    final AsynchronousSocketChannel client;
    final String responseTerminator;
    final StringBuilder responseBuilder;

    LoopingReader(
        AsynchronousSocketChannel client,
        String responseTerminator
    ) {
        this.client = client;
        this.responseTerminator = responseTerminator;

        responseBuilder = new StringBuilder();
    }

    public String call() {
        boolean doLoop;
        do {
            int bytesRead = executeIteration();//blocking
            boolean didReachEndOfStream = bytesRead == -1;
            boolean didEncounterResponseTerminator = responseBuilder.indexOf(responseTerminator) != -1;

            doLoop = !didReachEndOfStream && !didEncounterResponseTerminator;
        } while (doLoop);
        return responseBuilder.toString();
    }

    int executeIteration() {
        final ByteBuffer buffer = ByteBuffer.allocate(256);//use pool here
        final int bytesRead;
        try {
            bytesRead = client.read(buffer).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Failed to read", e);
        }
        decodeAndAppend(buffer);
        return bytesRead;
    }

    void decodeAndAppend(ByteBuffer buffer) {
        buffer.flip();
        responseBuilder.append(StandardCharsets.US_ASCII.decode(buffer));
    }
}