2015-10-15 93 views
1

所以我最近在netty(5.0.0.Alpha2)上工作過一些,我非常喜歡它!但不幸的是,我無法發送/接收消息。奇怪的是,連接和斷開工作就像一個魅力。服務器收到客戶端連接/斷開連接的消息,並添加/刪除通道。只是消息不能正常工作。Netty:發送消息的問題

我嘗試了很多其他方式(例如沒有編碼器),但它從來沒有工作..也許有人在這裏有任何想法?我真的很感激它!

提前致謝!您可以在下面找到使用的所有的源代碼:

CoreClient.java

package me.creepsterlgc.coreclient; 

import io.netty.bootstrap.Bootstrap; 
import io.netty.channel.Channel; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelPipeline; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioSocketChannel; 
import io.netty.handler.codec.DelimiterBasedFrameDecoder; 
import io.netty.handler.codec.Delimiters; 
import io.netty.handler.codec.string.StringDecoder; 
import io.netty.handler.codec.string.StringEncoder; 

public class CoreClient extends Thread { 

    private String host; 
    private int port; 

    private Channel channel; 

    public CoreClient(String host, int port) { 
     this.host = host; 
     this.port = port; 
    } 

    public void run() { 
     EventLoopGroup group = new NioEventLoopGroup(); 

     try { 
      Bootstrap bootstrap = new Bootstrap() 
      .group(group) 
      .channel(NioSocketChannel.class) 
      .handler(new ChannelInitializer<SocketChannel>() { 

        @Override 
        protected void initChannel(SocketChannel channel) throws Exception { 

         ChannelPipeline pipeline = channel.pipeline(); 
         pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); 
         pipeline.addLast("encoder", new StringEncoder()); 
         pipeline.addLast("decoder", new StringDecoder()); 
         pipeline.addLast("handler", new CoreClientHandler()); 

        } 

      }); 

      ChannelFuture f = bootstrap.connect(host, port); 
      channel = f.sync().channel(); 
      ChannelFuture cf = null; 
      try { 
       cf = channel.writeAndFlush("Testing..").sync(); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      if (!cf.isSuccess()) { 
       System.out.println("Send failed: " + cf.cause()); 
      } 

     } 
     catch (InterruptedException e) { 
      e.printStackTrace();  
     } 
     finally { 

     } 

    } 

    public void send(String message) { 
     ChannelFuture cf = null; 
     try { 
      cf = channel.writeAndFlush(message).sync(); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     channel.flush(); 
     if (!cf.isSuccess()) { 
      System.out.println("Send failed: " + cf.cause()); 
     } 
    } 

    public void shutdown() { 

    } 

} 

CoreClientHandler.java

package me.creepsterlgc.coreclient; 

import io.netty.channel.Channel; 
import io.netty.channel.ChannelHandlerAdapter; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.group.ChannelGroup; 
import io.netty.channel.group.DefaultChannelGroup; 
import io.netty.util.concurrent.GlobalEventExecutor; 

public class CoreClientHandler extends ChannelHandlerAdapter { 

    ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 

    @Override 
    public void channelRead(ChannelHandlerContext context, Object message) throws Exception { 
     context.write(message); 
     Channel channel = context.channel(); 
     Log.message(channel.remoteAddress().toString(), message.toString()); 
    } 

    @Override 
    public void channelReadComplete(ChannelHandlerContext context) { 
     context.flush(); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { 
     cause.printStackTrace(); 
     context.close(); 
    } 

} 

CoreServer.java

package me.creepsterlgc.coreserver; 

import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelPipeline; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import io.netty.handler.codec.DelimiterBasedFrameDecoder; 
import io.netty.handler.codec.Delimiters; 
import io.netty.handler.codec.string.StringDecoder; 
import io.netty.handler.codec.string.StringEncoder; 

public class CoreServer extends Thread { 

    private int port; 

    public CoreServer(int port) { 
     this.port = port; 
    } 

    public void run() { 

     EventLoopGroup boss = new NioEventLoopGroup(); 
     EventLoopGroup worker = new NioEventLoopGroup(); 

     try { 
      ServerBootstrap bootstrap = new ServerBootstrap() 
      .group(boss, worker) 
      .channel(NioServerSocketChannel.class) 
      .childHandler(new ChannelInitializer<SocketChannel>() { 

        @Override 
        protected void initChannel(SocketChannel channel) throws Exception { 

         ChannelPipeline pipeline = channel.pipeline(); 
         pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); 
         pipeline.addLast("encoder", new StringEncoder()); 
         pipeline.addLast("decoder", new StringDecoder()); 
         pipeline.addLast("handler", new CoreServerHandler()); 

        } 

      }); 

      ChannelFuture f = bootstrap.bind(port).sync(); 
      f.channel().closeFuture().sync(); 

     } 
     catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     finally { 
      boss.shutdownGracefully(); 
      worker.shutdownGracefully(); 
     } 

    } 

} 

CoreServerHandler.java

package me.creepsterlgc.coreserver; 

import io.netty.buffer.ByteBuf; 
import io.netty.channel.Channel; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelHandlerAdapter; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.group.ChannelGroup; 
import io.netty.channel.group.DefaultChannelGroup; 
import io.netty.util.ReferenceCountUtil; 
import io.netty.util.concurrent.GlobalEventExecutor; 

public class CoreServerHandler extends ChannelHandlerAdapter { 

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 

    @Override 
    public void handlerAdded(ChannelHandlerContext context) { 
     Channel channel = context.channel(); 
     channels.add(channel); 
     Log.connect(channel.remoteAddress().toString()); 
     System.out.println("There are currently " + channels.size() + " clients connected."); 
     ChannelFuture cf = null; 
     cf = channel.write("Successfully connected to: master"); 
     channel.flush(); 
     if (!cf.isSuccess()) { 
      System.out.println("Send failed: " + cf.cause()); 
     } 
    } 

    @Override 
    public void handlerRemoved(ChannelHandlerContext context) { 
     Channel channel = context.channel(); 
     channels.remove(channel); 
     Log.disconnect(channel.remoteAddress().toString()); 
    } 

    @Override 
    public void channelRead(ChannelHandlerContext context, Object message) throws Exception { 
     context.write(message); 
     System.out.println("Received: "); 
     ByteBuf in = (ByteBuf) message; 
     try { 
      while (in.isReadable()) { 
       System.out.print((char) in.readByte()); 
       System.out.flush(); 
      } 
     } finally { 
      ReferenceCountUtil.release(message); 
     } 
     Channel channel = context.channel(); 
     Log.message(channel.remoteAddress().toString(), message.toString()); 
    } 

    @Override 
    public void channelReadComplete(ChannelHandlerContext context) { 
     context.flush(); 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { 
     cause.printStackTrace(); 
     context.close(); 
    } 

    public static void read() { 
     for(Channel channel : channels) channel.read(); 
    } 

} 
+0

什麼是網狀的版本使用的是? – Sudheera

+0

嗨Sudheera,Iam使用以下版本:5.0.0.Alpha2 – CreepsterLGC

回答

0

此時應更換

try { 
    cf = channel.write("Testing..").sync(); 
} catch (InterruptedException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
} 
channel.flush(); 

有:

try { 
    cf = channel.writeAndFlush("Testing..").sync(); 
} catch (InterruptedException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
} 
+0

嗨,諾曼,謝謝你的anwser!我換成了channel.flush();與直接.writeAndFlush();但似乎還沒有消息發送。 Iam通過兩次客戶端獲取以下日誌:http://pastebin.com/yaYewqUY – CreepsterLGC