2014-02-18 103 views
-1

我創建將處理> 1000所連接的服務器。我決定在服務器中使用非阻塞IO。我在互聯網上發現了一些代碼,它基本上是一個回顯服務器。我認爲一切都很好,但我不明白服務器中的一些概念。非阻塞IO Java中與邏輯

import java.io.IOException; 
import java.net.InetAddress; 
import java.net.InetSocketAddress; 
import java.net.Socket; 
import java.net.SocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.util.*; 

public class EchoServer { 
    private InetAddress addr; 
    private int port; 
    private Selector selector; 
    private Map<SocketChannel,List<byte[]>> dataMap; 

public EchoServer(InetAddress addr, int port) throws IOException { 
    this.addr = addr; 
    this.port = port; 
    dataMap = new HashMap<SocketChannel,List<byte[]>>(); 
    startServer(); 
} 

private void startServer() throws IOException { 
    // create selector and channel 
    this.selector = Selector.open(); 
    ServerSocketChannel serverChannel = ServerSocketChannel.open(); 
    serverChannel.configureBlocking(false); 

    // bind to port 
    InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port); 
    serverChannel.socket().bind(listenAddr); 
    serverChannel.register(this.selector, SelectionKey.OP_ACCEPT); 

    log("Echo server ready. Ctrl-C to stop."); 

    // processing 
    while (true) { 
     // wait for events 
     this.selector.select(); 

     // wakeup to work on selected keys 
     Iterator keys = this.selector.selectedKeys().iterator(); 
     while (keys.hasNext()) { 
      SelectionKey key = (SelectionKey) keys.next(); 

      // this is necessary to prevent the same key from coming up 
      // again the next time around. 
      keys.remove(); 

      if (! key.isValid()) { 
       continue; 
      } 

      if (key.isAcceptable()) { 
       this.accept(key); 
      } 
      else if (key.isReadable()) { 
       this.read(key); 
      } 
      else if (key.isWritable()) { 
       this.write(key); 
      } 
     } 
    } 
} 

private void accept(SelectionKey key) throws IOException { 
    ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); 
    SocketChannel channel = serverChannel.accept(); 
    channel.configureBlocking(false); 

    // write welcome message 
    channel.write(ByteBuffer.wrap("Welcome, this is the echo server\r\n".getBytes("US-  ASCII"))); 

    Socket socket = channel.socket(); 
    SocketAddress remoteAddr = socket.getRemoteSocketAddress(); 
    log("Connected to: " + remoteAddr); 

    // register channel with selector for further IO 
    dataMap.put(channel, new ArrayList<byte[]>()); 
    channel.register(this.selector, SelectionKey.OP_READ); 
} 

private void read(SelectionKey key) throws IOException { 
    SocketChannel channel = (SocketChannel) key.channel(); 

    ByteBuffer buffer = ByteBuffer.allocate(8192); 
    int numRead = -1; 
    try { 
     numRead = channel.read(buffer); 
    } 
    catch (IOException e) { 
     e.printStackTrace(); 
    } 

    if (numRead == -1) { 
     this.dataMap.remove(channel); 
     Socket socket = channel.socket(); 
     SocketAddress remoteAddr = socket.getRemoteSocketAddress(); 
     log("Connection closed by client: " + remoteAddr); 
     channel.close(); 
     key.cancel(); 
     return; 
    } 

    byte[] data = new byte[numRead]; 
    System.arraycopy(buffer.array(), 0, data, 0, numRead); 
    log("Got: " + new String(data, "US-ASCII")); 

    // write back to client 
    doEcho(key, data); 
} 

private void write(SelectionKey key) throws IOException { 
    SocketChannel channel = (SocketChannel) key.channel(); 
    List<byte[]> pendingData = this.dataMap.get(channel); 
    Iterator<byte[]> items = pendingData.iterator(); 
    while (items.hasNext()) { 
     byte[] item = items.next(); 
     items.remove(); 
     channel.write(ByteBuffer.wrap(item)); 
    } 
    key.interestOps(SelectionKey.OP_READ); 
} 

private void doEcho(SelectionKey key, byte[] data) { 
    SocketChannel channel = (SocketChannel) key.channel(); 
    List<byte[]> pendingData = this.dataMap.get(channel); 
    pendingData.add(data); 
    key.interestOps(SelectionKey.OP_WRITE); 
} 

private static void log(String s) { 
    System.out.println(s); 
} 

    public static void main(String[] args) throws Exception { 
     new EchoServer(null, 8989); 
    } 
} 

所以對於這段代碼,我有幾個問題。一,如果我讀了10個字節,但我不想做任何事情,直到我讀了100個字節,我該如何實現?另外,如果我只想寫一個計數器達到一定數量時,我將如何實現該非阻塞?關於這個代碼的事情是,無論字節緩衝區有多大,它都會回顯。我該如何改變它,只有當它有100個字節時纔會回顯?我只有在計數器達到一定大小時才能寫入數據?謝謝!

將投入read方法工作的,如果(numRead < 100){做休息}其他{}回報的第一個問題? 此外,將投入一個if(計數器> 100){做休息}否則將在第二寫入方法工作{}的回報?

回答

0

你必須代碼部分,基本上你需要跟蹤的字節讀取,不斷增加讀取臨時緩衝區的字節,一旦你達到了你的要求限制,您可以通過該緩衝區您的工作線程。

我會建議你使用netty它提供了所有你正在尋找的開箱即用的東西。 看this link

希望這有助於

0

就與非阻塞模式返回-1通道,而讀書?對於你的問題,你可以設置ByteBuffer的限制: -

例如: -

ByteBuffer buff = ByteBuffer.allocate(1024); 
buff.clear(); 

buff.limit(your_limit);//is this what you want?? 

while(buff.remaining>0&&channel.read(buff)); // if will reach till your limit only. 

System.out.println(new String(buff.array())); 

希望這有助於