2015-06-16 29 views
0

我試圖獲得一個在風暴中運行的非常簡單的卡夫卡噴口。我的主要功能是Exclamation示例的一個非常簡單的改編;storm-kafka forceStartOffsetTime(int)符號找不到符號

TopologyBuilder builder = new TopologyBuilder(); 
String zkConnString = "localhost:2181"; 
String brokerZkPath = "/kafka"; 
String topicName = "test"; 
BrokerHosts hosts = new ZkHosts(zkConnString); 
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "" , "test123"); 
spoutConfig.forceStartOffsetTime(-2); 
//spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); 
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 

//builder.setSpout("word", new TestWordSpout(), 10); 
builder.setSpout("word", kafkaSpout, 1); 
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word"); 
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"); 

當我做MVN包,我得到了以下錯誤:

[ERROR] COMPILATION ERROR : 
[INFO] ------------------------------------------------------------- 
[ERROR] /usr/local/apache-storm-0.9.2-incubating/examples/test-example/my-app/src/main/java/com/mycompany/app/ExclamationTopology.java:[89,16] cannot find symbol 
    symbol: method forceStartOffsetTime(int) 
    location: variable spoutConfig of type storm.kafka.SpoutConfig 
[INFO] 1 error 
[INFO] ------------------------------------------------------------- 
[INFO] ------------------------------------------------------------------------ 
[INFO] BUILD FAILURE 
[INFO] ------------------------------------------------------------------------ 
[INFO] Total time: 2.268 s 
[INFO] Finished at: 2015-06-16T19:22:10-04:00 
[INFO] Final Memory: 26M/446M 
[INFO] ------------------------------------------------------------------------ 
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project my-app: Compilation failure 
[ERROR] /usr/local/apache-storm-0.9.2-incubating/examples/test-example/my-app/src/main/java/com/mycompany/app/ExclamationTopology.java:[89,16] cannot find symbol 
[ERROR] symbol: method forceStartOffsetTime(int) 
[ERROR] location: variable spoutConfig of type storm.kafka.SpoutConfig 
[ERROR] -> [Help 1] 
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. 
[ERROR] Re-run Maven using the -X switch to enable full debug logging. 
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles: 
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException 

如果我刪除行有 'spoutConfig.forceStartOffsetTime(-2);',它編譯成功。但是,如果使用:spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();它不會將偏移設置爲主題中最早的偏移量。如果我設置spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(),偏移量設置爲4而不是0.我真的需要一種方法來設置它的偏移量最早的。

我的pom.xml的依賴關係:

<dependencies> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.testng</groupId> 
     <artifactId>testng</artifactId> 
     <version>6.8.5</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.mockito</groupId> 
     <artifactId>mockito-all</artifactId> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.easytesting</groupId> 
     <artifactId>fest-assert-core</artifactId> 
     <version>2.0M8</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.jmock</groupId> 
     <artifactId>jmock</artifactId> 
     <version>2.6.0</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
    <groupId>org.twitter4j</groupId> 
    <artifactId>twitter4j-stream</artifactId> 
    <version>3.0.3</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.storm</groupId> 
     <artifactId>storm-core</artifactId> 
     <version>${project.version}</version> 
     <!-- keep storm out of the jar-with-dependencies --> 
     <scope>provided</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.storm</groupId> 
     <artifactId>storm-kafka</artifactId> 
     <version>0.9.2-incubating</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.8.2.0</version> 
     <!-- 
     <artifactId>kafka_2.9.2</artifactId> 
     <version>0.8.1.1</version> 
     <exclusions> 
      <exclusion> 
       <groupId>org.apache.zookeeper</groupId> 
       <artifactId>zookeeper</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>log4j</groupId> 
       <artifactId>log4j</artifactId> 
      </exclusion> 
     </exclusions> 
    </dependency> 
    <dependency> 
     <groupId>commons-collections</groupId> 
     <artifactId>commons-collections</artifactId> 
     <version>3.2.1</version> 
    </dependency> 
    <dependency> 
     <groupId>com.google.guava</groupId> 
     <artifactId>guava</artifactId> 
    </dependency> 
</dependencies> 

回答

0

我相信storm 0.9.2 -incubatingthe correct property should beforceFromStart它接受Boolean值,而不是int

所以你可以做點像spoutConfig.forceFromStart = true

+0

它的工作表示感謝! – niepan

+1

@niepan如果您的問題的解決方案正確無誤,請考慮接受此答案 – Sankalp