2017-09-12 58 views
0

我正在研究卡夫卡的網絡層代碼有幾個關於選擇器 類的問題,特別是如何實現poll()方法。民調()方法是這樣的:Kafka來源 - 瞭解Selector.poll()的語義

void poll(int timeout){ 
.... 
    /* check ready keys */ 
    long startSelect = time.nanoseconds(); 
    int readyKeys = select(timeout); 
    long endSelect = time.nanoseconds(); 
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); 

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { 
     pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); 
     pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); 
    } 

... 
} 

是否有一個具體的要求,因爲這些我們稱之爲pollSelectionKeys()方法由select()方法,然後就立即接通鍵返回鍵第一 ?是否 只是爲了清楚起見,我們分別執行這些操作,還是有一些特定的要求?

其次,在pollSelectionKeys()方法,我們有:

void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, 
             boolean isImmediatelyConnected, 
             long currentTimeNanos){ 
... 
    /* if channel is ready write to any sockets that have space in their buffer and for which 
    we have data */ 
    if (channel.ready() && key.isWritable()) { 
     Send send = channel.write(); 
     if (send != null) { 
      this.completedSends.add(send); 
      this.sensors.recordBytesSent(channel.id(), send.size()); 
     } 
    } 
... 
} 

據我瞭解,我們永遠只能寫一個KafkaChannel時,要麼屬於 我們從早期調用獲取到select()鍵集方法,或者如果KafkaChannelimmediatelyConnectedKeys之一關聯。我的問題是,爲什麼我們這樣去寫KafkaChannels這個 業務?更具體地說,我們不只是迭代 已連接的所有KafkaChannels,並寫入他們,如果他們有一個Send 對象與它們關聯?通過這種方式,我們儘快寫入KafkaChannel, 而不必等待它屬於immediatelyConnectedKeysreadyKeys

回答

1

答案在於連接方法Selector類(下面相關部分)

connected = socketChannel.connect(address); 
.............................. 
................................ 

SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); 

的作爲每該文檔NIO的解釋SocketChannel連接

如果此通道處於非阻塞模式,則調用這個方法啓動了一個非阻塞連接操作。如果 連接立即建立,就像本地連接發生的那樣,則此方法返回true。否則,此方法 返回false,並且後面的連接操作必須由調用finishConnect方法的 完成。

所以互動的一個典型的工作流程如下(很好的解釋here

如果你是在非阻塞模式,你應該連接:

  • 註冊通道OP_CONNECT
  • 當它觸發時調用finishConnect()
  • 如果它返回true,則取消註冊OP_CONNECT並註冊這取決於你想要什麼下一個
  • 做,如果返回false OP_READ或OP_WRITE,什麼也不做,繼續選擇
  • 如果可以連接()或finishConnect()拋出一個異常,關閉通道,然後重試還是算了吧或者告訴用戶或任何合適的東西。

如果直到通道連接不想做任何事,做 連接阻塞模式,並進入非阻塞模式時 連接成功。

這種連接方法可以立即連接爲在本地連接的情況下,並且可以不觸發其註冊用於此連接(connect調用後幾行)的SocketChannel OP_CONNECT事件,因此,使用典型的Java NIO寄存器代碼時,我們可能會錯過它。我們需要最終在這些頻道上調用finishConnect(請參閱工作流中的第二個重點)。因此,我們將這樣的頻道密鑰添加到另一個設置immediatelyConnectedKeys,以便它們可以被處理得太晚,否則我們會完全錯過它們。

if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { 
      pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); 
      pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); 
     } 

後來在pollSelectionKeys方法(注意使用finishConnect這是在所有的卡夫卡代碼finishConnect調用底層SocketChannel

/* complete any connections that have finished their handshake (either normally or immediately) */ 
       if (isImmediatelyConnected || key.isConnectable()) { 
        if (channel.finishConnect()) { 
......................... 
......................... 

一切看起來像標準的NIO的東西怎麼回事,除非有更多的是卡夫卡團隊可以解釋的內容。關於這個問題的更多內容可以在here找到。與此相關的一個有趣的誤解(bug提交併最終被JDK團隊拒絕)可以找到here

對於問題的第二部分,您可能會詢問以下代碼。爲什麼鑰匙

if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { 
      pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); 
      pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); 
     } 

兩個單獨的呼叫看看我們現在有兩套鑰匙的保持。雖然有一個整體的按鍵的觀點是由selector.keys()提供,但按鍵是不能直接修改的,所以它一種只讀視圖。只有當它被取消並且其頻道已被註銷時,該鍵集中的鍵才被刪除。因此通常使用selector.selectedKeys()來訪問就緒通道。另外selector.selectedKeys()顯然不會從immediatelyConnectedKeys返回密鑰。通過從selector.selectedKeys()獲得的這些密鑰的通常處理模式是遍歷集合,測試 密鑰所代表的通道​​(可接受,可連接,可讀/可寫)是否已準備就緒,然後將其刪除從集合中。此刪除部分是非常必要的。選擇器不會從選定的密鑰集本身中刪除SelectionKey實例。你必須這樣做,當你完成頻道的處理。 通道下一次變爲「就緒」時,選擇器將再次將其添加到選定的按鍵組。所以這就是處理這兩者的原因,並且方法pollSelectionKeys被設計成兼顧兩者。

+0

這使得更多的意義。你對上述第二個問題有任何迴應嗎? –

+0

我加入了我對第二個問題的看法。 – Shailendra

0

的TCP連接不可用,直到I/O連接完成