我想重新平衡我使用KafkaSpout的Storm拓撲。我的代碼是:風暴拓撲重新平衡使用Java代碼
TopologyBuilder builder = new TopologyBuilder();
Properties kafkaProps = new Properties();
kafkaProps.put("zk.connect", "localhost:2181");
kafkaProps.put("zk.connectiontimeout.ms", "1000000");
kafkaProps.put("groupid", "storm");
builder.setSpout("kafkaSpout" , new KafkaSpout(kafkaProps, "test"), 3);
builder.setBolt("eventBolt", new EventBolt(), 2).shuffleGrouping("kafkaSpout", "eventStream");
builder.setBolt("tableBolt", new TableBolt(), 2).shuffleGrouping("kafkaSpout", "tableStream");
Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TOPOLOGY_DEBUG, true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(1000*5);
List<TopologySummary> topologySummaries = cluster.getClusterInfo().get_topologies();
for (TopologySummary summary : topologySummaries) {
StormTopology topology = cluster.getTopology(summary.get_id());
RebalanceOptions options = new RebalanceOptions();
options.set_wait_secs(0);
options.set_num_workers(4);
for (String name : topology.get_bolts().keySet()) {
System.err.println(name + " " + topology.get_bolts().get(name).get_common().get_json_conf());
options.put_to_num_executors(name , 5);
}
for (String name : topology.get_spouts().keySet()) {
System.err.println(name + " " + topology.get_spouts().get(name).get_common().get_json_conf());
options.put_to_num_executors(name , 5);
}
cluster.rebalance(summary.get_name() , options);
}
但是,重新平衡過程中,以下錯誤跟蹤顯示:
10341 [storm_rishabh-1361473654345-95461d10_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 begin rebalancing consumer storm_rishabh-1361473654345-95461d10 try #1
10341 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 begin rebalancing consumer storm_rishabh-1361473654345-3b26ed76 try #1
10342 [storm_rishabh-1361473654345-95461d10_watcher_executor] ERROR kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 error during syncedRebalance
java.lang.NullPointerException: null
at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:181) ~[kafka_2.9.2-0.7.0.jar:na]
at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:202) ~[kafka_2.9.2-0.7.0.jar:na]
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:447) ~[kafka_2.9.2-0.7.0.jar:na]
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:78) ~[scala-library-2.9.2.jar:na]
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444) ~[kafka_2.9.2-0.7.0.jar:na]
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401) ~[kafka_2.9.2-0.7.0.jar:na]
10342 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] ERROR kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 error during syncedRebalance
java.lang.NullPointerException: null
at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:181) ~[kafka_2.9.2-0.7.0.jar:na]
at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:202) ~[kafka_2.9.2-0.7.0.jar:na]
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:447) ~[kafka_2.9.2-0.7.0.jar:na]
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:78) ~[scala-library-2.9.2.jar:na]
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444) ~[kafka_2.9.2-0.7.0.jar:na]
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401) ~[kafka_2.9.2-0.7.0.jar:na]
10342 [storm_rishabh-1361473654345-95461d10_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-95461d10 stopping watcher executor thread for consumer storm_rishabh-1361473654345-95461d10
10343 [storm_rishabh-1361473654345-3b26ed76_watcher_executor] INFO kafka.consumer.ZookeeperConsumerConnector - storm_rishabh-1361473654345-3b26ed76 stopping watcher executor thread for consumer storm_rishabh-1361473654345-3b26ed76
有人能告訴我什麼可以是問題?我是否需要在kafkaSpout中定義更多內容,以便在重新平衡時正確關閉並重新啓動?