2016-03-01 57 views
0

我正在測試簡單的拓撲結構來檢查卡夫卡噴嘴的性能。 它包含kafka噴口和螺栓acknoledge每個元組。 博爾特execute方法:卡夫卡噴嘴性能不佳

public void execute(Tuple input) { 
    collector.ack(input); 
} 

拓撲結構是這樣的:

protected void configureTopology(TopologyBuilder topologyBuilder) { 
    configureKafkaCDRSpout(topologyBuilder); 
    configureKafkaSpoutBandwidthTesterBolt(topologyBuilder); 
} 

private void configureKafkaCDRSpout(TopologyBuilder builder) { 
    KafkaSpout kafkaSpout = new KafkaSpout(createKafkaCDRSpoutConfig()); 
    int spoutCount = Integer.valueOf(topologyConfig.getProperty("kafka.cboss.cdr.spout.thread.count")); 
    builder.setSpout(KAFKA_CDR_SPOUT_ID, kafkaSpout, spoutCount) 
      .setNumTasks(Integer.valueOf(topologyConfig.getProperty(KAFKA_CDR_SPOUT_NUM_TASKS))); 
} 
private SpoutConfig createKafkaCDRSpoutConfig() { 
    BrokerHosts hosts = new ZkHosts(topologyConfig.getProperty("kafka.zookeeper.broker.host")); 
    String topic = topologyConfig.getProperty("kafka.cboss.cdr.topic"); 
    String zkRoot = topologyConfig.getProperty("kafka.cboss.cdr.zkRoot"); 
    String consumerGroupId = topologyConfig.getProperty("kafka.cboss.cdr.consumerId"); 
    SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId); 
    kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new CbossCdrScheme()); 
    kafkaSpoutConfig.ignoreZkOffsets = true; 
    kafkaSpoutConfig.fetchSizeBytes = Integer.valueOf(topologyConfig.getProperty("kafka.fetchSizeBytes")); 
    kafkaSpoutConfig.bufferSizeBytes = Integer.valueOf(topologyConfig.getProperty("kafka.bufferSizeBytes")); 
    return kafkaSpoutConfig; 
} 

public void configureKafkaSpoutBandwidthTesterBolt(TopologyBuilder topologyBuilder) { 
    SimpleAckerBolt b = new SimpleAckerBolt(); 
    topologyBuilder.setBolt(SPOUT_BANDWIDTH_TESTER_BOLT_ID, b, Integer.valueOf(topologyConfig.getProperty(CFG_SIMPLE_ACKER_BOLT_PARALLELISM))) 
      .setNumTasks(Integer.valueOf(topologyConfig.getProperty(SPOUT_BANDWIDTH_TESTER_BOLT_NUM_TASKS))) 
      .localOrShuffleGrouping(KAFKA_CDR_SPOUT_ID); 
} 

其他拓撲結構設置:

topology.max.spout.pending=250 
topology.executor.receive.buffer.size=1024 
topology.executor.send.buffer.size=1024 
topology.receiver.buffer.size=8 
topology.transfer.buffer.size=1024 
topology.acker.executors=1 

我發動我的拓撲1名工人1卡夫卡脫粒機和1個簡單阿克爾螺栓。 這就是我在風暴UI獲得: Perfomance test 1

歐凱我在10分鐘1.5kk元組。螺栓的衝擊力約爲0.5。所以我的邏輯很簡單:如果我將噴嘴和螺栓平行提示 - 我會得到雙倍的性能。 下一個測試是1個工人2卡夫卡噴口,2個簡單的阿克爾螺栓和topology.acker.executors = 2。下面是結果:

Perfomance test 2

所以,我得到增加parallelizm提示更糟更流暢。爲什麼它會發生?我如何每秒處理元組?實際上任何測試與噴口並行性暗示大於2表示比1噴口執行器更糟糕的結果。

我已經檢查過:
1)這不是卡夫卡故障。主題有2個經紀人有20個分區。 4名工作人員的拓撲結構可獲得x4性能。
2)這不是服務器故障。服務器有40個核心和32Gb RAM。運行拓撲時,它消耗大約1/8的CPU,而幾乎沒有RAM。
3)改變topology.max.spout.pending參數沒有幫助。 4)增加Bolt或Acker並行性暗示甚至更不利於。

+0

你只用一個worker運行兩個測試,如果你添加了另一個worker,該怎麼辦?因此,與兩名工人一起進行第二次測試。 – morganw09dev

+0

謝謝你的回覆,摩根。你說得對。越來越多的工人給我比例的結果。有2個工人2個噴嘴,每秒我的元組數增加一倍。但是這個測試的想法是確定1名工人的最佳表現。我能得到的最好的結果是每10分鐘1,5kk元組,或每秒2500元組。我猜想在40核心,32GB內存和10Gb/s網絡的節點上,我可以做得更好。 – f1sherox

+0

1名員工,但40個核心並不合理。無論如何,每個工作人員都是單線程的,所以這意味着您的服務器有能力容納40名員工。您現在可以在單個核心服務器上獲得完全相同的性能。每個線程2500元/ s並不是很好,但也不算太差。 – C4stor

回答

0

所以看起來好像你對一名工人的表現感到極限。你只是讓一個工人做很多事情,而且不能處理所有事情。

在這一點上,如果你想進一步提高你的系統的性能,你有兩種選擇。

  1. 添加更多工人。
  2. 提高你一個工人的工作能力。

如果你不想增加更多的工人,那麼剩下的就是配置你的一個工人。然後,您應該調查一個工作人員的配置,爲其提供更多的內存,更多的CPU等。您應該查看Storm的default configuration options,看看調整某些配置值是否會提高性能。一些配置看起來更有可能比其他配置更有幫助:

worker.heap.memory.mb: 
worker.childopts: 
supervisor.childopts: 
supervisor.memory.capacity.mb: 
supervisor.cpu.capacity: 
+0

我試圖增加工人和主管的堆大小,但仍然沒有任何變化。我已更改的設置:supervisor.childopts,worker.childopts。此外,我試圖改變這個設置:topology.worker.shared.thread.pool.size,topology.worker.receiver.thread.count。這裏完整的配置http://pastebin.com/TEB6d3Ve – f1sherox

+0

我從來沒有真正做過很多的風暴優化,所以我不能在這方面做的很多。但除非你有一個合理的理由,否則我會建議增加工人的數量以獲得更好的表現。 – morganw09dev