Answers

0

Define Topology.java:

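import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class Topology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // ZooKeeper connection string used by Kafka (comma-separated if there are several hosts)
        String zkHosts = "127.0.0.1";
        // Assumption: the original snippet never declared this variable; set it to true
        // to ignore offsets stored in ZooKeeper and start from the configured start offset
        boolean forceFromStart = false;

        BrokerHosts hosts = new ZkHosts(zkHosts);
        // Arguments: broker hosts, topic name, ZooKeeper root path for offsets, consumer id
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "kafkaTopic_name", "/kafkaTopic_name", "kafkaGroup_name");
        // Deserialize each Kafka message as a string
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.forceFromStart = forceFromStart;
        // Register the spout as "events" with 5 executors and 5 tasks
        builder.setSpout("events", new KafkaSpout(spoutConfig), 5).setNumTasks(5);
        //...
    }
}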

Basically, to create the KafkaSpout you first need to create a SpoutConfig.
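The topology above passes a single ZooKeeper host to ZkHosts. If several hosts are involved, the comma-separated connection string could be built with the standard library alone. This is only a minimal sketch: the class name ZkConnect, the method connectString, and the host names are hypothetical and do not come from storm-kafka.

```java
import java.util.List;

// Hypothetical helper (not part of storm-kafka): builds a ZooKeeper
// connection string of the form "host1:port,host2:port".
class ZkConnect {
    static String connectString(List<String> hostsWithPorts) {
        // String.join places the separator only between elements,
        // so a single host yields no trailing comma.
        return String.join(",", hostsWithPorts);
    }

    public static void main(String[] args) {
        // Example host names are assumptions for illustration.
        String zkHosts = connectString(List.of("zk1.example.com:2181", "zk2.example.com:2181"));
        System.out.println(zkHosts);
    }
}
```

The resulting string can then be handed to new ZkHosts(zkHosts) in the same way as the single-host value.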

+0

Perhaps I confused you, but what I need is KafkaSpout.java, not the topology. That is the new thing you added. – cutd
