2016-12-07 60 views
0

我有以下卡桑德拉表用法:分區鍵檢索與joinWithCassandraTable

CREATE TABLE listener.snapshots_geohash 
(
    created_date text, -- date when record have come to the system 
    geo_part text, -- few signs of geo hash - just for partitioning 
    when timestamp, -- record creation date 
    device_id text, -- id of device produced json data (see snapshot column) 
    snapshot text, -- json data, should be aggregated by spark 
    PRIMARY KEY ((created_date, geo_part), when, device_id) 
) 

每天早上聚合應用程序應該加載前一天和快照列總JSON數據。聚合將通過geohash對數據進行分組,這就是爲什麼它的部分被選爲分區鍵的一部分。

我知道使用joinWithCassandraTable加載Cassandra中的數據是有效的 - 但爲此,我必須從(created_date,geo_part)對中構建RDD。儘管我知道created_date值,但我無法列出geo_part值 - 因爲它只是geohash的一部分,並且其值不連續。所以我有辦法運行select distinct created_date, geo_part from ks.snapshots並從其結果創建RDD。問題是如何使用spark 2.0.2和cassandra-connector 2.0.0-M3運行此選擇,或者可能有其他方法?

回答

0

我發現的方式運行CQL查詢與CassandraConnector獲取卡桑德拉分區鍵:

val cassandraConnector = CassandraConnector(spark.sparkContext.getConf) 
val distinctRows = cassandraConnector.withSessionDo(session => { 
    session.execute(s"select distinct created_date, geo_part from ${keyspace}.$snapshots_table") 
}).all().map(row => {TableKeyM(row.getString("created_date"), row.getString("geo_part"))}).filter(k => {days.contains(k.created_date)}) 
val data_x = spark.sparkContext.parallelize(distinctRows) 

表結構設計有以下問題:卡桑德拉不允許添加WHERE CREATED_DATE =「...」條款到選擇不同的created_date,geo_part,它需要獲取整個列表對並在應用程序中對其進行過濾。

替代解決方案可以使分區鍵連續。如果聚合是按小時完成的 - 那麼分區鍵可以是(created_date,hour),24小時可以在應用程序中列出。如果每天有24個分區不夠用,並且聚合有組by by geohash,可以堅持geohash的重要部分 - 但它應該被翻譯成可數的東西 - 例如geoPart.hash()%desiredNumberOfSubpartitions