我正在測試簡單的拓撲結構來檢查卡夫卡噴嘴的性能。 它包含kafka噴口和螺栓acknoledge每個元組。 博爾特execute方法:卡夫卡噴嘴性能不佳
public void execute(Tuple input) {
collector.ack(input);
}
拓撲結構是這樣的:
protected void configureTopology(TopologyBuilder topologyBuilder) {
configureKafkaCDRSpout(topologyBuilder);
configureKafkaSpoutBandwidthTesterBolt(topologyBuilder);
}
private void configureKafkaCDRSpout(TopologyBuilder builder) {
KafkaSpout kafkaSpout = new KafkaSpout(createKafkaCDRSpoutConfig());
int spoutCount = Integer.valueOf(topologyConfig.getProperty("kafka.cboss.cdr.spout.thread.count"));
builder.setSpout(KAFKA_CDR_SPOUT_ID, kafkaSpout, spoutCount)
.setNumTasks(Integer.valueOf(topologyConfig.getProperty(KAFKA_CDR_SPOUT_NUM_TASKS)));
}
private SpoutConfig createKafkaCDRSpoutConfig() {
BrokerHosts hosts = new ZkHosts(topologyConfig.getProperty("kafka.zookeeper.broker.host"));
String topic = topologyConfig.getProperty("kafka.cboss.cdr.topic");
String zkRoot = topologyConfig.getProperty("kafka.cboss.cdr.zkRoot");
String consumerGroupId = topologyConfig.getProperty("kafka.cboss.cdr.consumerId");
SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new CbossCdrScheme());
kafkaSpoutConfig.ignoreZkOffsets = true;
kafkaSpoutConfig.fetchSizeBytes = Integer.valueOf(topologyConfig.getProperty("kafka.fetchSizeBytes"));
kafkaSpoutConfig.bufferSizeBytes = Integer.valueOf(topologyConfig.getProperty("kafka.bufferSizeBytes"));
return kafkaSpoutConfig;
}
public void configureKafkaSpoutBandwidthTesterBolt(TopologyBuilder topologyBuilder) {
SimpleAckerBolt b = new SimpleAckerBolt();
topologyBuilder.setBolt(SPOUT_BANDWIDTH_TESTER_BOLT_ID, b, Integer.valueOf(topologyConfig.getProperty(CFG_SIMPLE_ACKER_BOLT_PARALLELISM)))
.setNumTasks(Integer.valueOf(topologyConfig.getProperty(SPOUT_BANDWIDTH_TESTER_BOLT_NUM_TASKS)))
.localOrShuffleGrouping(KAFKA_CDR_SPOUT_ID);
}
其他拓撲結構設置:
topology.max.spout.pending=250
topology.executor.receive.buffer.size=1024
topology.executor.send.buffer.size=1024
topology.receiver.buffer.size=8
topology.transfer.buffer.size=1024
topology.acker.executors=1
我發動我的拓撲1名工人1卡夫卡脫粒機和1個簡單阿克爾螺栓。 這就是我在風暴UI獲得:
歐凱我在10分鐘1.5kk元組。螺栓的衝擊力約爲0.5。所以我的邏輯很簡單:如果我將噴嘴和螺栓平行提示 - 我會得到雙倍的性能。 下一個測試是1個工人2卡夫卡噴口,2個簡單的阿克爾螺栓和topology.acker.executors = 2。下面是結果:
所以,我得到增加parallelizm提示更糟更流暢。爲什麼它會發生?我如何每秒處理元組?實際上任何測試與噴口並行性暗示大於2表示比1噴口執行器更糟糕的結果。
我已經檢查過:
1)這不是卡夫卡故障。主題有2個經紀人有20個分區。 4名工作人員的拓撲結構可獲得x4性能。
2)這不是服務器故障。服務器有40個核心和32Gb RAM。運行拓撲時,它消耗大約1/8的CPU,而幾乎沒有RAM。
3)改變topology.max.spout.pending參數沒有幫助。 4)增加Bolt或Acker並行性暗示甚至更不利於。
你只用一個worker運行兩個測試,如果你添加了另一個worker,該怎麼辦?因此,與兩名工人一起進行第二次測試。 – morganw09dev
謝謝你的回覆,摩根。你說得對。越來越多的工人給我比例的結果。有2個工人2個噴嘴,每秒我的元組數增加一倍。但是這個測試的想法是確定1名工人的最佳表現。我能得到的最好的結果是每10分鐘1,5kk元組,或每秒2500元組。我猜想在40核心,32GB內存和10Gb/s網絡的節點上,我可以做得更好。 – f1sherox
1名員工,但40個核心並不合理。無論如何,每個工作人員都是單線程的,所以這意味着您的服務器有能力容納40名員工。您現在可以在單個核心服務器上獲得完全相同的性能。每個線程2500元/ s並不是很好,但也不算太差。 – C4stor