2017-07-27 50 views
0

我試圖運行一個簡單的Apache Flink腳本與卡夫卡指令,但我一直有執行問題。 該腳本應該讀取來自kafka製作者的消息,詳細闡述它們,然後再發送回另一個主題,即處理結果。 我從這裏得到這個例子: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-td4828.htmlkafka-apache flink執行log4j錯誤

我的錯誤是:

Exception in thread "main" java.lang.NoSuchFieldError:ALL 
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenera‌tor.createJobGraph(S‌​treamingJobGraphGene‌​rator.java:86) 
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph‌​(StreamGraph.java:42‌​9) 
at org.apache.flink.streaming.api.environment.LocalStreamEnviro‌nment.execute(LocalS‌​treamEnvironment.jav‌​a:46) 

在org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(當地人treamEnvironment。 JAV一:33)

這是我的代碼:

public class App { 
     public static void main(String[] args) throws Exception { 
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
      Properties properties = new Properties(); 
      properties.setProperty("bootstrap.servers", "localhost:9092"); 

      //properties.setProperty("zookeeper.connect", "localhost:2181"); 
      properties.setProperty("group.id", "javaflink"); 

      DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(), properties)); 
      System.out.println("Step D"); 
      messageStream.map(new MapFunction<String, String>(){ 

        public String map(String value) throws Exception { 
          // TODO Auto-generated method stub 
          return "Blablabla " + value; 
        } 
      }).addSink(new FlinkKafkaProducer010("localhost:9092", "demo2", new SimpleStringSchema())); 
      env.execute(); 
     } 
} 

這些都是依賴新生的pom.xml Dencies:

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-core</artifactId> 
    <version>1.3.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-java_2.11</artifactId> 
    <version>0.10.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-clients_2.11</artifactId> 
    <version>1.3.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-streaming-core</artifactId> 
    <version>0.9.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-streaming-java_2.11</artifactId> 
    <version>1.3.1</version> 
    <scope>provided</scope> 
</dependency> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId> 
    <version>1.3.1</version> 
</dependency> 

什麼會導致這種錯誤?

感謝 盧卡

+0

這不是錯誤。這只是一個警告。無論如何,你的工作應該工作。 –

回答

0

的問題很可能是由不同版本弗林克的混合導致您已在pom.xml定義。爲了運行該程序,它應該足以包含以下依賴項:

<!-- Streaming API --> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-streaming-java_2.11</artifactId> 
    <version>1.3.1</version> 
</dependency> 

<!-- In order to execute the program from within your IDE --> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-clients_2.11</artifactId> 
    <version>1.3.1</version> 
</dependency> 

<!-- Kafka connector dependency --> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId> 
    <version>1.3.1</version> 
</dependency> 
+0

謝謝。這解決了我的問題! :D –

+0

非常感謝。如果我的答案解決了您的問題,那麼您可以接受它,以便其他人看到這可以解決此問題。 –