2017-03-03 109 views
0

在一個WebSocket中接收消息時,將此消息轉發給其他連接的WebSocket的問題。我使用jetty-9.2.20.v20161216。會話管理Jetty Websocket服務器

所以我初始化服務器..

JettyWSServer websocketServer = new JettyWSServer("localhost", 8000, new MySocketHandler(), new QueuedThreadPool(128)); 

public <T extends WebSocketHandler> JettyWSServer(String hostName, int port, T webscoketHandler, QueuedThreadPool threadPool) { 
    this.hostName = hostName; 
    this.port = port; 
    this.handler = webscoketHandler; 
    this.threadPool = threadPool; 
    this.socket = null; 

    //create server 
    this.server = new Server(this.threadPool); 

    //set connector 
    ServerConnector connector = new ServerConnector(server); 
    connector.setHost(this.hostName); 
    connector.setPort(this.port); 
    this.server.addConnector(connector); 

    //set handler 
    this.server.setHandler(this.handler); 

    //set listener 
    setLifecycleListener(); 
} 

MySocketHandler.java

public class MySocketHandler extends WebSocketHandler { 

    private final String TAG = MySocketHandler.class.getSimpleName(); 
    private MySocketCreator creator; 

    @Override 
    public void configure(WebSocketServletFactory webSocketServletFactory) { 
     this.creator = new MySocketCreator(); 
     webSocketServletFactory.setCreator(this.creator); 
    } 


    public Set<ServerSocket> getSockets(){ 
     return this.creator.getSockets(); 
    } 
} 

MySocketCreator.java

public class MySocketCreator implements WebSocketCreator { 

    private static final String TAG = MySocketCreator.class.getSimpleName(); 
    private static Log log = new Log(TAG, true); 

    private Set<ServerSocket> sockets = new HashSet<>(); 
    private Set<Session> guests = new HashSet<>(); 
    private ConcurrentHashMap<ServiceUser, ArrayList<WSDeviceSessionWrapper>> users = new ConcurrentHashMap<>(); 


    @Override 
    public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) { 
     ServerSocket socket = new ServerSocket(statusCallback); 
     sockets.add(socket); 
     return socket; 
    } 


    private OnSessionStatusListener statusCallback = new OnSessionStatusListener() { 
     @Override 
     public void onGuestIn(Session session) { 


      synchronized (this) { 
       guests.add(session); 
       Integer totalAgeReduce = users.values() 
         .stream() 
         .map(wsDeviceSessionWrappers -> { 
          return 1; 
         }) 
         .reduce(
           0, 
           (a, b) -> a + b); 
       log.debug("onGuestIn() " + "Guests: " + guests.size() + " Registered: " + totalAgeReduce); 
      } 
     } 

     @Override 
     public void onUserIn(Session session, ServiceUser user, Device device) { 


      synchronized (this) { 

       if (guests.contains(session)) guests.remove(session); 

       if (!users.containsKey(user)) { 
        users.put(user, new ArrayList<WSDeviceSessionWrapper>()); 
       } 
       users.get(user).add(new WSDeviceSessionWrapper(session, device)); 

       log.debug("onUserIn() " + "Guests: " + guests.size() + " Registered: " + users.size()); 
      } 

     } 

     @Override 
     public void sendResponse(ArrayList<ServiceUser> users, WSResponse response) { 
     log.debug("Send message to [" + (users != null ? users.size() : null) + "] current users " + MySocketCreator.this.users.size()); 
     MySocketCreator.this.users.keySet().forEach(user -> { 
       users.forEach(u -> { 
        if (user.equals(u)) { 

         ArrayList<WSDeviceSessionWrapper> wsDeviceSessionWrappers = MySocketCreator.this.users.get(user); 
         new ArrayList<>(wsDeviceSessionWrappers).forEach(wrapper -> { 
           wrapper.getSession().getRemote().sendStringByFuture(response.toJSON()); 
          } 
         }); 

        } 
       }); 
      }); 


     } 

     @Override 
     public void sendResponse(ServiceUser user, WSResponse response, Device excludeDevice) { 
       MySocketCreator.this.users.get(user).forEach(wrapper -> { 
        wrapper.getSession().getRemote().sendStringByFuture(response.toJSON()); 
       }); 
     } 

     @Override 
     public void onExit(Session session, ServiceUser user, Device device) { 

      synchronized (this) { 

       //remove from guest sessions 
       if (session != null && guests.contains(session)) guests.remove(session); 


       if (user != null && device != null && users.containsKey(user)) { 
        ArrayList<WSDeviceSessionWrapper> wrappers = users.get(user); 
        Iterator<WSDeviceSessionWrapper> iterator = wrappers.iterator(); 
        while (iterator.hasNext()) { 
         WSDeviceSessionWrapper wrapper = iterator.next(); 
         if (wrapper.getSession() == session || wrapper.getSession().equals(session) && wrapper.getDevice() == device || wrapper.getDevice().equals(device)) { 
          //remove session for current device 
          iterator.remove(); 

          //if user does not have session on server 
          //remove him from current server users 
          if (wrappers.size() == 0) { 
           users.remove(user); 
          } 
         } 
        } 
       } 

       Integer totalRegisteredDevices = users.values() 
         .stream() 
         .map(wsDeviceSessionWrappers -> { 
          return 1; 
         }) 
         .reduce(
           0, 
           (a, b) -> a + b); 
       log.debug("onExit() " + "Guests: " + guests.size() + " Registered: " + totalRegisteredDevices); 
      } 


     } 
    }; 

    public Set<ServerSocket> getSockets() { 
     return sockets; 
    } 

} 

這段代碼的邏輯:

MySocketCreator的類中,當創建一個新的套接字時,我傳遞給回調函數。接下來在事件onOpen的套接字中,我調用回調並將它傳遞給會話,並將會話存儲在MySocketCreator類中,在此會話之後,我將與用戶和設備相關聯。

的問題是,當我嘗試通過回調方法

@Override 
    public void sendResponse(ArrayList<ServiceUser> users, WSResponse response) { 
    log.debug("Send message to [" + (users != null ? users.size() : null) + "] current users " + MySocketCreator.this.users.size()); 
     MySocketCreator.this.users.keySet().forEach(user -> { 
      users.forEach(u -> { 
       if (user.equals(u)) { 
        ArrayList<WSDeviceSessionWrapper> wsDeviceSessionWrappers = MySocketCreator.this.users.get(user); 
        new ArrayList<>(wsDeviceSessionWrappers).forEach(wrapper -> { 
          wrapper.getSession().getRemote().sendStringByFuture(response.toJSON()); 
         } 
        }); 

       } 
      }); 
     }); 
    } 

字符串將消息發送給所有用戶,從任何websocketwrapper.getSession().getRemote().sendStringByFuture(response.toJSON());

鎖的線程和服務器不能再工作。試圖取代它

wrapper.getSession().getRemote().sendString(response.toJSON());

拋出異常

java.lang.IllegalStateException: Blocking message pending 10000 for BLOCKING 

org.eclipse.jetty.websocket.api.WebSocketException: RemoteEndpoint unavailable, outgoing connection not open 

這兩個選項不會對300間的連接工作/

問題:我怎樣才能將消息發送給所有用戶?

回答

0

您不能盲目地向所有用戶發送消息。

如果某些用戶擁塞並且沒有像其他用戶一樣快速讀取該怎麼辦?

如果一個用戶以更快的速度生成郵件,那麼您的網絡連接速度最慢?

在你的情況下,你必須處理Future你儘快回到sendStringByFuture()(或可替代的sendString(String,WriteCallback)),如果是連接速度慢,或失敗,你必須從你的用戶列表中刪除。如果您收到意味着該消息已發送的事件,那麼您知道該特定用戶/客戶端不擁塞且可以自由發送另一條消息。

您發送的循環必須爲每個用戶/客戶端排隊消息,而不是作爲一個整體,但每個用戶/客戶端都有自己的隊列。

只有在發送時已知該用戶未擁塞,您的環路纔會發送。

您甚至可能需要刪除消息以真正減慢客戶端速度,或者在隊列變得太大時完全斷開它們。

是的,這是很複雜的,這就是爲什麼有這麼多的庫建立在websocket之上才能做到這一點。

考慮使用cometd及其websocket傳輸。

+0

當我使用sendStringByFuture和sendString(String,WriteCallback)並且如果客戶端沒有收到消息,循環不會向其他用戶發送消息。我如何解決它? –