Task in Spark fails with ClassNotFoundException

I want to write a simple Java program that reads data from Cassandra through Spark. I am doing this at a POC level. My code looks like this:
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapColumnTo;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import com.datastax.spark.connector.japi.rdd.CassandraJavaPairRDD;

public class PerformerClass
{
    private final String keyspace = "newkspace1";
    private final String tablename = "newtable5";

    private final SparkConf conf;

    public PerformerClass(SparkConf conf)
    {
        this.conf = conf;
    }

    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf();
        conf.setAppName("Cassandra Demo");
        conf.setMaster("spark://ct-0094:7077");
        conf.set("spark.cassandra.connection.host", "192.168.50.124");
        conf.set("spark.cassandra.connection.native.port", "9041");
        conf.set("spark.cassandra.connection.rpc.port", "9160");
        PerformerClass app = new PerformerClass(conf);
        app.run();
    }

    private void run()
    {
        JavaSparkContext sc = new JavaSparkContext(conf);
        showResults(sc);
        sc.stop();
    }

    private void showResults(JavaSparkContext sc)
    {
        // Read the two selected columns as an (Integer, Integer) pair RDD.
        CassandraJavaPairRDD<Integer, Integer> rdd1 = javaFunctions(sc)
                .cassandraTable(keyspace, tablename, mapColumnTo(Integer.class), mapColumnTo(Integer.class))
                .select("keyval", "rangefield");
        List<Integer> lst = rdd1.keys().collect();
        for (Integer l : lst)
        {
            System.out.println(l);
        }
    }
}
When I run the code above, I get the following exception (stack trace pasted below):
15/01/15 19:22:41 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, ct-0094): java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition
    at java.net.URLClassLoader$1.run(URLClassLoader.java:)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:340)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
    [...]
What am I missing?
That doesn't look right. Can you build this example and run it? https://github.com/rssvihla/spark_commons/blob/master/examples/spark_bulk_operations/src/main/java/pro/foundev/java/AbstractCassandraConnectorClass.java – phact
Actually, I got the above exception when I tried to run my code from the IDE, so I used 'sc.addJar()'. But if you build the JAR and run it directly on the Spark cluster, then you don't need sc.addJar(). (That is my understanding.) –
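For illustration, here is a minimal sketch of that setup: when launching from the IDE, the connector classes have to be shipped to the executors, either via SparkConf.setJars() or sc.addJar(). The jar path below is a hypothetical placeholder; it would point at the spark-cassandra-connector assembly (or the application's own fat jar) on the machine that launches the driver.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class AddJarExample
{
    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf()
                .setAppName("Cassandra Demo")
                .setMaster("spark://ct-0094:7077")
                // Ship the dependency jar with the application so executor
                // JVMs can load classes such as CassandraPartition.
                // The path is a placeholder, not a real location.
                .setJars(new String[] {
                        "/path/to/spark-cassandra-connector-assembly.jar"
                });

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Per-jar alternative after the context has been created:
        // sc.addJar("/path/to/spark-cassandra-connector-assembly.jar");

        sc.stop();
    }
}

When the job is instead packaged and submitted with spark-submit, the same effect is usually achieved by passing the jar on the command line, which is why the explicit addJar() call is not needed in that case. –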
Are you running your job with spark-submit? – phact