2014-12-05 41 views
3

我正在閱讀使用spark-streaming的kafka流消息。 現在我想將Cassandra設置爲我的輸出。 我已創建在卡桑德拉表「TEST_TABLE」與列「鍵:文本主鍵」和「值:文本」 我已映射的數據成功打入JavaDStream<Tuple2<String,String>> data這樣的:spark-streaming:如何輸出流數據到cassandra

JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf); 
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000)); 

JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap); 
JavaDStream<Tuple2<String,String>> data = messages.map(new Function< Tuple2<String,String>, Tuple2<String,String> >() 
{ 
    public Tuple2<String,String> call(Tuple2<String, String> message) 
    { 
     return new Tuple2<String,String>(message._1(), message._2()); 
    } 
} 
); 

然後,我已經創建了一個清單:

List<TestTable> list = new ArrayList<TestTable>(); 

其中TestTable的是具有相同的結構,我的卡珊德拉表我的自定義類,其成員「鍵」和「值」:

class TestTable 
{ 
    String key; 
    String val; 

    public TestTable() {} 

    public TestTable(String k, String v) 
    { 
     key=k; 
     val=v; 
    } 

    public String getKey(){ 
     return key; 
    } 

    public void setKey(String k){ 
     key=k; 
    } 

    public String getVal(){ 
     return val; 
    } 

    public void setVal(String v){ 
     val=v; 
    } 

    public String toString(){ 
     return "Key:"+key+",Val:"+val; 
    } 
} 

請建議如何將JavaDStream<Tuple2<String,String>> data中的數據添加到List<TestTable> list中。 我這樣做,這樣我可以隨後使用

JavaRDD<TestTable> rdd = sc.parallelize(list); 
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table"); 

到RDD數據保存到卡桑德拉。

我曾試圖編碼是這樣的:

messages.foreachRDD(new Function<Tuple2<String,String>, String>() 
         { 
          public List<TestTable> call(Tuple2<String,String> message) 
          { 
           String k = message._1(); 
           String v = message._2(); 
           TestTable tbl = new TestTable(k,v); 
           list.put(tbl); 
          } 
         } 
        ); 

但似乎有些類型不匹配happenning。 請幫忙。

回答

6

假設此程序的意圖是將kafka的流數據保存到Cassandra中,則不需要將JavaDStream<Tuple2<String,String>>數據轉儲到List<TestTable>列表中。

DataStax的Spark-Cassandra連接器直接通過Spark Streaming extensions支持此功能。

這應足以在JavaDStream使用這樣的擴展:

javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra(); 

,而不是中介名單上排數據。

+0

@ maasg-謝謝你,現在我可以創建一個連接。但現在數據沒有被插入到cassandra表中。日誌顯示它正在連接,下一秒會斷開連接。完整的代碼和日誌和依賴關係在「http://stackoverflow.com/questions/27386223/spark-data-not-getting-written-into-cassandra-zero-rows-inserted」 – aiman 2014-12-09 18:33:42