首先發布這裏,所以大家好。 我正在構建一個小型客戶端,需要通過套接字發送心跳,並通過套接字接收各種固定長度的二進制消息。 我有一個連接管理器類,用於啓動套接字連接並創建2個線程,每個線程每隔x秒發送一次心跳。另一個線程監聽輸入。Java ExecutorService線程寫入相同的字段
當它接收到輸入時,它將頭部讀入字節數組(總是4個字節),得到消息長度x = byte [3],然後讀取下一個x字節到消息字節數組。然後它使用ExecutorService創建一個新的messasge處理器對象,它接收消息字節數組。
messasge處理器實現可運行並打印出消息字節。 但是,如果套接字接收到大量數據,即服務器按順序發送消息,那麼我的消息處理器似乎會打印出不同線程中的混合數據 - 我認爲我的執行程序會創建一個新線程安全的對象,並具有自己的二進制實例[ ] msgBytes。
的ConnectionManager低於其產生髮送/接收線程,RCV線程創建一個線程池,並且當讀出消息創建新的MessageProcessor目的是處理字節[]消息
public class connectionManager extends Thread {
public connectionManager(InetAddress host, int serverSocket) {
System.out.println("in connectionManager create");
try {
clientSocket = new Socket(host, serverSocket);
outToServer = new DataOutputStream(clientSocket.getOutputStream());
connected = true;
final ExecutorService executor = Executors.newFixedThreadPool(10);
final Thread inThread = new Thread() {
@Override
public void run() {
while (connected) {
try {
DataInputStream dIn = new DataInputStream(clientSocket.getInputStream());
byte[] header = new byte[4];
dIn.readFully(header); // read the message
String msgType = new String(new byte[]{ header[3] }, "US-ASCII");
short length = getShortFromLittleEndianRange(header, 1); //test function to return length offset is start position of length
byte[] message = new byte[length - 1]; //minus the msg type byte
dIn.readFully(message);
executor.execute(new MessageProcessor(message, msgType));
} catch (Exception e) {
e.printStackTrace();
connected = false;
}
}
executor.shutdown();
System.out.println("Shutdown executor");
};
};
inThread.start();
final Thread outThread = new Thread() {
@Override
public void run() {
Heartbeat hb = new Heartbeat();
while(connected) {
PrintWriter out = null;
try {
this.sleep(3000);
outToServer.write(hb.serialize());
} catch (Exception e) {
e.printStackTrace();
System.out.println("cant send heartbeat server not alive...");
}
}
};
};
outThread.start();
} catch (Exception e) {
System.out.println("Cannot connect to server");
System.exit(0);
}
}
}
MessageProcess實現可運行我期望執行到創建新的對象MessageProcess各有自己的msgBytes的實例,並MSGTYPE
public class MessageProcessor implements Runnable {
private byte[] msgBytes;
private String msgType;
MessageProcessor(byte[] newMsgBytes, String newMsgType) {
msgBytes = newMsgBytes;
msgType = newMsgType;
}
@Override
public void run() {
output();
}
synchronized void output() {
System.out.println("\nMessageProcessor process inbound message " + Thread.currentThread().getId());
System.out.println("message type : " + " " + msgType);
for(byte b : msgBytes){
System.out.printf("%02X",b);
}
System.out.println("\nfinished MessageProcessor " + Thread.currentThread().getId());
}
}
然而,當我運行並接收來自服務器我康索爾輸出看起來封郵件「洪水」,好像有一個線程安全ISSU e:下面的示例輸出。
MessageProcessor process inbound message 16
message type : 8
01D40806004530515266385779624C6D653830
MessageProcessor process inbound message 17
2D36667664374E6D6A4100000000000000003030515266444E31417A766846000000000000000000000000020000000000A90DAE0400000001000000000000000000000000FA10000000000100000000000000004C43484C4742324500000052D7048024E3F70400BCBDE256306F0100000203003030515266444E31417A766800000000message type : 8
finished MessageProcessor 16
01D50806004530515266385779624C6D6638302D366676643176584A5200000000000000003030515266444E31417A766246000000000000000000000000010000000000A90DAE0400000001000000090000000109000000FA10000000000200000000000000004C43484C4742324500000041D7048024E3F70400BCBDE256306F0100000003003030515266444E31417A766200000000
finished MessageProcessor 17
我希望看到
MessageProcessor process inbound message 16
message type : 8
<hex>
finished MessageProcessor 16
MessageProcessor process inbound message 17
message type : 8
<hex>
finished MessageProcessor 17
我不是在我在做什麼threas安全嗎?
非常感謝 馬特
感謝您的快速響應 - 所以我線程安全,它只是一個輸出問題?即如果我要從消息字節創建一個消息對象,那麼每個消息對象都有正確的數據。 –
'MessageProcessor'內的數據是安全的。假設你以安全的方式構造消息對象,它們將會很好。 – bradimus
非常感謝bradimus我改成了StringBuilder,輸出看起來不錯。 –