我有一個服務器正在監聽UDP端口上的數據,並且正在使用netty。我的服務器有自己的幀解碼器,執行處理程序和業務特定的處理程序。 我的測試客戶端是用java DatagramSocket(不使用netty)製作的,只是在收聽回覆之前發送數據。看下面的例子。與netty一起使用UDP時,服務器只能返回一個答案
在我的第一個版本中,我沒有在我的服務器上發回任何回覆,並且我能夠運行數千個併發客戶端發送重複請求,沒有任何問題。我的服務器正在完美處理成千上萬的請求。
然後,我不得不在服務器端收到我的業務特定處理程序中收到的每個請求的回覆。我只是遵循netty方面提供的例子,即將我的數據寫入事件緩衝區。在客戶端,我重用了發送數據的相同udp套接字來接收響應。這個機制工作得很好,一個客戶端向我的服務器發送了數百個連續的請求,這些請求對每個請求進行了正確回答然而,目前我的客戶端關閉(而不是關閉套接字),我的服務器停止接受任何其他請求。我不得不重新啓動我的服務器再次運行該過程。當然,在運行多個併發客戶端時,只有一個可以正常工作,而且只能運行一次。
我用wireshark來分析發送和接收的數據。所有數據都正確地發送到服務器,並且服務器在第一次運行客戶端時正確地返回回覆。但是,目前我阻止我的客戶嘗試重新運行它,我的服務器停下來處理數據。使用wireshark,我可以看到數據真正發送到服務器,它確實是停止處理數據的服務器本身。
有人能告訴我在那個例子中我做錯了什麼嗎?
這裏是我的客戶的簡單版本:
public class UDPClientTester
{
public void sendAndReceiveClientTester() throws IOException
{
final InetAddress lIpAddress = InetAddress.getByName("localhost");
int lPort = 50000;
int lNbrRepeat = 1;
byte[] lDataToSendAsBytes = {1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0}; //new byte[20];
byte[] lReceivedData = new byte[22];
final DatagramSocket udpSocket = new DatagramSocket();
DatagramPacket lSendPacket = new DatagramPacket(lDataToSendAsBytes, lDataToSendAsBytes.length, lIpAddress, lPort);
DatagramPacket lReceivePacket = new DatagramPacket(lReceivedData, 22);
// No metter the number of repeats, the server will accept all requests and return a reply for each one.
for (byte k = 0; k < lNbrRepeat; k++)
{
// Send data...
udpSocket.send(lSendPacket);
// Receive response... Block here until a response is received.
udpSocket.receive(lReceivePacket);
}
udpSocket.close();
// At the moment I close the socket, the server stop to accept anymore requests.
// I cannot run this example again until I restart the server!
}
}
這裏是我的服務器的簡單版本,只顯示main()和處理程序(不顯示解碼器):
public class TesterChannelHandler extends SimpleChannelHandler
{
public void main(final String[] args) throws Exception
{
DatagramChannelFactory f = new NioDatagramChannelFactory(Executors.newCachedThreadPool());
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
// Configure the pipeline factory.
b.setPipelineFactory(new ChannelPipelineFactory() {
ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576))
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new OuterFrameDecoder(null),
executionHandler,
new TesterChannelHandler());
}
});
// Enable broadcast
b.setOption("broadcast", "false");
b.setOption("receiveBufferSizePredictorFactory",
new FixedReceiveBufferSizePredictorFactory(1024));
// Bind to the port and start the service.
b.bind(new InetSocketAddress(50000));
}
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e)
{
Channel lChannel = e.getChannel();
SocketAddress lRemoteAddress = e.getRemoteAddress();
byte[] replyFrame = {0,9,8,7,6,5,4,3,2,1};
ChannelBuffer lReceivedBuffer = (ChannelBuffer) e.getMessage();
ChannelBuffer lReplyBuffer = ChannelBuffers.wrappedBuffer(replyFrame);
if (lChannel != null && lChannel.isWritable() && lRemoteAddress != null)
{
// OK, I FOUND THE ERROR!! I DO NOT NEED TO CONNECT THE CHANNEL! I MUST USE THE REMOTE SOCKET ADDRESS ATTACHED TO THE EVENT AND PASS IT TO THE WRITE METHOD BELOW.
if (!e.getChannel().isConnected()) e.getChannel().connect(e.getRemoteAddress());
// BELOW I SHOULD PASS THE REMOTE SOCKET ADDRESS ATTACHED TO THE EVENT AS THE SECOND ARGUMENT OF THE WRITE METHOD.
e.getChannel().write(lReplyBuffer);
}
}
}