2011-05-11 154 views
6

我正在通過Java套接字實現一個面向事件的層,我在想是否有一種方法來確定是否有數據等待讀取。Java網絡:連接Socket/InputStream

我通常的做法是從套接字讀入緩衝區,並在緩衝區填充給定數量的字節時調用提供的回調函數(可能爲0,如果每次調用回調需要觸發到達),但我懷疑Java已經在爲我做緩衝了。

InputStream的available()方法對此可靠嗎?我應該只是read()並在Socket上做我自己的緩衝?還是有另一種方式?

回答

8

很快就放了,沒有。 available()不可靠(至少它不適合我)。我推薦使用java.nio.channels.SocketChannel連接SelectorSelectionKey。這個解決方案有點基於事件,但比簡單的套接字更復雜。

對於客戶端:

  1. 構建體插座通道(socket),打開一個選擇器(selector = Selector.open();)。
  2. 使用非阻塞socket.configureBlocking(false);
  3. 註冊選擇用於連接socket.register(selector, SelectionKey.OP_CONNECT);
  4. 連接socket.connect(new InetSocketAddress(host, port));
  5. 看看是否有什麼新selector.select();
  6. 如果「新」指的是連接成功,爲OP_READ註冊的選擇;如果「新」指的是可用的數據,只需從套接字讀取即可。

但是,爲了讓它是異步的,你需要設置一個單獨的線程(儘管套接字被創建爲非阻塞,線程仍然會阻塞)來檢查是否已經到達。

對於服務器,有ServerSocketChannel,您使用OP_ACCEPT

供參考,這是我的代碼(客戶端),應該給你一個提示:

private Thread readingThread = new ListeningThread(); 

/** 
    * Listening thread - reads messages in a separate thread so the application does not get blocked. 
    */ 
private class ListeningThread extends Thread { 
    public void run() { 
    running = true; 
    try { 
    while(!close) listen(); 
    messenger.close(); 
    } 
    catch(ConnectException ce) { 
    doNotifyConnectionFailed(ce); 
    } 
    catch(Exception e) { 
// e.printStackTrace(); 
    messenger.close(); 
    } 
    running = false; 
    } 
} 

/** 
    * Connects to host and port. 
    * @param host Host to connect to. 
    * @param port Port of the host machine to connect to. 
    */ 
public void connect(String host, int port) { 
    try { 
    SocketChannel socket = SocketChannel.open(); 
    socket.configureBlocking(false); 
    socket.register(this.selector, SelectionKey.OP_CONNECT); 
    socket.connect(new InetSocketAddress(host, port)); 
    } 
    catch(IOException e) { 
    this.doNotifyConnectionFailed(e); 
    } 
} 

/** 
    * Waits for an event to happen, processes it and then returns. 
    * @throws IOException when something goes wrong. 
    */ 
protected void listen() throws IOException { 
    // see if there are any new things going on 
    this.selector.select(); 
    // process events 
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 
    while(iter.hasNext()) { 
    SelectionKey key = iter.next(); 
    iter.remove(); 
    // check validity 
    if(key.isValid()) { 
    // if connectable... 
    if(key.isConnectable()) { 
    // ...establish connection, make messenger, and notify everyone 
    SocketChannel client = (SocketChannel)key.channel(); 
    // now this is tricky, registering for OP_READ earlier causes the selector not to wait for incoming bytes, which results in 100% cpu usage very, very fast 
    if(client!=null && client.finishConnect()) { 
     client.register(this.selector, SelectionKey.OP_READ); 
    } 
    } 
    // if readable, tell messenger to read bytes 
    else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) { 
    // read message here 
    } 
    } 
    } 
} 

/** 
    * Starts the client. 
    */ 
public void start() { 
    // start a reading thread 
    if(!this.running) { 
    this.readingThread = new ListeningThread(); 
    this.readingThread.start(); 
    } 
} 

/** 
    * Tells the client to close at nearest possible moment. 
    */ 
public void close() { 
    this.close = true; 
} 

而對於服務器:

/** 
    * Constructs a server. 
    * @param port Port to listen to. 
    * @param protocol Protocol of messages. 
    * @throws IOException when something goes wrong. 
    */ 
public ChannelMessageServer(int port) throws IOException { 
    this.server = ServerSocketChannel.open(); 
    this.server.configureBlocking(false); 
    this.server.socket().bind(new InetSocketAddress(port)); 
    this.server.register(this.selector, SelectionKey.OP_ACCEPT); 
} 

/** 
    * Waits for event, then exits. 
    * @throws IOException when something goes wrong. 
    */ 
protected void listen() throws IOException { 
    // see if there are any new things going on 
    this.selector.select(); 
    // process events 
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); 
    while(iter.hasNext()) { 
    SelectionKey key = iter.next(); 
    // do something with the connected socket 
    iter.remove(); 
    if(key.isValid()) this.process(key); 
    } 
} 

/** 
    * Processes a selection key. 
    * @param key SelectionKey. 
    * @throws IOException when something is wrong. 
    */ 
protected void process(SelectionKey key) throws IOException { 
    // if incoming connection 
    if(key.isAcceptable()) { 
    // get client 
    SocketChannel client = (((ServerSocketChannel)key.channel()).accept()); 
    try { 
    client.configureBlocking(false); 
    client.register(this.selector, SelectionKey.OP_READ); 
    } 
    catch(Exception e) { 
    // catch 
    } 
    } 
    // if readable, tell messenger to read 
    else if(key.isReadable()) { 
    // read 
    } 
} 

希望這有助於。

+0

我不明白。你不需要一個單獨的線程。非阻塞套接字不會按照定義進行阻塞。只要正確使用OP_READ,並且在讀取時停止的正確讀取循環返回零。 – EJP

+0

@EJP:不反對;不過在我看來,無論阻塞如何,即使沒有任何可讀的內容,仍然會阻止來自套接字的讀取。但可能是我做錯了。我建議提問者按照你的說法去嘗試,如果它不起作用 - 嘗試線程。 – Sorrow

+0

你幾乎可以肯定的是循環,而read()返回零。這就是我提到它的原因。這不是阻塞,這是循環。 – EJP

0

available()只會告訴你,如果你可以讀取數據而不去操作系統。它在這裏不是很有用。

您可以根據自己的喜好進行封鎖或非封鎖閱讀。當沒有要讀取的數據時,非阻塞讀取會返回,這可能是您想要的。

+2

錯誤。 available()會告訴你BufferedInputStream/BufferedReader中的數據總和(如果你使用的是)以及套接字接收緩衝區,它是一個內核數據結構。如果數據只在套接字接收緩衝區中,那麼您必須'進入操作系統'來獲取它,但在這個過程中您不會*阻止*。正如Javadoc所說。但是,如果它是例如SSLSocket,則available()始終返回零。 – EJP