2015-11-03 126 views
1

我打算使用netty客戶端引導程序打開多個連接,以解析來自多個源的消息。這些消息都具有相同的格式,但是,由於需要處理的數據量很大,我必須在不同的線程上運行每個連接(這是假設netty爲每個客戶端通道創建一個線程,我找不到參考因爲 - 如果情況並非如此,這將如何實現?)。Scala Netty有沒有什麼辦法可以共享一個ReplayingDecoder

這是我用來連接到數據服務器的代碼:其延伸ReplayingDecoder

var b = new Bootstrap() 
     .group(group) 
     .channel(classOf[NioSocketChannel]) 
     .handler(RawFeedChannelInitializer) 


var ch1 = b.clone().connect(host, port).sync().channel(); 
var ch2 = b.clone().connect(host, port).sync().channel(); 

初始化器調用RawPacketDecoder,並且被定義here。 當打開單個連接時,代碼運行良好,沒有@Sharable,但爲了我的應用程序的目的,我必須多次連接到同一個服務器。

這會導致運行時錯誤@Sharable annotation is not allowed指向我的RawPacketDecoder類。

我不完全確定如何解決這個問題,但缺少在scala中實現類ReplayingDecoder作爲我的解碼器直接基於ByteToMessageDecoder

任何幫助將不勝感激。

注:我使用的網狀4.0.32最終

回答

1

我發現this StockExchange answer解決方案。

我的問題是我使用的是基於對象的ChannelInitializer(單例),並且ReplayingDecoder以及ByteToMessageDecoder是不可共享的。

我的初始化程序是作爲一個scala對象創建的,因此允許使用單個實例。將初始化程序更改爲scala類併爲每個引導程序克隆實例化解決了問題。我修改了引導代碼上面如下:

var b = new Bootstrap() 
    .group(group) 
    .channel(classOf[NioSocketChannel]) 
    //.handler(RawFeedChannelInitializer) 

var ch1 = b.clone().handler(new RawFeedChannelInitializer()).connect(host, port).sync().channel(); 
var ch2 = b.clone().handler(new RawFeedChannelInitializer()).connect(host, port).sync().channel(); 

我不知道這是否可以確保多線程被通緝,但它允許分割數據訪問到進給服務器的多個連接。

編輯更新:在對主題進行了額外的研究之後,我確定netty實際上確實爲每個通道創建一個線程;這證實通過打印創建每個通道的後到控制檯:

println("No. of active threads: " + Thread.activeCount()); 

輸出顯示一個遞增的數字作爲被創建並與它們各自的線程相關聯的信道。

默認情況下NioEventLoopGroup使用2*Num_CPU_cores線程定義here

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
       "io.netty.eventLoopThreads", 
       Runtime.getRuntime().availableProcessors() * 2)); 

這個值可以通過設置

val group = new NioEventLoopGroup(16) 

,然後使用該組創建/設置引導被覆蓋到別的東西。

+0

Netty不會爲每個通道創建一個線程,它通過其自己的線程共享所有傳入通道,並且如果存在比線程更多的通道,則1個線程可以具有多個通道 – Ferrybig

相關問題