0
我正在編寫一個NIO服務器,現在要測試我正在連接到自己,我已將它寫入客戶端發送握手協議的位置,並且服務器必須響應,但是當我嘗試讀取握手,即使有數據等待讀取,我所得到的仍是字節緩衝區中的空字節。我已附上以下協議代碼準備讀取時,NIO字節緩衝區有空字節
public class HooMENodeProtocol implements TCPProtocol {
private int bufSize; // Size of I/O buffer
/**
* initializes buffer size
* @param bufSize the size of the byte buffer
*/
public HooMENodeProtocol(int bufSize) {
this.bufSize = bufSize;
}
/**
* accepts incoming network connections
*/
public void handleAccept(SelectionKey key) throws IOException {
SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
clntChan.configureBlocking(false); // Must be nonblocking to register
// Register the selector with new channel for read and attach byte buffer
clntChan.register(key.selector(), SelectionKey.OP_READ, new HooMEAttachment(ByteBuffer.allocate(bufSize),
new LinkedBlockingQueue<HooMEMessage>(Consts.SEND_QUEUE_CAPACITY), false));
}
/**
* handles read connections for channels
*/
public void handleRead(SelectionKey key) throws IOException {
// Client socket channel has pending data
SocketChannel clntChan = (SocketChannel) key.channel();
ByteBuffer buf = ((HooMEAttachment) key.attachment()).getBuffer();
buf.flip();
long bytesRead = clntChan.read(buf);
if (bytesRead == -1) { // Did the other end close?
clntChan.close();
} else if (bytesRead > 0 && !HooMEUtilities.isEmpty(buf.array())) {
System.out.println(Arrays.toString(buf.array()));
InputStream anInputStream = new ByteArrayInputStream (buf.array());
MessageInput anInput = new MessageInput (anInputStream);
//if we are supposed to receive a handshake
if(!((HooMEAttachment)key.attachment()).getSendHandshake()&&!((HooMEAttachment)key.attachment()).connectionOk()) {
String greeting = null;
try {
System.out.println("got a handshake");
greeting = anInput.readString();
anInput.readString();
} //consume the second new line character
catch (BadAttributeValueException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(greeting.equals(Consts.HANDSHAKE_GREETING)) {
key.interestOps(SelectionKey.OP_WRITE);
} else {
//handshake greeting was wrong we will terminate this connection
key.channel().close();
key.cancel();
}
// if we are supposed to be reading an acknowledgment
} else if(((HooMEAttachment)key.attachment()).getSendHandshake() && !((HooMEAttachment)key.attachment()).connectionOk()) {
String response = null;
try {
response = anInput.readString();
if(response.equals(Consts.HANDSHAKE_RESPONSE)) {
((HooMEAttachment)key.attachment()).setConnectionOk(true);
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
anInput.readString(); //consume the second newline character
} else {
key.channel().close();
key.cancel();
}
} catch (BadAttributeValueException e) {
e.printStackTrace();
}
} else {
HooMEMessage aMessage= null;
try {
aMessage = HooMEMessage.decode(anInput);
//if we have a response sent back
if(aMessage instanceof HooMEResponse || aMessage.getTtl()>0) { //treat all responses as comcast
//look up id of message in the forwarding table
for (Entry<byte[], InetAddress> entry : HooMENode.mappingTable.entrySet()) {
if(Arrays.equals(aMessage.getId(), entry.getKey())) { //if we have a mapping for this id in our routing table
//loop through our neighbours and find the one we want
for(SelectionKey aKey:key.selector().keys()) {
if(aKey.channel() instanceof SocketChannel &&
((SocketChannel)aKey.channel()).socket().getInetAddress()==entry.getValue()) {
aMessage.setTtl(aMessage.getTtl()-1);
((HooMEAttachment)key.attachment()).getSendQueue().add(aMessage);
break;
}
}
}
}
} else if (aMessage instanceof HooMESearch) { //if a search message has been sent .
//broadcast service requested broadcast the search to all and sundry
if(aMessage.getRoutingService().getServiceCode()==0 &&aMessage.getTtl()>0) {
aMessage.setTtl(aMessage.getTtl()-1);
for(SelectionKey aKey:key.selector().keys()) {
if(aKey.attachment()!=null && aKey.attachment() instanceof HooMEAttachment) {
((HooMEAttachment)aKey.attachment()).getSendQueue().add(aMessage);
}
}
//place mapping for the broadcast message in the mapping table
HooMENode.mappingTable.put(aMessage.getId(), ((SocketChannel)key.channel()).socket().getInetAddress());
}
//respond to the search
List<Result> searchResults = HooMENode.getFileResults(((HooMESearch)aMessage).getSearchString());
HooMEMessage response = new HooMEResponse(aMessage.getId(), 1, RoutingService.getRoutingService(1), aMessage.getDestinationHooMEAddress(), aMessage.getSourceHooMEAddress(),
new InetSocketAddress(HooMENode.address,HooMENode.downloadPort));
((HooMEResponse)response).getResultList().addAll(searchResults);
((HooMEAttachment)key.attachment()).getSendQueue().add(response);
}
// Indicate via key that reading/writing are both of interest now.
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}catch (IOException e) {
e.printStackTrace();
} catch (BadAttributeValueException e) {
System.out.println(e.getMessage());
}
}
}
}
/**
* handles write connections for channels
*/
public void handleWrite(SelectionKey key) throws IOException {
OutputStream anOutput = new ByteArrayOutputStream();
ByteBuffer buf = ((HooMEAttachment) key.attachment()).getBuffer();
/*
* Channel is available for writing, and key is valid (i.e., client channel
* not closed).
*/
//we are supposed to send an acknowledgement of a handshake
if(!((HooMEAttachment)key.attachment()).getSendHandshake()&&!((HooMEAttachment)key.attachment()).connectionOk()) {
anOutput.write((Consts.HANDSHAKE_RESPONSE+Consts.EOLN+Consts.EOLN).getBytes(HooMEMessage.CHARENCODING));
//confirm that the connection is ok
((HooMEAttachment)key.attachment()).setConnectionOk(true);
buf.put(((ByteArrayOutputStream)anOutput).toByteArray());
SocketChannel clntChan = (SocketChannel) key.channel();
clntChan.write(buf);
System.out.println(Arrays.toString(buf.array()));
if(!buf.hasRemaining()) {
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
//if we are supposed to be sending an handshake
} else if(((HooMEAttachment)key.attachment()).getSendHandshake() && !((HooMEAttachment)key.attachment()).connectionOk()) {
anOutput.write((Consts.HANDSHAKE_GREETING+Consts.EOLN+Consts.EOLN).getBytes(HooMEMessage.CHARENCODING));
buf.put(((ByteArrayOutputStream)anOutput).toByteArray());
SocketChannel clntChan = (SocketChannel) key.channel();
clntChan.write(buf);
System.out.println(Arrays.toString(buf.array()));
if(!buf.hasRemaining()) {
key.interestOps(SelectionKey.OP_READ);
}
} else {
//check the send queue for messages that can be sent if there are none then do nothing
HooMEMessage aMessage = ((HooMEAttachment) key.attachment()).getSendQueue().poll();
if(aMessage != null) {
MessageOutput aMessageOutput = new MessageOutput(anOutput);
aMessage.encode(aMessageOutput);
}
buf.put(((ByteArrayOutputStream)anOutput).toByteArray());
SocketChannel clntChan = (SocketChannel) key.channel();
clntChan.write(buf);
System.out.println(Arrays.toString(buf.array()));
if(!buf.hasRemaining()) {
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}
// Make room for more data to be read in
}
}