我們一直在使用Spark RDD API(Spark 2.0)處理在Cassandra中建模的數據。請注意,數據在Cassandra中進行建模以便高效讀寫。Spark DataFrame和Cassandra
但是現在也有星火SQL API的星火據幀API,這也是另一個數據訪問方法 - http://spark.apache.org/docs/latest/sql-programming-guide.html
火花RDD,我們使用CQL使用Datastax卡桑德拉驅動程序的API來訪問卡桑德拉DB - http://docs.datastax.com/en/developer/java-driver/2.0/,像
val resultSets = new util.ArrayList[Row]()
val resultSet = CassandraConnector(SparkReader.conf).withSessionDo[ResultSet] { session =>
val sel_stmt = QueryBuilder.select("yyy", "zz", "xxxx")
.from("geokpi_keyspace", table_name)
.where(QueryBuilder.eq("bin", bin))
.and(QueryBuilder.eq("year", year))
.and(QueryBuilder.eq("month", month))
.and(QueryBuilder.eq("day", day))
.and(QueryBuilder.eq("cell", cell))
session.execute(sel_stmt)
}
resultSets.addAll(resultSet.all())
})
resultSets.asScala.toList --> RDD[Row]
因爲我們幾乎可以直接使用CQL,它不允許你這樣做不被支持的卡桑德拉比如連接作爲卡桑德拉設計不支持的事情。 但是,使用Spark SQL或Spark DataFrame API訪問Cassandra DB的替代方法爲您提供了SQL類型抽象。對於底層關係數據庫來說,這樣做會很好。
但是使用這種抽象,像JOIN查詢存儲在NoSQL數據庫中的數據,如Cassandra似乎是一個錯誤的抽象。在Spark中使用這種抽象,無需瞭解數據模型(分區鍵,集羣鍵等等)對於高效的數據讀寫非常重要,是不是會導致無效的生成代碼以及底層Cassandra節點的高效/慢速數據檢索?
「好吧,如果你可以使用本地數據結構來處理數據,就像在你的例子中那樣,爲什麼首先使用Spark呢?如果數據可以存儲在一臺機器的內存中,那裏有解決方案,做比Spark更好的工作「 - >我們的數據不能保存在一個spark或cassandra節點中;我們使用大約4個Cassandra節點來並行存儲和讀取2到4個Spark Worker節點的數據。 Spark用於數據的分佈式並行處理。 Spark是非常需要的,否則我們將不得不將我們的基於消息的任務系統與錯誤處理等進行滾動。 –
那麼如何將查詢結果轉換爲本地非惰性結構('resultSets.asScala.toList')適合那個? – zero323
問題是,如圖所示,直接使用Cql的用法,沒有機會使用Join或類似的,使用Spark SQL許可證,我希望你有我的問題 –