0
喜歡風暴卡夫卡客戶端,我曾用風暴卡夫卡客戶端,但不能很好地工作,並寫一個新的噴口不工作了。 誰可以幫我寫一個樣品卡夫卡壺嘴。如何使用New Kafka Consumer API編寫樣本kafka噴嘴?
喜歡風暴卡夫卡客戶端,我曾用風暴卡夫卡客戶端,但不能很好地工作,並寫一個新的噴口不工作了。 誰可以幫我寫一個樣品卡夫卡壺嘴。如何使用New Kafka Consumer API編寫樣本kafka噴嘴?
定義Topology.java
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
public class Topology{
public static void main(String[] args){
TopologyBuilder builder = new TopologyBuilder();
String zkHosts = StringUtils.join("127.0.0.1", ',');
BrokerHosts hosts = new ZkHosts(zkHosts);
SpoutConfig spoutConfig = new SpoutConfig(hosts, "kafkaTopic_name", "/kafkaTopic_name", "kafkaGroup_name");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.forceFromStart = forceFromStart;
builder.setSpout("events", new KafkaSpout(spoutConfig), 5).setNumTasks(5);
//...
}
}
基本上,你需要以創建kafkaSpout創建SpoutConfig。
也許我會讓你感到困惑,但我需要的是KafkaSpout.java,而不是拓撲。這是你的新東西。 – cutd