我正嘗試使用flink從kafka流式傳輸數據。我的代碼在編譯時沒有錯誤,但運行出現以下錯誤:當與kafka連接器運行flink時出現NoClassDefFoundError
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/flink/streaming/util/serialization/DeserializationSchema
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.util.serialization.DeserializationSchema
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more
我的POM依賴列表如下:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.10.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1</version>
</dependency>
</dependencies>
,我想只是運行Java代碼訂閱卡夫卡題目叫 '流光':
import java.util.Properties;
import java.util.Arrays;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
public class StreamConsumer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "samplegroup");
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<String>("streamer", new SimpleStringSchema(), properties));
messageStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Streamed data: " + value;
}
}).print();
env.execute();
}
}
系統信息:
1.卡夫卡版本:0.9.0.1
2.弗林克版本:1.3.2
3. OpenJDK的版本:1.8
雖然我使用maven,我不認爲這是任何Maven的問題,因爲我得到了同樣的錯誤,甚至當我嘗試沒有行家。我手動將所有必需的.jar文件下載到一個文件夾,並在使用javac編譯時使用-cp選項指定該文件夾路徑。在運行期間,我得到與上面相同的錯誤,但在編譯期間沒有錯誤。
我想,限制類文件的範圍,但這裏提到的版本是最新的。 – raviabhiram
flink-streaming-core不再存在,並且被flink-streaming-java_包含。 flink-clients現在也有一個scala後綴。 –
我嘗試過使用這些依賴關係,但仍然有相同的錯誤。我已經添加了上面的Java代碼。 – raviabhiram