我正在研究卡夫卡的網絡層代碼有幾個關於選擇器 類的問題,特別是如何實現poll()方法。民調()方法是這樣的:Kafka來源 - 瞭解Selector.poll()的語義
void poll(int timeout){
....
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
}
...
}
是否有一個具體的要求,因爲這些我們稱之爲pollSelectionKeys()
方法由select()
方法,然後就立即接通鍵返回鍵第一 ?是否 只是爲了清楚起見,我們分別執行這些操作,還是有一些特定的要求?
其次,在pollSelectionKeys()
方法,我們有:
void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos){
...
/* if channel is ready write to any sockets that have space in their buffer and for which
we have data */
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
...
}
據我瞭解,我們永遠只能寫一個KafkaChannel
時,要麼屬於 我們從早期調用獲取到select()
鍵集方法,或者如果KafkaChannel
與immediatelyConnectedKeys
之一關聯。我的問題是,爲什麼我們這樣去寫KafkaChannels
這個 業務?更具體地說,我們不只是迭代 已連接的所有KafkaChannels
,並寫入他們,如果他們有一個Send
對象與它們關聯?通過這種方式,我們儘快寫入KafkaChannel
, 而不必等待它屬於immediatelyConnectedKeys
或readyKeys
。
這使得更多的意義。你對上述第二個問題有任何迴應嗎? –
我加入了我對第二個問題的看法。 – Shailendra