2016-03-22 206 views
0

鑑於SparkFlumeEvents流(或者說,任何DSTREAM)如何做一個映射到適當的模式,使流,可以保存到卡桑德拉與星火流架構

stream.saveToCassandra(keyspace,table) 

一個天真的嘗試抱怨缺少的列。

是stream.map()給定對象(這看起來很麻煩)的最佳方法?

或者......

另一種方法似乎是使用stream.foreachRDD並以某種方式映射到數據幀。考慮到流方法支持直接存儲到cassandra,這似乎也很麻煩。

那麼正確的方法是什麼?

回答

0

通過指定要插入的鍵空間,表名和列,使用spark cassandra連接器將流保存到Cassandra中。另一種方法是將數據映射到UDT並將其插入到數據庫中。如果您只需要插入數據,我寧願將列指定爲最快的方式。從文檔 例不完全一樣的,但是你可以使用它的任何變種:

val wc = stream.flatMap(_.split("\\s+")) 
    .map(x => (x, 1)) 
    .reduceByKey(_ + _) 
    .saveToCassandra("streaming_test", "words", SomeColumns("word", "count")) 
+0

做不過你的代碼假設,即傳入流式傳輸有正確的列 - 我已經看到了文檔這種方式,但我的主要問題更多地圍繞轉換和映射模式的想法 - 例如,我可能需要以自定義方式反序列化流的字節或執行列映射。我想知道指定這些映射/解碼器的最簡潔的方法是什麼? – ismisesisko