0

我有一個像下面這樣的cassandra表,並且想要使用一些條件從cassandra獲取記錄並將其放置在配置單元表中。Cassandra到Hive使用Spark

卡桑德拉表(employee)入口:

Id Name Amount Time 
1 abc 1000 2017041801 
2 def 1000 2017041802 
3 ghi 1000 2017041803 
4 jkl 1000 2017041804 
5 mno 1000 2017041805 
6 pqr 1000 2017041806 
7 stu 1000 2017041807 

假設該表列是數據類型的字符串。 我們在蜂巢中也有相同的模式。

現在我想導入cassandra記錄在2017041801到2017041804之間配置爲hive或hdfs。在第二次運行中,我將根據prev運行來提取增量記錄。

我可以使用下面的語法將cassandra數據加載到RDD中。現在

val sc = new SparkContext(conf) 
val rdd = sc.cassandraTable("mydb", "Employee") 

我的問題是我怎麼能根據條件之間並堅持在蜂箱或蜂房外部表路徑篩選的記錄篩選該記錄。

不幸的是我的時間列不是cassandra表中的集羣鍵。所以我無法使用.where()子句。

我是新來的這個scala和火花。所以,請善意幫助這個過濾器邏輯或任何其他更好的方式來實現這個邏輯使用數據幀,請讓我知道。

在此先感謝。

+2

過濾,你可以在火花本身做,上線的東西:保存http://stackoverflow.com/a/39283574/7413631蜂巢這裏覆蓋http://stackoverflow.com/questions/37050828/save-spark-rdd-to-hive-table –

回答

0
  1. 我推薦使用Connector Dataframe API從C * https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md加載。
  2. 使用df.filter()調用謂詞
  3. saveAsTable()方法將數據存儲在配置單元中。

這裏是引發2.0例如,對於你的情況

val df = spark 
    .read 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "Employee", "keyspace" -> "mydb")) 
    .load() 
df.filter("time between 2017041801 and 2017041804") 
    .write.mode("overwrite").saveAsTable("hivedb.employee"); 
+0

謝謝@Artem Aliev – vkumarg3