2012-03-08 40 views
1

我正在處理一個小代理服務器,管理一堆在選擇器循環中運行的tcp-ip連接。外部服務通過嵌入在代理服務器中的http服務器與代理服務器通信,並且這些請求一直到達一個套接字,然後在對原始http請求的響應中返回該套接字的響應。Jetty-等待響應數據與延續,ASYNCSTARTED異常

我有併發請求(具有唯一的請求標識)進入http服務器,這些請求中的實際消息卡在隊列中,並傳遞給一堆套接字連接(某個套接字將對應於一條消息),然後將響應轉儲到散列中。 http請求會定期檢查哈希,直到它看到具有正確請求標識的響應,否則超時。如果沒有任何意義,這是一個很棒的系統圖。

Awesome system diagram

我完全新的使用Java的HTTP東西,所以我想我會用碼頭,並設置了一個簡單的請求處理一拉Hello World示例。使用thread.sleep可以防止併發請求和延續,據我所知,需要事件觸發。我希望稍後檢查散列的請求,而不是每次套接字在散列中放入響應時觸發連接事件,除非事件將通過請求ID或其他事件觸發。如果有很多流量,這似乎並不實際。

懸崖:

  • 我如何處理程序(或servlet,如果我需要走那麼遠)等待 爲位或週期性的,而這是不 其他阻擋一切數據可用?
  • 有沒有更好的方法,使HTTP請求 知道它應該返回作爲響應數據的?
  • 這種設計不好,如果是這樣,有什麼選擇?

編輯:

這裏是我的問題首次嘗試。當HTTP請求指向與連接的tcp客戶端相對應的id時,它工作正常。如果這個id不對應一個連接的客戶端,那麼jetty就會進入一個循環,並且會反覆拋出以下異常。我期望發生的是超時請求,因爲它在收集器中找不到任何消息。

Mar 8, 2012 2:39:08 PM APIRequestHandler doPost 
INFO: json: {"state": "255", "m": "set", "id": "00:00:00:00:00:06", "rid": "84955228696f11e1aa04c42c033c8bd7", "sid": "XXXXX"} 
2012-03-08 14:39:08.526:WARN:oejs.ServletHandler:/ 
java.lang.IllegalStateException: ASYNCSTARTED,initial 
    at org.eclipse.jetty.server.AsyncContinuation.suspend(AsyncContinuation.java:365) 
    at org.eclipse.jetty.server.AsyncContinuation.suspend(AsyncContinuation.java:959) 
    at APIRequestHandler.doPost(APIRequestHandler.java:72) 
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:755) 
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:848) 
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:594) 
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:485) 
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:231) 
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1065) 
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:412) 
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:192) 
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:999) 
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:117) 
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:111) 
    at org.eclipse.jetty.server.Server.handle(Server.java:351) 
    at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:454) 
    at org.eclipse.jetty.server.AbstractHttpConnection.content(AbstractHttpConnection.java:900) 
    at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.content(AbstractHttpConnection.java:954) 
    at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:857) 
    at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235) 
    at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:76) 
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:609) 
    at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:45) 
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:599) 
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:534) 
    at java.lang.Thread.run(Thread.java:680) 

的Servlet:

public void doPost(HttpServletRequest req, HttpServletResponse res) 
     throws java.io.IOException 
    { 
    String rid = req.getParameter("rid"); 
    String msg = req.getParameter("json"); 
    String id = req.getParameter("id"); 

    // Send to tcp-ip channel 
    APIMessage m = new APIMessage(); 
    m.rid = rid; 
    m.id = id; 
    m.json = msg + "\r\n"; 
    queue.add(m); 
    this.selector.wakeup(); 

    Continuation cc = ContinuationSupport.getContinuation(req); 

    res.setContentType("text/plain"); 
    res.getWriter().println("Request: "+rid+"\tstart:\t"+new Date()); 
    res.getWriter().flush(); 
    int elapsed = 0; 
    String responseStr = ""; 
    boolean timeoutHappened = true; 
    while(elapsed<this.timeout) // say 50 millis 
    { 
     if(collector.containsKey(rid)){ 
      responseStr = collector.get(rid); 
      collector.remove(rid); 
      timeoutHappened = false; 
      break; 
     } 
     cc.setTimeout(10); // sleep 10 millis 
     cc.suspend(); 
    } 

    if(!timeoutHappened) 
    { 
     logger.info("--->API http request found response in collector!"); 
     logger.info("Respose: "+responseStr); 
    } 
    else // timed out 
    { 
     logger.info("--->API http request timed out!"); 
    } 

    res.getWriter().println("Request: "+rid+"\tend:\t"+new Date()); 
    if(cc.isInitial()!=true) {cc.complete();} 
} 

回答

0

我會看一下我們在碼頭有代理已經啓動。

http://git.eclipse.org/c/jetty/org.eclipse.jetty.project.git/tree/jetty-servlets/src/main/java/org/eclipse/jetty/servlets/ProxyServlet.java

如果不解決你的努力做的,它至少讓你開始。

您可以運行的servlet與該包中的測試類方便地嵌入:

http://git.eclipse.org/c/jetty/org.eclipse.jetty.project.git/tree/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/AsyncProxyServer.java

好運

+0

感謝。我不認爲我可以使用標準事件超時,並在我的情況下連接,所以我訴諸循環。它可以正常工作,除非傳入的請求沒有與傳入的id相對應的tcp客戶端。我在上面添加了我的代碼 - 不知道爲什麼它會失敗。 – nflacco 2012-03-08 22:59:24

+0

您對continuation的使用已關閉,您需要在那裏進行isInitial()檢查,因此您不會多次執行相同的邏輯,請查看代理servlet以獲取延續機制應如何工作的示例。 – 2012-03-08 23:56:16

+0

碼頭是不是多線程開箱?該操作似乎說併發請求排隊,而不是異步執行? – davidfrancis 2012-03-09 00:22:18