2016-05-12 105 views
0

我從linux終端的流中獲得一些條目,將它們分配爲lines,將它們分成words。但不是將它們打印出來,我想將它們保存到Cassandra。 我有一個名爲ks的Keyspace,裏面有一張名爲record的桌子。 我知道像CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra();這樣的代碼必須做這項工作,但我想我做錯了什麼。有人可以幫忙嗎?如何使用java將spark數據保存到cassandra?

這裏是我的卡桑德拉ks.record架構(我通過CQLSH添加了這些數據)

id | birth_date      | name 
----+---------------------------------+----------- 
10 | 1987-12-01 23:00:00.000000+0000 | Catherine 
11 | 2004-09-07 22:00:00.000000+0000 | Isadora 
1 | 2016-05-10 13:00:04.452000+0000 |  John 
2 | 2016-05-10 13:00:04.452000+0000 |  Troy 
12 | 1970-10-01 23:00:00.000000+0000 |  Anna 
3 | 2016-05-10 13:00:04.452000+0000 | Andrew 

這裏是我的Java代碼:

import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import scala.Tuple2; 

import java.util.Arrays; 

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; 
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow; 
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*; 


public class CassandraStreaming2 { 
    public static void main(String[] args) { 

     // Create a local StreamingContext with two working thread and batch interval of 1 second 
     SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CassandraStreaming"); 
     JavaStreamingContext sc = new JavaStreamingContext(conf, Durations.seconds(1)); 

     // Create a DStream that will connect to hostname:port, like localhost:9999 
     JavaReceiverInputDStream<String> lines = sc.socketTextStream("localhost", 9999); 

     // Split each line into words 
     JavaDStream<String> words = lines.flatMap(
       (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")) 
     ); 

     words.print(); 
     //CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra(); 

     sc.start();    // Start the computation 
     sc.awaitTermination(); // Wait for the computation to terminate 

    } 
} 
+0

什麼錯誤你好嗎? – RussS

+0

錯誤:錯誤:(38,60)java:method writerBuilder in class com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions 不能應用於給定的類型; 必需:java.lang.String,java.lang.String,com.datastax.spark.connector.writer.RowWriterFactory found:java.lang.String,java.lang.String reason:實際和正式的參數列表長度不同' 行'CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder(「ks」,「record」)。saveToCassandra(); ' – Arsinux

回答

1

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md#saving-data-to-cassandra

按照該文檔,您還需要通過RowWriter工廠。執行此操作的最常見方法是使用mapToRow(Class) api,這是所描述的缺失參數。

但是你有一個額外的問題,你的代碼還沒有以可以寫入C *的方式指定數據。您的JavaDStream僅爲String s。對於給定的模式,單個String無法制作成Cassandra行。

基本上你說的是連接器

Write "hello" to CassandraTable (id, birthday, value)

沒有告訴它在hello去(又該ID是什麼?應該生日是什麼呢?)

+0

我無法解決您提到的「額外問題」。你能解釋我應該怎麼做到這一點? – Arsinux

+1

您缺少必填字段。你需要提供它們。在C *中,您總是必須至少指定主鍵。這意味着您要寫入C *的對象必須具有「id」和「birthday」的值。這可能意味着你可以編寫'Tuple3(id,timestamp,string)'或者創建一個自定義類,比如'RowClass(Integer id;時間戳生日,字符串值; Integer getId()...' – RussS

+0

你是對的。解決了你提到的問題,那些幫助它工作的代碼就是改變'JavaDStream words = lines.flatMap( (FlatMapFunction )x - > Arrays.asList(x.split(「」)) );'into JavaDStream words = lines.flatMap( (FlatMapFunction )x - > Arrays.asList(x.split(「」)) ).map(s - > new Word(s) );'。 Word類是一個將cassandra表映射到我從終端獲得的類的類 – Arsinux

相關問題