我正在閱讀使用spark-streaming的kafka流消息。 現在我想將Cassandra設置爲我的輸出。 我已創建在卡桑德拉表「TEST_TABLE」與列「鍵:文本主鍵」和「值:文本」 我已映射的數據成功打入JavaDStream<Tuple2<String,String>> data
這樣的:spark-streaming:如何輸出流數據到cassandra
JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
JavaDStream<Tuple2<String,String>> data = messages.map(new Function< Tuple2<String,String>, Tuple2<String,String> >()
{
public Tuple2<String,String> call(Tuple2<String, String> message)
{
return new Tuple2<String,String>(message._1(), message._2());
}
}
);
然後,我已經創建了一個清單:
List<TestTable> list = new ArrayList<TestTable>();
其中TestTable的是具有相同的結構,我的卡珊德拉表我的自定義類,其成員「鍵」和「值」:
class TestTable
{
String key;
String val;
public TestTable() {}
public TestTable(String k, String v)
{
key=k;
val=v;
}
public String getKey(){
return key;
}
public void setKey(String k){
key=k;
}
public String getVal(){
return val;
}
public void setVal(String v){
val=v;
}
public String toString(){
return "Key:"+key+",Val:"+val;
}
}
請建議如何將JavaDStream<Tuple2<String,String>> data
中的數據添加到List<TestTable> list
中。 我這樣做,這樣我可以隨後使用
JavaRDD<TestTable> rdd = sc.parallelize(list);
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");
到RDD數據保存到卡桑德拉。
我曾試圖編碼是這樣的:
messages.foreachRDD(new Function<Tuple2<String,String>, String>()
{
public List<TestTable> call(Tuple2<String,String> message)
{
String k = message._1();
String v = message._2();
TestTable tbl = new TestTable(k,v);
list.put(tbl);
}
}
);
但似乎有些類型不匹配happenning。 請幫忙。
@ maasg-謝謝你,現在我可以創建一個連接。但現在數據沒有被插入到cassandra表中。日誌顯示它正在連接,下一秒會斷開連接。完整的代碼和日誌和依賴關係在「http://stackoverflow.com/questions/27386223/spark-data-not-getting-written-into-cassandra-zero-rows-inserted」 – aiman 2014-12-09 18:33:42