2017-06-15 57 views
2

在我們的風暴1.0.2應用程序中,我們正面臨內存異常。調試後,我們看到卡夫卡噴嘴向螺栓發出太多消息。螺栓的運行速度接近4.0。那麼是否有一種方法可以在風暴中啓用背壓,以便根據螺栓中的容量排出噴口。嘗試啓用topology.backpressure.enable爲true,但跑到這個問題https://issues.apache.org/jira/browse/STORM-1949。我們正在使用KafkaSpout開箱即用的實現,並擴展BaseRichBolt以實現我們的螺栓。我們的DAG是線性的。風暴中的背壓

回答

2

您可以在拓撲配置設置maxSpoutPending值處理KafkaSpout的背壓,

Config config = new Config(); 
config.setMaxSpoutPending(200); 
config.setMessageTimeoutSecs(100); 

StormSubmitter.submitTopology("testtopology", config, builder.createTopology()); 

maxSpoutPending是可以在給定的時間內沒有任何拓撲確認的記錄數。設置這個屬性將會使KafkaSpout不會從Kafka中消耗更多的數據,除非未確認的元組數小於maxSpoutPending值。