3

我有一個Java應用程序,其中,我有Spark-1.4.0Cassandra-2.1.5Cassandra-Spark-connection-1.4.0-M1卡桑德拉不與UDT合作

在此應用程序中,我試圖使用Dataframe或使用javaFunctions class將Java Bean類存儲到Cassandra表中,其中有一些UDTs

messages.foreachRDD(new Function2<JavaRDD<Message>,Time,Void>(){ 
    @Override 
    public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception { 
     javaFunctions(arg0).writerBuilder(
       Properties.getString("spark.cassandra.keyspace"), 
       Properties.getString("spark.cassandra.table"), 
       mapToRow(Message.class)).saveToCassandra(); 

OR

messages.foreachRDD(new Function2<JavaRDD<Message>,Time,Void>(){ 
    @Override 
    public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception { 

     SQLContext sqlContext = SparkConnection.getSqlContext(); 
     DataFrame df = sqlContext.createDataFrame(arg0, Message.class); 

     df.write() 
       .mode(SaveMode.Append) 
       .option("keyspace",Properties.getString("spark.cassandra.keyspace")) 
       .option("table",Properties.getString("spark.cassandra.table")) 
       .format("org.apache.spark.sql.cassandra").save(); 

但我得到這個錯誤

15/06/16 19:51:38 INFO CassandraConnector: Disconnected from Cassandra cluster: BDI Cassandra 
15/06/16 19:51:39 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 4, 192.168.1.19): com.datastax.spark.connector.types.TypeConversionException: Cannot convert object null to com.datastax.spark.connector.UDTValue. 
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:44) 
    at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:40) 
    at com.datastax.spark.connector.types.UserDefinedType$$anon$1$$anonfun$convertPF$1.applyOrElse(UserDefinedType.scala:33) 
    at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:40) 
    at com.datastax.spark.connector.types.UserDefinedType$$anon$1.convert(UserDefinedType.scala:31) 
    at com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$readColumnValues$1.apply$mcVI$sp(SqlRowWriter.scala:21) 
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:20) 
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:8) 
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:35) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135) 
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:119) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:102) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:101) 
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:130) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:101) 
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:119) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) 
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

以前,我能夠成功地使用映射類的消息對象保存到卡桑德拉表。

MappingManager mapping=new MappingManager(session); 
       Mapper<Message> mapper=mapping.mapper(Message.class); 
       mapper.save(message); 

這是我的Java Bean

import com.datastax.driver.mapping.annotations.FrozenKey; 
import com.datastax.driver.mapping.annotations.Table; 
@Table(name = "data") 
public class Message implements Serializable{ 
    private static final long serialVersionUID = 42L; 
    private String admin; 
    private String searchname; 
    private String searchsource; 
    private String searchtype; 
    private String messageid; 
    private String message; 
    @FrozenKey 
    private List<Action> actions; 
    @Frozen 
    private AdminCreator admincreator; 
    @Frozen 
    private AppReference appreference; 
    private String caption; 
    @Frozen 
    private Reference referencefrom; 
    private String icon; 
    private Boolean ishidden; 
       ..... 
       ..... 
       ..... 
+0

https://datastax-oss.atlassian.net/browse/SPARKC-271:錯誤提交 – RussS

回答