2016-03-11 60 views
0

首先發布這裏,所以大家好。 我正在構建一個小型客戶端,需要通過套接字發送心跳,並通過套接字接收各種固定長度的二進制消息。 我有一個連接管理器類,用於啓動套接字連接並創建2個線程,每個線程每隔x秒發送一次心跳。另一個線程監聽輸入。Java ExecutorService線程寫入相同的字段

當它接收到輸入時,它將頭部讀入字節數組(總是4個字節),得到消息長度x = byte [3],然後讀取下一個x字節到消息字節數組。然後它使用ExecutorService創建一個新的messasge處理器對象,它接收消息字節數組。

messasge處理器實現可運行並打印出消息字節。 但是,如果套接字接收到大量數據,即服務器按順序發送消息,那麼我的消息處理器似乎會打印出不同線程中的混合數據 - 我認爲我的執行程序會創建一個新線程安全的對象,並具有自己的二進制實例[ ] msgBytes。

的ConnectionManager低於其產生髮送/接收線程,RCV線程創建一個線程池,並且當讀出消息創建新的MessageProcessor目的是處理字節[]消息

public class connectionManager extends Thread { 
    public connectionManager(InetAddress host, int serverSocket) { 
     System.out.println("in connectionManager create"); 
     try { 
      clientSocket = new Socket(host, serverSocket); 
      outToServer = new DataOutputStream(clientSocket.getOutputStream()); 
      connected = true; 
      final ExecutorService executor = Executors.newFixedThreadPool(10); 

      final Thread inThread = new Thread() { 
       @Override 
       public void run() { 
        while (connected) { 
         try { 
          DataInputStream dIn = new DataInputStream(clientSocket.getInputStream()); 

          byte[] header = new byte[4]; 
          dIn.readFully(header); // read the message        

          String msgType = new String(new byte[]{ header[3] }, "US-ASCII"); 

          short length = getShortFromLittleEndianRange(header, 1); //test function to return length offset is start position of length 
          byte[] message = new byte[length - 1]; //minus the msg type byte 

          dIn.readFully(message); 

          executor.execute(new MessageProcessor(message, msgType)); 
          } catch (Exception e) { 
           e.printStackTrace(); 
           connected = false; 
          } 
        } 
        executor.shutdown(); 
        System.out.println("Shutdown executor"); 
     }; 
      }; 
      inThread.start();   


      final Thread outThread = new Thread() { 
      @Override 
       public void run() { 
        Heartbeat hb = new Heartbeat(); 
        while(connected) { 
         PrintWriter out = null; 
         try { 
          this.sleep(3000);       
          outToServer.write(hb.serialize()); 
         } catch (Exception e) { 
          e.printStackTrace(); 
          System.out.println("cant send heartbeat server not alive..."); 
         } 
        } 
     }; 
      }; 
      outThread.start();    
     } catch (Exception e) { 
      System.out.println("Cannot connect to server"); 
      System.exit(0); 
     } 
    }  
} 

MessageProcess實現可運行我期望執行到創建新的對象MessageProcess各有自己的msgBytes的實例,並MSGTYPE

public class MessageProcessor implements Runnable { 

    private byte[] msgBytes; 
    private String msgType; 

    MessageProcessor(byte[] newMsgBytes, String newMsgType) { 
    msgBytes = newMsgBytes; 
    msgType = newMsgType; 
    }  

    @Override 
    public void run() { 
    output(); 
    } 

    synchronized void output() { 
     System.out.println("\nMessageProcessor process inbound message " + Thread.currentThread().getId()); 
     System.out.println("message type : " + " " + msgType); 
     for(byte b : msgBytes){ 
      System.out.printf("%02X",b); 
     }   
     System.out.println("\nfinished MessageProcessor " + Thread.currentThread().getId());   
    } 
} 

然而,當我運行並接收來自服務器我康索爾輸出看起來封郵件「洪水」,好像有一個線程安全ISSU e:下面的示例輸出。

MessageProcessor process inbound message 16 
message type : 8 

01D40806004530515266385779624C6D653830 
MessageProcessor process inbound message 17 
2D36667664374E6D6A4100000000000000003030515266444E31417A766846000000000000000000000000020000000000A90DAE0400000001000000000000000000000000FA10000000000100000000000000004C43484C4742324500000052D7048024E3F70400BCBDE256306F0100000203003030515266444E31417A766800000000message type : 8 


finished MessageProcessor 16 
01D50806004530515266385779624C6D6638302D366676643176584A5200000000000000003030515266444E31417A766246000000000000000000000000010000000000A90DAE0400000001000000090000000109000000FA10000000000200000000000000004C43484C4742324500000041D7048024E3F70400BCBDE256306F0100000003003030515266444E31417A766200000000 
finished MessageProcessor 17 

我希望看到

MessageProcessor process inbound message 16 
message type : 8 
<hex> 

finished MessageProcessor 16 

MessageProcessor process inbound message 17 
message type : 8 
<hex> 

finished MessageProcessor 17 

我不是在我在做什麼threas安全嗎?

非常感謝 馬特

回答

0

由於您使用的MessageProcessor(每封郵件一個)不同的情況下,上output()的​​標籤沒有我想您所期待的效果。每個實例將能夠訪問自己的output()而不被其他實例阻擋。

您看到的輸出是來自不同實例的輸出的混合。爲防止混音,您可以使用StringBuilder構建您的輸出,並在一次調用中將其提交給System.out。如果您想跨實例同步輸出,請考慮使用隊列。

+0

感謝您的快速響應 - 所以我線程安全,它只是一個輸出問題?即如果我要從消息字節創建一個消息對象,那麼每個消息對象都有正確的數據。 –

+0

'MessageProcessor'內的數據是安全的。假設你以安全的方式構造消息對象,它們將會很好。 – bradimus

+0

非常感謝bradimus我改成了StringBuilder,輸出看起來不錯。 –

相關問題