2011-07-20 58 views
1

我正在編寫一個程序來模擬使用Java NIO非阻塞套接字的P2P網絡上的對等節點。其思想是讓每個對等點使用相同的代碼發送和接收消息以及一個服務器,以允許對等方引導到網絡中。通過非阻塞Java NIO套接字傳遞的對象並不總是成功

我遇到的問題是,雖然最多有四個對等方可以成功加入網絡並相互交談(Ping,Pong,Query和QueryHit),但當我添加第五個對等方時,服務器始終會報告「StreamCorruptedException」 。我已經檢查過這個網站以及Java NIO代碼/教程網站的解決方案,但無濟於事。我知道通過非阻塞套接字發送對象並不容易/理想(特別是使用ObjectOutputStream和ObjectInputStream),但是我想盡量減少線程的使用(我也不想從頭開始重寫)。

我會首先展示最重要的方法(消息發送和接收),但如果需要,我可以稍後添加更多方法。

Write方法:

public void write(SelectionKey selKey){ 
    SocketChannel channel = (SocketChannel)selKey.channel(); 
    ArrayList<Message> queue = pending.get(channel); 

    ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
    ObjectOutputStream oos; 
    try{ 
     oos = new ObjectOutputStream(baos); 
    }catch(Exception e){ 
     System.err.println("Could not create object output stream. Aborting..."); 
     return; 
    } 

    while(!queue.isEmpty()){    
     Message message = queue.get(0); 
     buffer.clear(); 
     try{ 
      oos.writeObject(message); 
      buffer = ByteBuffer.wrap(baos.toByteArray()); 
      channel.write(buffer); 
      oos.flush(); 
      baos.flush(); 
     }catch(Exception e){ 
      System.err.println("Could not parse object. Aborting..."); 
      queue.remove(0); 
      return; 
     } 
     queue.remove(0); 
    } 
    selKey.interestOps(SelectionKey.OP_READ); 
} 

而且讀法:

public Message read(SelectionKey selKey) throws IOException, ClassNotFoundException{ 
    SocketChannel channel = (SocketChannel)selKey.channel();   
    Message message = null; 

    buffer = ByteBuffer.allocate(8192); 

    int bytesRead = channel.read(buffer); 
    if(bytesRead > 0){ 
     buffer.flip(); 
     InputStream bais = new ByteArrayInputStream(buffer.array(), 0, buffer.limit()); 
     ObjectInputStream ois = new ObjectInputStream(bais); //Offending line. Produces the StreamCorruptedException. 
     message = (Message)ois.readObject(); 
     ois.close(); 
    } 

    return message; 
} 

任何幫助將不勝感激!

回答

3
int bytesRead = channel.read(buffer); 
if(bytesRead > 0){ 
    buffer.flip(); 
    InputStream bais = new ByteArrayInputStream(buffer.array(), 0, buffer.limit()); 
    ObjectInputStream ois = new ObjectInputStream(bais); //Offending line. Produces the StreamCorruptedException. 
    message = (Message)ois.readObject(); 
    ois.close(); 
} 

這裏是混合阻塞和非阻塞I/O。緩衝區不會徹底地從套接字讀取,它只會讀取可用的內容。如果你打算採取這種方法,你需要首先將所有數據讀入緩衝區。

+0

另一端存在同樣的問題:如果通道設置爲非阻塞模式,'channel.write(buffer)'可能不會寫入整個緩衝區。 –

+0

所以我猜我需要找出什麼大小的消息將繼續讀入緩衝區,直到它已滿?如果是這樣,我必須發送兩條消息:一條包含消息的大小,然後是消息本身? – Mike

+0

是的,這將工作。這是另一個答案中提到的帶外數據;流本身並不是設計用於對編碼對象的零件進行操作(這基本上是發生了什麼)。顯然,額外的對等點添加了足夠的數據,以前連續的數據塊正在被分割。 – NBJack

2

你不能假設你的讀取將會在一個塊中返回整個對象。你需要一個「out-of-band」協議(以確保你接收到完整的消息),並根據需要從讀取的塊中組裝消息。對於非阻塞而言,這將比現在更復雜。