我有使用java nio的問題,並希望有很多java nio知識的人可以幫助我澄清一些錯誤概念。如何讓選擇器在Java中的socketchannel鍵更改nio
我正在使用java nio套接字。使用socketchannel.write()可能會填充寫入緩衝區。在這種情況下,剩餘的緩衝區排隊並且密鑰更改爲OP_WRITE。我的一個場景是隊列長度很長。每次調用selector.select()之前,我都會從另一個名爲pendingRequest的隊列中將密鑰更改爲OP_WRITE。但是我發現讀取過程非常緩慢,在發送處理完成後,有許多消息未寫入,並且仍然在隊列中。如何處理這個問題?
在我的代碼中,我有兩個寫作的地方。一個來自發生器:當它有消息要發佈時,它直接寫入頻道。如果緩衝區已滿,數據將被排隊。第二個地方是調度員:當密鑰可寫入時,它會調用write()來寫入排隊數據。我猜這兩部分可以競爭寫作。我只是覺得我的代碼缺少一些處理來配合兩次寫入。
有沒有解決上述問題的解決方案?我發現在我的代碼中有很多排隊的數據不能寫出來。當密鑰可寫時,生成器可能會再次寫入數據,這會導致排隊的數據變化較少而被寫出。如何使這部分正確?感謝
//在WriteListener()中,寫入代碼被以下三個部分
public synchronized int writeData(EventObject source) {
int n = 0;
int count = 0;
SocketChannel socket = (SocketChannel)source.getSource();
ByteBuffer buffer = ((WriteEvent)source).getBuffer();
try {
write(socket);
} catch (IOException e1) {
e1.printStackTrace();
}
while (buffer.position()>0) {
try {
buffer.flip();
n = socket.write(buffer);
if(n == 0) {
key.interestOps(SelectionKey.OP_WRITE); synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket);
if(queue == null) {
queue = new ArrayList<ByteBuffer>();
this.pendingData.put(socket, queue);
}
queue.add(buffer);
logger.logInfo("queue length:" + queue.size());
}
break;
}
count += n;
} catch (IOException e) {
e.printStackTrace();
} finally {
buffer.compact();
}
}
if(buffer.position()==0) {
key.interestOps(SelectionKey.OP_READ);
}
return count;
}
// ====該寫入方法是用來寫排隊緩衝器
public synchronized int write(SocketChannel sc, ByteBuffer wbuf) {
int n = 0;
int count = 0;
SelectionKey key = sc.keyFor(this.dispatcher.getDemultiplexer().getDemux());
while (wbuf.position()>0) {
try {
wbuf.flip();
n = sc.write(wbuf);
if(n == 0) {
key.interestOps(SelectionKey.OP_WRITE);
synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(sc);
if(queue == null) {
queue = new ArrayList<ByteBuffer>();
this.pendingData.put(sc, queue);
}
queue.add(wbuf);
}
break;
}
count += n;
} catch (IOException e) {
e.printStackTrace();
} finally {
wbuf.compact();
}
}
if(wbuf.position()==0) {
wbuf.clear();
key.interestOps(SelectionKey.OP_READ);
}
return n;
}
// ====當key.isWritable()爲真時,此方法是Dispatch的回調
public void write(SocketChannel socketChannel) throws IOException {
SelectionKey key = socketChannel.keyFor(this.dispatcher.getDemultiplexer().getDemux());
synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);
if(queue == null || queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this socket. Switch back to waiting for data.
try {
if (key!=null)
key.interestOps(SelectionKey.OP_READ);
} catch(Exception ex) {
if (key!=null)
key.cancel();
}
}
// Write until there's not more data ...
int n = 0;
while (queue != null && !queue.isEmpty()) {
ByteBuffer buf = (ByteBuffer) queue.get(0);
// zero length write, break the loop and wait for next writable time
n = write(socketChannel, buf);
logger.logInfo("queue length:" + queue.size() + " used time: " + (t2-t1) + " ms.");
if(n==0) {
break;
}
queue.remove(0);
}
}
請張貼一些代碼(最好是[SSCCE](http://sscce.org/)),證明您遇到的問題。 –
最後的方法,即寫入隊列的方法,總是從隊列中刪除緩衝區,即使是「if(n == 0)」。你應該在刪除緩衝區之前做這個測試並打破*,實際上你應該只在緩衝區爲空的時候刪除它。目前你正在丟失數據。 – EJP
在我當前的代碼中,在測試寫入的字節數後刪除(0)。目前的代碼工作,但性能不如它應該做的那樣好。在緩衝區滿後的下一個可寫時間需要多長時間? – susan