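How do I save Spark data to Cassandra using Java?
I receive some entries from a stream in a Linux terminal, assign them to the variable lines, and split them into the variable words. Instead of printing them, I want to save them to Cassandra. I have a keyspace named ks that contains a table named record. I know that code like CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra(); should do the job, but I think I am doing something wrong. Can someone help?
Here is my Cassandra ks.record table (I added this data through cqlsh):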
 id | birth_date                      | name
----+---------------------------------+-----------
 10 | 1987-12-01 23:00:00.000000+0000 | Catherine
 11 | 2004-09-07 22:00:00.000000+0000 | Isadora
  1 | 2016-05-10 13:00:04.452000+0000 | John
  2 | 2016-05-10 13:00:04.452000+0000 | Troy
 12 | 1970-10-01 23:00:00.000000+0000 | Anna
  3 | 2016-05-10 13:00:04.452000+0000 | Andrew
Here is my Java code:
import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*;
public class CassandraStreaming2 {
    public static void main(String[] args) {
        // Create a local StreamingContext with two worker threads and a batch interval of 1 second
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CassandraStreaming");
        JavaStreamingContext sc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Create a DStream that connects to hostname:port, here localhost:9999
        JavaReceiverInputDStream<String> lines = sc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(
                (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" "))
        );

        words.print();
        // This is the line that fails to compile when uncommented:
        //CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra();

        sc.start();            // Start the computation
        sc.awaitTermination(); // Wait for the computation to terminate
    }
}
What error are you getting? – RussS
The error is: Error:(38, 60) java: method writerBuilder in class com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions cannot be applied to given types; required: java.lang.String,java.lang.String,com.datastax.spark.connector.writer.RowWriterFactory found: java.lang.String,java.lang.String reason: actual and formal argument lists differ in length. It is reported on the line CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra(); – Arsinux
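The compiler error reports that writerBuilder requires a third argument of type RowWriterFactory. One common way to supply it is mapToRow (already imported in the question) applied to a bean class whose properties match the table columns. Below is a minimal sketch under stated assumptions: the class name RecordRow, its parse helper, and the "<id> <name>" input format are all hypothetical, and birth_date is simply set to the current time. The connector calls appear only in comments because they need the Spark and Cassandra connector jars, and a bean passed to mapToRow would typically be declared public in its own file.

```java
import java.io.Serializable;
import java.util.Date;

// Hypothetical bean mirroring the ks.record columns (id, birth_date, name).
// Serializable so that Spark can ship instances between tasks.
class RecordRow implements Serializable {
    private Integer id;
    private Date birthDate;
    private String name;

    // No-argument constructor, as expected of a JavaBean.
    public RecordRow() {}

    public RecordRow(Integer id, Date birthDate, String name) {
        this.id = id;
        this.birthDate = birthDate;
        this.name = name;
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public Date getBirthDate() { return birthDate; }
    public void setBirthDate(Date birthDate) { this.birthDate = birthDate; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    // Assumed input format (not from the question): "<id> <name>", e.g. "4 Alice".
    // The birth date is filled with the current time purely for illustration.
    static RecordRow parse(String line) {
        String[] parts = line.trim().split("\\s+", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected '<id> <name>' but got: " + line);
        }
        return new RecordRow(Integer.parseInt(parts[0]), new Date(), parts[1]);
    }
}

// With the connector on the classpath, the streaming part could then look like this
// (left as comments because it is not compiled in this sketch):
//
//   JavaDStream<RecordRow> rows = lines.map(RecordRow::parse);
//   CassandraStreamingJavaUtil.javaFunctions(rows)
//       .writerBuilder("ks", "record", mapToRow(RecordRow.class))
//       .saveToCassandra();
```

The point of the design is that a stream of plain String words carries no information about which value goes into which column, so each input line is first turned into an object with one property per column. The sketch assumes the connector's default bean mapping resolves birthDate to the birth_date column.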