2013-08-23 37 views
2

免責聲明 - 我不是Java程序員。賠率是我需要做任何建議給我的功課,但我會很樂意這樣做:)Netty和BoneCP /基本Socket服務器的更多concuser用戶

這就是說,我寫了一個完整的數據庫支持的套接字服務器,這對我的小測試工作得很好,並且現在我正準備首次發佈。由於我不太瞭解Java/Netty/BoneCP,我不知道在我的服務器出現問題之前,是否會在某個地方犯了一個根本性的錯誤。

例如,我不知道執行者組的確切作用以及我應該使用的編號。是否可以將BoneCP作爲單例實現,是否真的有必要爲每個數據庫查詢擁有所有這些try/catch?等

我試圖減少我的整個服務器的一個基本的例子,其操作方式與真實的東西(我剝離了所有的文本,沒有在java本身測試,所以請原諒任何語法錯誤,由於那)。

其基本思想是客戶端可以連接,與服務器交換消息,斷開其他客戶端,並無限期地保持連接,直到他們選擇或被迫斷開連接。 (客戶端將每分鐘發送一次ping消息以保持連接正常)

唯一的主要區別是,除了未驗證此示例外,clientID是如何設置的(安全地假設它對於每個連接的客戶端來說是真正唯一的),並且存在一些更多的商業邏輯檢查值等

底線-可以做任何事情來改善這個,所以它可以處理最可能的併發用戶?謝謝!


//MAIN 
public class MainServer { 
    public static void main(String[] args) { 
     EdgeController edgeController = new EdgeController(); 
     edgeController.connect(); 
    } 
} 


//EdgeController 
public class EdgeController { 

    public void connect() throws Exception { 
     ServerBootstrap b = new ServerBootstrap(); 
     ChannelFuture f; 


     try { 
      b.group(new NioEventLoopGroup(), new NioEventLoopGroup()) 
        .channel(NioServerSocketChannel.class) 
        .localAddress(9100) 
        .childOption(ChannelOption.TCP_NODELAY, true) 
        .childOption(ChannelOption.SO_KEEPALIVE, true) 
        .childHandler(new EdgeInitializer(new DefaultEventExecutorGroup(10))); 


      // Start the server. 
      f = b.bind().sync(); 

      // Wait until the server socket is closed. 
      f.channel().closeFuture().sync(); 

     } finally { //Not quite sure how to get here yet... but no matter 
      // Shut down all event loops to terminate all threads. 
      b.shutdown(); 

     } 
    } 
} 

//EdgeInitializer 
public class EdgeInitializer extends ChannelInitializer<SocketChannel> { 
    private EventExecutorGroup executorGroup; 

    public EdgeInitializer(EventExecutorGroup _executorGroup) { 
     executorGroup = _executorGroup; 
    } 

    @Override 
    public void initChannel(SocketChannel ch) throws Exception { 
     ChannelPipeline pipeline = ch.pipeline(); 

     pipeline.addLast("idleStateHandler", new IdleStateHandler(200,0,0)); 
     pipeline.addLast("idleStateEventHandler", new EdgeIdleHandler()); 
     pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.nulDelimiter())); 
     pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); 
     pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); 
     pipeline.addLast(this.executorGroup, "handler", new EdgeHandler()); 
    }  
} 

//EdgeIdleHandler 
public class EdgeIdleHandler extends ChannelHandlerAdapter { 
    private static final Logger logger = Logger.getLogger(EdgeIdleHandler.class.getName()); 


    @Override 
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{ 
     if(evt instanceof IdleStateEvent) { 
      ctx.close(); 
     } 
    } 

    private void trace(String msg) { 
     logger.log(Level.INFO, msg); 
    } 

} 

//DBController 
public enum DBController { 
    INSTANCE; 

    private BoneCP connectionPool = null; 
    private BoneCPConfig connectionPoolConfig = null; 

    public boolean setupPool() { 
     boolean ret = true; 

     try { 
      Class.forName("com.mysql.jdbc.Driver"); 

      connectionPoolConfig = new BoneCPConfig(); 
      connectionPoolConfig.setJdbcUrl("jdbc:mysql://" + DB_HOST + ":" + DB_PORT + "/" + DB_NAME); 
      connectionPoolConfig.setUsername(DB_USER); 
      connectionPoolConfig.setPassword(DB_PASS); 

      try { 
       connectionPool = new BoneCP(connectionPoolConfig); 
      } catch(SQLException ex) { 
       ret = false; 
      } 

     } catch(ClassNotFoundException ex) { 
      ret = false; 
     } 

     return(ret); 
    } 

    public Connection getConnection() { 
     Connection ret; 

     try { 
      ret = connectionPool.getConnection(); 
     } catch(SQLException ex) { 
      ret = null; 
     } 

     return(ret); 
    } 
} 

//EdgeHandler 
public class EdgeHandler extends ChannelInboundMessageHandlerAdapter<String> { 

    private final Charset CHARSET_UTF8 = Charset.forName("UTF-8"); 
    private long clientID; 
    static final ChannelGroup channels = new DefaultChannelGroup(); 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) throws Exception { 
     Connection dbConnection = null; 
     Statement statement = null; 
     ResultSet resultSet = null; 
     String query; 
     Boolean okToPlay = false; 


     //Check if status for ID #1 is true 
     try { 
      query = "SELECT `Status` FROM `ServerTable` WHERE `ID` = 1"; 

      dbConnection = DBController.INSTANCE.getConnection(); 
      statement = dbConnection.createStatement(); 
      resultSet = statement.executeQuery(query); 

      if (resultSet.first()) { 
       if (resultSet.getInt("Status") > 0) { 
        okToPlay = true; 
       } 
      } 
     } catch (SQLException ex) { 
      okToPlay = false; 
     } finally { 
      if (resultSet != null) { 
       try { 
        resultSet.close(); 
       } catch (SQLException logOrIgnore) { 
       } 
      } 
      if (statement != null) { 
       try { 
        statement.close(); 
       } catch (SQLException logOrIgnore) { 
       } 
      } 
      if (dbConnection != null) { 
       try { 
        dbConnection.close(); 
       } catch (SQLException logOrIgnore) { 
       } 
      } 
     } 

     if (okToPlay) { 
      //clientID = setClientID(); 
      sendCommand(ctx, "HELLO", "WORLD"); 
     } else { 
      sendErrorAndClose(ctx, "CLOSED"); 
     } 
    } 

    @Override 
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { 
     channels.remove(ctx.channel()); 
    } 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, String request) throws Exception { 
     // Generate and write a response. 
     String[] segments_whitespace; 
     String command, command_args; 

     if (request.length() > 0) { 

      segments_whitespace = request.split("\\s+"); 
      if (segments_whitespace.length > 1) { 
       command = segments_whitespace[0]; 
       command_args = segments_whitespace[1]; 

       if (command.length() > 0 && command_args.length() > 0) { 
        switch (command) { 
         case "HOWDY": processHowdy(ctx, command_args); break; 
         default: break; 
        } 
       } 
      } 
     } 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
     TraceUtils.severe("Unexpected exception from downstream - " + cause.toString()); 
     ctx.close(); 
    } 

    /*          */ 
    /*  STATES -/CLIENT SETUP  */ 
    /*          */ 
    private void processHowdy(ChannelHandlerContext ctx, String howdyTo) { 
     Connection dbConnection = null; 
     Statement statement = null; 
     ResultSet resultSet = null; 
     String replyBack = null; 

     try { 
      dbConnection = DBController.INSTANCE.getConnection(); 
      statement = dbConnection.createStatement(); 
      resultSet = statement.executeQuery("SELECT `to` FROM `ServerTable` WHERE `To`='" + howdyTo + "'"); 

      if (resultSet.first()) { 
       replyBack = "you!"; 
      } 
     } catch (SQLException ex) { 
     } finally { 
      if (resultSet != null) { 
       try { 
        resultSet.close(); 
       } catch (SQLException logOrIgnore) { 
       } 
      } 
      if (statement != null) { 
       try { 
        statement.close(); 
       } catch (SQLException logOrIgnore) { 
       } 
      } 
      if (dbConnection != null) { 
       try { 
        dbConnection.close(); 
       } catch (SQLException logOrIgnore) { 
       } 
      } 
     } 

     if (replyBack != null) { 
      sendCommand(ctx, "HOWDY", replyBack); 
     } else { 
      sendErrorAndClose(ctx, "ERROR"); 
     } 
    } 

    private boolean closePeer(ChannelHandlerContext ctx, long peerClientID) { 
     boolean success = false; 
     ChannelFuture future; 

     for (Channel c : channels) { 
      if (c != ctx.channel()) { 
       if (c.pipeline().get(EdgeHandler.class).receiveClose(c, peerClientID)) { 
        success = true; 
        break; 
       } 
      } 
     } 

     return (success); 

    } 

    public boolean receiveClose(Channel thisChannel, long remoteClientID) { 
     ChannelFuture future; 
     boolean didclose = false; 
     long thisClientID = (clientID == null ? 0 : clientID); 

     if (remoteClientID == thisClientID) { 
      future = thisChannel.write("CLOSED BY PEER" + '\n'); 
      future.addListener(ChannelFutureListener.CLOSE); 

      didclose = true; 
     } 

     return (didclose); 
    } 


    private ChannelFuture sendCommand(ChannelHandlerContext ctx, String cmd, String outgoingCommandArgs) { 
     return (ctx.write(cmd + " " + outgoingCommandArgs + '\n')); 
    } 

    private ChannelFuture sendErrorAndClose(ChannelHandlerContext ctx, String error_args) { 

     ChannelFuture future = sendCommand(ctx, "ERROR", error_args); 

     future.addListener(ChannelFutureListener.CLOSE); 

     return (future); 
    } 
} 

回答

2

當網絡消息在到達服務器,它會被解碼,並會釋放出的messageReceived事件。

如果你看看你的管道,最後添加到管道的東西是執行器。由於該執行者將接收到已解碼的內容並釋放messageReceived事件。

執行者是事件的處理器,服務器會告訴通過他們發生了哪些事件。所以執行者如何被使用是一個重要的主題。如果只有一個執行者,並且正因爲如此,所有使用這個執行者的客戶端,都會有一個使用這個執行者的隊列。

當執行者很多時,事件的處理時間會減少,因爲不會有任何等待執行者自由的事件。

在你的代碼

新DefaultEventExecutorGroup(10)

意味着這個ServerBootstrap將在其一生都只能使用10執行人。

當初始化新信道,使用同一組執行:

pipeline.addLast(此。executorGroup,「處理程序」,新的EdgeHandler());

因此,每個新的客戶端通道將使用相同的執行程序組(10個執行程序線程)。

如果10個線程能夠正確處理傳入事件,那麼這是高效且足夠的。但是,如果我們可以看到消息正在被解碼/編碼,但不能很快處理爲事件,那意味着需要增加消息的數量。

我們可以增加執行人的數量從10個到100這樣的:

新DefaultEventExecutorGroup(100)

所以將處理事件隊列速度更快,如果有足夠的CPU處理能力。

什麼不應該做的是創造每個新頻道的新執行人:

pipeline.addLast(新DefaultEventExecutorGroup(10), 「處理程序」,新EdgeHandler());

上方爲每個新通道創建一個新的執行程序組,這會大大減慢速度,例如,如果有3000個客戶端,則會有3000個executorgroups(線程)。這是消除NIO的主要優勢,使用低線程的能力。

與其爲每個通道創建1個執行程序,我們可以在啓動時創建3000個執行程序,並且每次客戶端連接/斷開連接時至少不會刪除和創建它們。

.childHandler(new EdgeInitializer(new DefaultEventExecutorGroup(3000)));

上面的行比創建1個執行爲每個客戶端更可接受的,因爲所有的客戶端都連接到相同的ExecutorGroup和當客戶端斷開執行人即使客戶端數據被除去仍然存在。

如果我們必須談論數據庫請求,一些數據庫查詢可能需要很長時間才能完成,所以如果有10個執行者並且有10個作業正在處理中,那麼第11個作業必須等到其中一個作業完成。如果服務器同時接收超過10個非常耗時的數據庫作業,這是一個瓶頸。越來越多的執行者將在一定程度上解決瓶頸問題。

+0

問題 - 你說「執行程序將用於不同的客戶端,因爲你讓他們成爲單身人士......」我不知道我做了那個......我知道我讓數據庫管理員成爲單身人士,但是執行人?我在哪裏做的? :) 如果上面的代碼是真正的最佳實踐,唯一要改變的是執行者的數量,是否有一個經驗法則來達到這個數字或一個確定的實用程序來確定它? – davidkomer

+0

用'DefaultEventExecutorGroup(10)'構造的'EdgeInitializer',所以我認爲只有一個'EdgeInitializer'和ExecutorGroup。您可以檢查在登錄多個客戶端時創建了多少次EdgeInitializer。如果只有一個'EdgeInitializer',那麼什麼時候可以說只有一個executorgroup,我們可以說系統高效。對於簡單的快速響應應用程序(以毫秒爲單位的響應時間),10個線程將處理數百個客戶端。但數據庫調用可能需要一段時間才能完成,您可以將它增加到500多個線程,甚至有300多個線程掛起,其餘的將會響應。 – meka

+0

我的意思是Netty不應該像這樣暫停,並且數據庫作業在一段時間內會暫停網絡線程。在正常的系統中,線程應該很快完成,並且應該立即接收下一份工作爲了隱藏我們所做的,我們正在增加線程數量。我不確定這是否是最佳做法,但是有很多線程應該可以。另一種方法是在另一個線程系統中接收來自netty的請求並處理與netty不相關的線程系統的請求,因此10個線程就足夠了,因爲netty線程將重複工作。 – meka