2015-05-19 23 views

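Atmosphere WebSocketProtocol broadcast

I have a working Jetty-based WebSocket server that I need to migrate to Atmosphere. Incoming WebSocket messages must be written to an internal queue for later processing, and the original message must be returned to the client as a positive acknowledgement of the transaction.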
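
The requirement itself (queue each incoming message, then echo the original text back as the acknowledgement) can be sketched independently of Atmosphere. This is only a minimal sketch, assuming a bounded in-memory `BlockingQueue`; the names `IngestQueueSketch`, `accept`, and `poll`, and the capacity value, are hypothetical and do not appear in the actual server.

```java
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper: not part of Atmosphere or of the real server code.
final class IngestQueueSketch {
    private final BlockingQueue<String> queue;

    IngestQueueSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /**
     * Enqueues the message for later processing.
     * Returns the original message as the acknowledgement if it was queued,
     * or an empty Optional if the queue is full.
     */
    Optional<String> accept(String message) {
        return queue.offer(message) ? Optional.of(message) : Optional.empty();
    }

    /** Removes and returns the next queued message, or null if none is queued. */
    String poll() {
        return queue.poll();
    }

    public static void main(String[] args) {
        IngestQueueSketch ingest = new IngestQueueSketch(1);
        // The first message fits in the queue; the second does not.
        System.out.println(ingest.accept("This is a test message"));
        System.out.println(ingest.accept("second message"));
    }
}
```

In a handler, the value returned by `accept` would be what gets sent back to the client, so an acknowledgement is only issued once the message is safely queued.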

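I have implemented this using Atmosphere's WebSocketProtocol API. The connection is established successfully, and a single message is sent from the client to the server. However, no response is ever received, and the connection appears to close.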

Here are the log messages for the connection request:

INFO [2015-05-19 14:46:08,222] org.eclipse.jetty.server.ServerConnector: Started [email protected]{HTTP/1.1}{0.0.0.0:9090} 
INFO [2015-05-19 14:46:08,236] org.eclipse.jetty.server.ServerConnector: Started [email protected]{HTTP/1.1}{0.0.0.0:8081} 
INFO [2015-05-19 14:46:08,237] org.eclipse.jetty.server.Server: Started @15902ms 
INFO [2015-05-19 14:46:08,344] com.df.sensoringest.services.SensorIngestServiceIntegrationTest: Connecting to: ws://localhost:9090/dF/sensorIngest 
127.0.0.1 - - [19/May/2015:14:46:08 +0000] "GET /dF/sensorIngest HTTP/1.1" 101 - "-" "-" 259 
INFO [2015-05-19 14:46:09,014] com.df.sensoringest.services.SensorIngestServiceIntegrationTest: Connected: WebSocketSession[websocket=JettyListenerEventDriver[com.df.sensoringest.services.SensorIngestServiceIntegrationTest$TestClient],behavior=CLIENT,[email protected]{IDLE}{f=Flusher[queueSize=0,aggregateSize=0,failure=null],g=Generator[CLIENT,validating],[email protected][ExtensionStack,s=START,c=0,len=0,f=null,[email protected][behavior=CLIENT,maxTextMessageSize=65536,maxTextMessageBufferSize=32768,maxBinaryMessageSize=65536,maxBinaryMessageBufferSize=32768,asyncWriteTimeout=60000,idleTimeout=300000,inputBufferSize=4096]]},[email protected][batching=true],incoming=JettyListenerEventDriver[com.df.sensoringest.services.SensorIngestServiceIntegrationTest$TestClient],outgoing=ExtensionStack[queueSize=0,extensions=[],incoming=org.eclipse.jetty.websocket.common.WebSocketSession,outgoing=org.eclipse.jetty.websocket.client.io.WebSocketClientConnection]] 
INFO [2015-05-19 14:46:09,032] com.df.sensoringest.resources.socket.SocketResource: connection requested: 4c3b4254-a3bc-4a1f-900b-3b8ad9b9afc6 

After that, I send a single message, which is received by the WebSocket server:

INFO [2015-05-19 14:46:16,015] com.df.sensoringest.services.SensorIngestServiceIntegrationTest: sending: This is a test message 
INFO [2015-05-19 14:46:16,035] com.df.sensoringest.resources.socket.SocketResource: received: This is a test message 
INFO [2015-05-19 14:46:16,352] com.df.sensoringest.resources.socket.SocketResource: connection closed: 4c3b4254-a3bc-4a1f-900b-3b8ad9b9afc6 

The response is never received by the client, and the connection closes. No exceptions are thrown, so I do not know what is happening.

Here is the WebSocket server code:

// Imports are not shown in the original post; these packages are assumed.
import java.nio.charset.StandardCharsets;
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.atmosphere.config.service.WebSocketProtocolService;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.websocket.WebSocket;
import org.atmosphere.websocket.WebSocketProcessor;
import org.atmosphere.websocket.WebSocketProtocol;

@Slf4j
@WebSocketProtocolService
public class SocketResource implements WebSocketProtocol {

    @Override
    public List<AtmosphereRequest> onMessage(WebSocket webSocket, String data) {
        log.info("received: {}", data);
        // Echo the message back through whatever broadcaster the resource currently has.
        webSocket.resource().getBroadcaster().broadcast(data);
        return null;
    }

    @Override
    public List<AtmosphereRequest> onMessage(WebSocket webSocket, byte[] data, int offset, int length) {
        // Decode only the payload slice; the buffer may be larger than the message.
        String bStr = new String(data, offset, length, StandardCharsets.UTF_8);
        log.info("received unexpected byte message: {}", bStr);
        return null;
    }

    @Override
    public void configure(AtmosphereConfig config) {
        log.info("config method called");
    }

    @Override
    public void onOpen(WebSocket webSocket) {
        log.info("connection requested: {}", webSocket.resource().uuid());
    }

    @Override
    public void onClose(WebSocket webSocket) {
        log.info("connection closed: {}", webSocket.resource().uuid());
    }

    @Override
    public void onError(WebSocket webSocket, WebSocketProcessor.WebSocketException t) {
        log.error("websocket error: {}", t.getMessage());
        log.error("status {}, message {}", t.response().getStatus(), t.response().getStatusMessage());
    }
}

And here is the test client:

/**
 * The test WebSocket client.
 * log, sensorMessage, and response are presumably members of the enclosing test class (not shown).
 */
public static class TestClient extends WebSocketAdapter {

    @Override
    public void onWebSocketClose(int statusCode, String reason) {
        log.error("websocket session closed: {}, {}", statusCode, reason);
    }

    @Override
    public void onWebSocketConnect(Session session) {
        log.info("Connected: {}", session);
        try {
            // Wait five seconds before sending the test message.
            Thread.sleep((long) (5 * DateTimeConstants.MILLIS_PER_SECOND));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            log.error("sleep interrupted ...", e);
        }

        try {
            log.info("sending: {}", sensorMessage);
            session.getRemote().sendString(sensorMessage);
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged.
            log.error("error sending message", e);
        }
    }

    @Override
    public void onWebSocketError(Throwable cause) {
        log.error("unexpected websockets exception", cause);
    }

    @Override
    public void onWebSocketBinary(byte[] payload, int offset, int len) {
        log.info("unexpected binary message received");
    }

    @Override
    public void onWebSocketText(String message) {
        if (message != null) {
            response = message;
            log.info("websocket response received: {}", message);
        }
    }
}

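So, after all this, what am I doing incorrectly, or failing to do, that causes the connection to close and the broadcast back to the client to fail?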

Answer


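I was not setting the broadcaster in my onOpen() method. After doing that, and then getting the broadcaster in my onMessage() method, I now receive the response I was looking for in the client.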
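
A toy model may help show why setting the broadcaster matters. This is a sketch under assumptions, not Atmosphere's implementation: `ToyBroadcaster`, `subscribe`, and `broadcast` are hypothetical stand-ins that only mimic the general publish/subscribe idea that a broadcast reaches the connections registered with the broadcaster, so a broadcast with nothing registered reaches no one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy stand-in for a broadcaster; hypothetical, not the Atmosphere API.
final class ToyBroadcaster {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    /** Registers a connection, modelled here as a callback that receives messages. */
    void subscribe(Consumer<String> connection) {
        subscribers.add(connection);
    }

    /** Delivers the message to every registered connection and returns how many received it. */
    int broadcast(String message) {
        int delivered = 0;
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        ToyBroadcaster broadcaster = new ToyBroadcaster();
        List<String> clientInbox = new ArrayList<>();

        // No connection registered yet: the message goes nowhere.
        System.out.println("delivered: " + broadcaster.broadcast("This is a test message"));

        // After registering the connection, the same broadcast reaches the client.
        broadcaster.subscribe(clientInbox::add);
        System.out.println("delivered: " + broadcaster.broadcast("This is a test message"));
        System.out.println("inbox: " + clientInbox);
    }
}
```

If the analogy holds, the original onMessage() was broadcasting on a broadcaster that the connection had never been attached to, which would match the symptom of a silent, exception-free failure.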

Here is the new socket resource code:

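// Imports are not shown in the original post; these packages are assumed
// (in particular, @Inject is assumed to be javax.inject.Inject).
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import javax.inject.Inject;

import lombok.extern.slf4j.Slf4j;
import org.atmosphere.config.service.WebSocketProtocolService;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterFactory;
import org.atmosphere.websocket.WebSocket;
import org.atmosphere.websocket.WebSocketProcessor;
import org.atmosphere.websocket.WebSocketProtocol;

/**
 * Example resource to show integration of Atmosphere within Dropwizard.
 */
@Slf4j
@WebSocketProtocolService
public class SocketResource implements WebSocketProtocol {

    @Inject
    private BroadcasterFactory factory;

    // Atomic counter, since connections may be opened from several threads at once.
    private final AtomicInteger requestCount = new AtomicInteger(1);

    @Override
    public List<AtmosphereRequest> onMessage(WebSocket webSocket, String data) {
        // This is the broadcaster that onOpen() attached to the resource.
        Broadcaster b = webSocket.resource().getBroadcaster();
        b.broadcast(data);
        log.info("broadcaster: {}", b);
        log.info("received: {}", data);
        return null;
    }

    @Override
    public List<AtmosphereRequest> onMessage(WebSocket webSocket, byte[] data, int offset, int length) {
        // Decode only the payload slice; the buffer may be larger than the message.
        String bStr = new String(data, offset, length, StandardCharsets.UTF_8);
        log.info("received unexpected byte message: {}", bStr);
        return null;
    }

    @Override
    public void configure(AtmosphereConfig config) {
        log.info("config method called");
    }

    @Override
    public void onOpen(WebSocket webSocket) {
        log.info("connection requested: {}", webSocket.resource().uuid());
        // Look up (or create) a broadcaster for this connection, attach it, and suspend the resource.
        webSocket.resource()
                .setBroadcaster(factory.lookup("/sensorIngest/" + requestCount.getAndIncrement(), true))
                .suspend(-1);
    }

    @Override
    public void onClose(WebSocket webSocket) {
        log.info("connection closed: {}", webSocket.resource().uuid());
    }

    @Override
    public void onError(WebSocket webSocket, WebSocketProcessor.WebSocketException t) {
        log.error("websocket error: {}", t.getMessage());
        log.error("status {}, message {}", t.response().getStatus(), t.response().getStatusMessage());
    }
}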

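I am open to improvements, as I am really new to Atmosphere. One thing I have not figured out yet is how to send the server response to only one client.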
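
One possible direction, offered tentatively: the onOpen() code gives every connection its own broadcaster ID (`/sensorIngest/` followed by a counter), so a broadcast on that broadcaster may already reach only the connection attached to it. The plain-Java toy model below sketches that "one channel per client" idea. The names `ToyRegistry`, `open`, `sendTo`, and `inbox` are hypothetical and this is not the Atmosphere API, so the behaviour should be verified against the real library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of one channel per client; hypothetical, not the Atmosphere API.
final class ToyRegistry {
    private final AtomicInteger nextId = new AtomicInteger(1);
    private final Map<String, List<String>> inboxes = new ConcurrentHashMap<>();

    /** Opens a connection and returns its unique channel ID. */
    String open() {
        String id = "/sensorIngest/" + nextId.getAndIncrement();
        inboxes.put(id, new CopyOnWriteArrayList<>());
        return id;
    }

    /** Delivers the message to the one client that owns the channel; returns false if the ID is unknown. */
    boolean sendTo(String id, String message) {
        List<String> inbox = inboxes.get(id);
        if (inbox == null) {
            return false;
        }
        inbox.add(message);
        return true;
    }

    /** Returns a copy of the messages delivered to the given channel so far. */
    List<String> inbox(String id) {
        List<String> inbox = inboxes.get(id);
        return inbox == null ? Collections.<String>emptyList() : new ArrayList<>(inbox);
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        String first = registry.open();
        String second = registry.open();

        // Only the first client's channel receives the acknowledgement.
        registry.sendTo(first, "This is a test message");
        System.out.println(first + " -> " + registry.inbox(first));
        System.out.println(second + " -> " + registry.inbox(second));
    }
}
```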