2015-07-19 58 views
3

我試圖用netty 4.1編寫一個非阻塞代理。我有一個處理傳入連接的「FrontHandler」,然後是一個處理傳出連接的「BackHandler」。我正在關注的HexDumpProxyHandler(https://github.com/netty/netty/blob/ed4a89082bb29b9e7d869c5d25d6b9ea8fc9d25b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java#L67Netty的非阻塞反向代理

在這段代碼中,我發現:

@Override 
public void channelRead(final ChannelHandlerContext ctx, Object msg) { 
    if (outboundChannel.isActive()) { 
     outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {, I've seen: 

這意味着如果出站客戶端連接已經準備好進入的消息只寫。這在HTTP代理案例中顯然不理想,所以我在想什麼是處理它的最好方法。

我想知道是否禁用自動讀取前端連接(並且只有在外出客戶端連接準備就緒後才手動讀取)是一個不錯的選擇。然後,我可以在後端處理程序的「channelActive」事件中再次通過子套接字啓用autoRead。然而,我不確定每個「read()」調用的處理程序中會得到多少個消息(使用HttpDecoder,我假設我會得到最初的HttpRequest,但是我真的很想避免獲得後續的HttpContent/LastHttpContent消息,直到我再次手動觸發read()並通過通道啓用autoRead。

另一種選擇是使用承諾從客戶端ChannelPool獲得信道:

private void setCurrentBackend(HttpRequest request) { 
    pool.acquire(request, backendPromise); 

    backendPromise.addListener((FutureListener<Channel>) future -> { 
     Channel c = future.get(); 
     if (!currentBackend.compareAndSet(null, c)) { 
      pool.release(c); 
      throw new IllegalStateException(); 
     } 
    }); 
} 

,然後執行復制從輸入到通這一承諾的輸出。例如:

private void handleLastContent(ChannelHandlerContext frontCtx, LastHttpContent lastContent) { 
    doInBackend(c -> { 
     c.writeAndFlush(lastContent).addListener((ChannelFutureListener) future -> { 
      if (future.isSuccess()) { 
       future.channel().read(); 
      } else { 
       pool.release(c); 
       frontCtx.close(); 
      } 
     }); 
    }); 
} 
private void doInBackend(Consumer<Channel> action) { 
    Channel c = currentBackend.get(); 
    if (c == null) { 
     backendPromise.addListener((FutureListener<Channel>) future -> action.accept(future.get())); 
    } else { 
     action.accept(c); 
    } 
} 

,但我不知道它有多好,加入聽衆它信守承諾永遠存在,並盡一切從「前」到「回」的寫入。我也不能確定如何實例,這樣的操作在正確的線程執行的...現在我使用的承諾:

backendPromise = group.next().<Channel> newPromise(); // bad 
// or 
backendPromise = frontCtx.channel().eventLoop().newPromise(); // OK? 

(其中組是相同eventLoopGroup作爲使用

前端的ServerBootstrap)。

如果不通過正確的線程處理它們,我認爲在「doInBackend」方法中進行「else {}」優化可能會有問題,以避免使用Promise並直接寫入通道。

回答

2

無自動載入方法本身無法工作,因爲即使只執行一次read(),HttpRequestDecoder也會創建多個消息。

我已經通過使用鏈式CompletableFutures解決了它。

+1

我正面臨一個非常類似的問題:客戶端 - [http] - > Netty - [websockets] - >後端,我必須等待websockets握手完成才能發送數據。實現類似的東西似乎沒有多大的複雜性。你能否詳細描述你所做的事情?謝謝 –

0

我曾參與基於MQTT協議的類似代理應用程序。所以它基本上被用來創建一個實時聊天應用程序。然而,我必須設計的應用程序本質上是異步的,所以我自然不會遇到任何這樣的問題。因爲萬一

outboundChannel.isActive() == false 

那麼我可以簡單地保持在隊列或持久DB的消息,然後處理它們一旦outboundChannel到了。但是,由於您正在討論HTTP應用程序,因此這意味着該應用程序本質上是同步的,這意味着客戶端無法繼續發送數據包,直到outboundChannel啓動並運行。因此,您建議的選項是隻有在通道處於活動狀態時纔會讀取數據包,您可以通過禁用ChannelConfig中的自動讀取手動處理消息讀取。

但是,我想建議您應該檢查outboundChannel是否處於活動狀態。如果通道處於活動狀態,請將其發送給處理。如果信道不活躍,你應該通過發回類似Error404

的響應伴隨着這一點,你應該配置你的客戶拒絕數據包,以保持在重試一定的時間間隔後發送數據包和相應的處理有什麼需要以防止頻道花費太長時間變得活躍並變得可讀。手動處理channelRead通常不是首選,並且是反模式。你應該讓Netty以最有效的方式爲你處理。

+0

我不能發送404回客戶端,因爲客戶端沒有(也不應該)知道有一個事實,即有一個後端服務器需要連接到... –