2017-08-04 91 views
2

如何在基於Netty的HTTP客戶端中重試HTTP請求?使用Netty HTTP客戶端重試請求

考慮以下的處理程序,它試圖如果HTTP響應代碼503被接收到1秒後重試HTTP請求:

public class RetryChannelHandler extends ChannelDuplexHandler { 
    List<HttpObject> requestParts; 

    @Override 
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { 
     if (msg instanceof HttpRequest) { 
      requestParts = new ArrayList<>(); 
      requestParts.add((HttpRequest)msg); 
     } else if (msg instanceof HttpObject) { 
      requestParts.add((HttpObject)msg); 
     } 

     super.write(ctx, msg, promise); 
    } 

    @Override 
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { 
     if (msg instanceof HttpResponse) { 
      HttpResponse res = (HttpResponse)msg; 
      if (res.status().code() == 503) { 
       ctx.executor().schedule(new Runnable() { 
        @Override 
        public void run() { 
         for (HttpObject obj : requestParts) { 
          ctx.channel().write(obj); 
         } 
        } 
       }, 1000, TimeUnit.MILLISECONDS); 
      } else { 
       super.channelRead(ctx, msg); 
      } 
     } else { 
      super.channelRead(ctx, msg); 
     } 
    } 
} 

當我寫入信道在本例中,在管道中的其他處理程序請參閱HttpObjects,但HttpRequest實際上並未再次執行 - 只收到一個HttpResponse。

我想我只是在這種情況下濫用通道,我需要創建一個新的通道(代表到服務器的新連接)來執行重試。我不清楚如何從Handler的上下文創建新的頻道,以及我是否真的在Netty的正確層面上做這種邏輯。

任何有關如何實現我所描述的行爲的指導將不勝感激。

回答

2

您還需要在write(...)之後致電flush(),否則將不會刷新頻道。此外,您需要確保您可能保留()和複製()HttpContent,否則您最終可能會嘗試編寫已發佈的HttpContent對象。

像這樣(未測試):

public class RetryChannelHandler extends ChannelDuplexHandler { 
    Queue<HttpObject> requestParts; 

    @Override 
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { 
     if (msg instanceof HttpRequest) { 
      requestParts = new ArrayDeque<>(); 
      requestParts.add((HttpRequest)msg); 
     } else if (msg instanceof HttpContent) { 
      requestParts.add(((HttpContent)msg).duplicate().retain()); 
     } 

     super.write(ctx, msg, promise); 
    } 

    @Override 
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { 
     if (msg instanceof HttpResponse) { 
      HttpResponse res = (HttpResponse)msg; 
      if (res.status().code() == 503) { 
       ctx.executor().schedule(new Runnable() { 
        @Override 
        public void run() { 
         HttpObject obj; 
         while ((obj = requestParts.poll()) != null) { 
          ctx.write(obj); 
         } 
         ctx.flush(); 
        } 
       }, 1000, TimeUnit.MILLISECONDS); 
      } else { 
       HttpObject obj; 
       while ((obj = requestParts.poll()) != null) { 
        ReferenceCountUtil.release(obj); 
       } 
       super.channelRead(ctx, msg); 
      } 
     } else { 
      super.channelRead(ctx, msg); 
     } 
    } 
} 
+0

謝謝!經過對實現的小調整後,這解決了我的問題。 –