Storm-Kafka spout consuming slowly
I just tried the Kafka spout for Storm mentioned here, https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka, with the configuration below.
BrokerHosts brokerHosts = KafkaConfig.StaticHosts.fromHostString(
        ImmutableList.of("localhost"), 1);
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, // list of Kafka brokers
        "test", // topic to read from
        "/kafkastorm", // the root path in ZooKeeper for the spout to store consumer offsets
        "discovery"); // an id for this consumer, used when storing the
                      // consumer offsets in ZooKeeper
spoutConfig.scheme = new StringScheme();
spoutConfig.stateUpdateIntervalMs = 1000;
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TridentTopology topology = new TridentTopology();
InetSocketAddress inetSocketAddress = new InetSocketAddress(
        "localhost", 6379);
TridentState wordsCount = topology
        .newStream(SPOUT_FIRST, kafkaSpout)
        .parallelismHint(1)
        .each(new Fields("str"), new TestSplit(), new Fields("words"))
        .groupBy(new Fields("words"))
        .persistentAggregate(
                RedisState.transactional(inetSocketAddress),
                new Count(), new Fields("counts"))
        .parallelismHint(100);
Config conf = new Config();
conf.setMaxTaskParallelism(200);
// conf.setDebug(true);
// conf.setMaxSpoutPending(20);
// This topology can only be run as local because it is a toy example
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("symbolCounter", conf, topology.build());
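But the spout above only pulls messages from the Kafka topic at around 7,000 messages/sec, whereas I expect a load of around 50,000 messages/sec. I have tried various options, such as increasing the fetch buffer size in spoutConfig, but saw no visible improvement.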
Has anyone run into a similar problem, where Storm cannot consume a Kafka topic as fast as the producer writes messages to it?
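Just a thought: this may be caused by Storm rather than by KafkaSpout. Have you seen any difference? How many partitions does your topic have? And does lowering the maxSpoutPending value change anything? – user2720864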
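@user2720864 The problem is that KafkaSpout is able to fetch the high volume of messages, but it spends time in the phase where it updates ZooKeeper, and processing waits until that phase finishes. I will try your suggestions. –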
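A rough way to see why a per-batch ZooKeeper/state update can cap throughput: if every Trident batch pays a roughly fixed overhead plus a small per-tuple cost, small batches spend most of their time on the overhead. This is a hypothetical back-of-the-envelope model, not Storm code. The 140 ms overhead and 10 µs per-tuple cost are assumed numbers, picked only so that 1,000-tuple batches land near the ~7,000 msg/s reported in the question.

```java
// Hypothetical model (not Storm code): each batch costs a fixed overhead
// (assumed to be the offset/state commit) plus a per-tuple processing cost.
public class BatchThroughputModel {

    // Tuples per second when one batch takes fixedMs + batchSize * perTupleMs.
    static double throughput(int batchSize, double fixedMs, double perTupleMs) {
        double batchMs = fixedMs + batchSize * perTupleMs;
        return batchSize / (batchMs / 1000.0);
    }

    public static void main(String[] args) {
        double fixedMs = 140.0;    // assumed per-batch overhead in milliseconds
        double perTupleMs = 0.01;  // assumed per-tuple cost (10 microseconds)
        for (int batchSize : new int[] {1000, 64 * 1024}) {
            System.out.printf("batch=%d -> ~%.0f tuples/s%n",
                    batchSize, throughput(batchSize, fixedMs, perTupleMs));
        }
    }
}
```

Under these assumptions the fixed cost is amortized as batches grow, and throughput approaches the ceiling of 1 / perTupleMs (here 100,000 tuples/s), which fits the observation that a larger batch size made processing fast.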
@user2720864: I raised the value of "topology.spout.max.batch.size" in the config to about 64 * 1024, and after that Storm processing became fast. –
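The fix from the last comment amounts to one extra entry in the topology config. This is a minimal sketch, assuming Storm 0.8/0.9-era Trident, where `backtype.storm.Config` extends `HashMap<String, Object>`. A plain `Map` stands in for `Config` so the snippet runs without Storm on the classpath. As far as I understand, Trident reads this key when it wraps a plain (non-Trident) spout such as `KafkaSpout` passed to `newStream(...)`, and uses a small default batch size when the key is absent. In the real topology you would call `conf.put(...)` on the `Config` passed to `submitTopology`.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for backtype.storm.Config (which extends HashMap<String, Object>),
// so this sketch compiles and runs without Storm.
public class SpoutBatchSizeConfig {

    // Config key quoted in the comment above.
    static final String MAX_BATCH_SIZE = "topology.spout.max.batch.size";

    static Map<String, Object> buildConf() {
        Map<String, Object> conf = new HashMap<>();
        // Value the poster reported as removing the slowdown.
        conf.put(MAX_BATCH_SIZE, 64 * 1024);
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(buildConf());
    }
}
```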