我正在通過Java套接字實現一個面向事件的層,我在想是否有一種方法來確定是否有數據等待讀取。Java網絡:連接Socket/InputStream
我通常的做法是從套接字讀入緩衝區,並在緩衝區填充給定數量的字節時調用提供的回調函數(可能爲0,如果每次調用回調需要觸發到達),但我懷疑Java已經在爲我做緩衝了。
InputStream的available()
方法對此可靠嗎?我應該只是read()
並在Socket上做我自己的緩衝?還是有另一種方式?
我正在通過Java套接字實現一個面向事件的層,我在想是否有一種方法來確定是否有數據等待讀取。Java網絡:連接Socket/InputStream
我通常的做法是從套接字讀入緩衝區,並在緩衝區填充給定數量的字節時調用提供的回調函數(可能爲0,如果每次調用回調需要觸發到達),但我懷疑Java已經在爲我做緩衝了。
InputStream的available()
方法對此可靠嗎?我應該只是read()
並在Socket上做我自己的緩衝?還是有另一種方式?
很快就放了,沒有。 available()
不可靠(至少它不適合我)。我推薦使用java.nio.channels.SocketChannel
連接Selector
和SelectionKey
。這個解決方案有點基於事件,但比簡單的套接字更復雜。
對於客戶端:
socket
),打開一個選擇器(selector = Selector.open();
)。socket.configureBlocking(false);
socket.register(selector, SelectionKey.OP_CONNECT);
socket.connect(new InetSocketAddress(host, port));
selector.select();
OP_READ
註冊的選擇;如果「新」指的是可用的數據,只需從套接字讀取即可。但是,爲了讓它是異步的,你需要設置一個單獨的線程(儘管套接字被創建爲非阻塞,線程仍然會阻塞)來檢查是否已經到達。
對於服務器,有ServerSocketChannel
,您使用OP_ACCEPT
。
供參考,這是我的代碼(客戶端),應該給你一個提示:
private Thread readingThread = new ListeningThread();
/**
* Listening thread - reads messages in a separate thread so the application does not get blocked.
*/
private class ListeningThread extends Thread {
public void run() {
running = true;
try {
while(!close) listen();
messenger.close();
}
catch(ConnectException ce) {
doNotifyConnectionFailed(ce);
}
catch(Exception e) {
// e.printStackTrace();
messenger.close();
}
running = false;
}
}
/**
* Connects to host and port.
* @param host Host to connect to.
* @param port Port of the host machine to connect to.
*/
public void connect(String host, int port) {
try {
SocketChannel socket = SocketChannel.open();
socket.configureBlocking(false);
socket.register(this.selector, SelectionKey.OP_CONNECT);
socket.connect(new InetSocketAddress(host, port));
}
catch(IOException e) {
this.doNotifyConnectionFailed(e);
}
}
/**
* Waits for an event to happen, processes it and then returns.
* @throws IOException when something goes wrong.
*/
protected void listen() throws IOException {
// see if there are any new things going on
this.selector.select();
// process events
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
// check validity
if(key.isValid()) {
// if connectable...
if(key.isConnectable()) {
// ...establish connection, make messenger, and notify everyone
SocketChannel client = (SocketChannel)key.channel();
// now this is tricky, registering for OP_READ earlier causes the selector not to wait for incoming bytes, which results in 100% cpu usage very, very fast
if(client!=null && client.finishConnect()) {
client.register(this.selector, SelectionKey.OP_READ);
}
}
// if readable, tell messenger to read bytes
else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) {
// read message here
}
}
}
}
/**
* Starts the client.
*/
public void start() {
// start a reading thread
if(!this.running) {
this.readingThread = new ListeningThread();
this.readingThread.start();
}
}
/**
* Tells the client to close at nearest possible moment.
*/
public void close() {
this.close = true;
}
而對於服務器:
/**
* Constructs a server.
* @param port Port to listen to.
* @param protocol Protocol of messages.
* @throws IOException when something goes wrong.
*/
public ChannelMessageServer(int port) throws IOException {
this.server = ServerSocketChannel.open();
this.server.configureBlocking(false);
this.server.socket().bind(new InetSocketAddress(port));
this.server.register(this.selector, SelectionKey.OP_ACCEPT);
}
/**
* Waits for event, then exits.
* @throws IOException when something goes wrong.
*/
protected void listen() throws IOException {
// see if there are any new things going on
this.selector.select();
// process events
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()) {
SelectionKey key = iter.next();
// do something with the connected socket
iter.remove();
if(key.isValid()) this.process(key);
}
}
/**
* Processes a selection key.
* @param key SelectionKey.
* @throws IOException when something is wrong.
*/
protected void process(SelectionKey key) throws IOException {
// if incoming connection
if(key.isAcceptable()) {
// get client
SocketChannel client = (((ServerSocketChannel)key.channel()).accept());
try {
client.configureBlocking(false);
client.register(this.selector, SelectionKey.OP_READ);
}
catch(Exception e) {
// catch
}
}
// if readable, tell messenger to read
else if(key.isReadable()) {
// read
}
}
希望這有助於。
available()只會告訴你,如果你可以讀取數據而不去操作系統。它在這裏不是很有用。
您可以根據自己的喜好進行封鎖或非封鎖閱讀。當沒有要讀取的數據時,非阻塞讀取會返回,這可能是您想要的。
錯誤。 available()會告訴你BufferedInputStream/BufferedReader中的數據總和(如果你使用的是)以及套接字接收緩衝區,它是一個內核數據結構。如果數據只在套接字接收緩衝區中,那麼您必須'進入操作系統'來獲取它,但在這個過程中您不會*阻止*。正如Javadoc所說。但是,如果它是例如SSLSocket,則available()始終返回零。 – EJP
我不明白。你不需要一個單獨的線程。非阻塞套接字不會按照定義進行阻塞。只要正確使用OP_READ,並且在讀取時停止的正確讀取循環返回零。 – EJP
@EJP:不反對;不過在我看來,無論阻塞如何,即使沒有任何可讀的內容,仍然會阻止來自套接字的讀取。但可能是我做錯了。我建議提問者按照你的說法去嘗試,如果它不起作用 - 嘗試線程。 – Sorrow
你幾乎可以肯定的是循環,而read()返回零。這就是我提到它的原因。這不是阻塞,這是循環。 – EJP