2

我們一直在使用Spark RDD API(Spark 2.0)處理在Cassandra中建模的數據。請注意,數據在Cassandra中進行建模以便高效讀寫。Spark DataFrame和Cassandra

但是現在也有星火SQL API的星火據幀API,這也是另一個數據訪問方法 - http://spark.apache.org/docs/latest/sql-programming-guide.html

火花RDD,我們使用CQL使用Datastax卡桑德拉驅動程序的API來訪問卡桑德拉DB - http://docs.datastax.com/en/developer/java-driver/2.0/,像

val resultSets = new util.ArrayList[Row]() 
val resultSet = CassandraConnector(SparkReader.conf).withSessionDo[ResultSet] { session => 
    val sel_stmt = QueryBuilder.select("yyy", "zz", "xxxx") 
       .from("geokpi_keyspace", table_name) 
       .where(QueryBuilder.eq("bin", bin)) 
       .and(QueryBuilder.eq("year", year)) 
       .and(QueryBuilder.eq("month", month)) 
       .and(QueryBuilder.eq("day", day)) 
       .and(QueryBuilder.eq("cell", cell)) 

    session.execute(sel_stmt) 

    } 
resultSets.addAll(resultSet.all()) 
}) 
resultSets.asScala.toList --> RDD[Row] 

因爲我們幾乎可以直接使用CQL,它不允許你這樣做不被支持的卡桑德拉比如連接作爲卡桑德拉設計不支持的事情。 但是,使用Spark SQL或Spark DataFrame API訪問Cassandra DB的替代方法爲您提供了SQL類型抽象。對於底層關係數據庫來說,這樣做會很好。

但是使用這種抽象,像JOIN查詢存儲在NoSQL數據庫中的數據,如Cassandra似乎是一個錯誤的抽象。在Spark中使用這種抽象,無需瞭解數據模型(分區鍵,集羣鍵等等)對於高效的數據讀寫非常重要,是不是會導致無效的生成代碼以及底層Cassandra節點的高效/慢速數據檢索?

回答

0

我會爭辯說,你認爲我們在使用Spark SQL時忽略數據模型的假設是不正確的,它實踐我們在非常嚴格的契約下工作,其中數據源可能默認只處理基本投影和選擇以及重處理由Spark集羣執行。

與此同時,數據源開發人員在設計給定連接器時可以自由地包含任何類型的域或系統特定知識。 JDBC數據源就是一個很好的例子,你可以檢查我的答案How to partition Spark RDD when importing Postgres using JDBC?,看看它如何被用來執行一些非標準的操作。

儘管Cassandra Connector似乎在這裏略有侷限(如果我錯了,我沒有廣泛使用它),它的RDD組件提供了一套廣泛的Cassandra感知操作,可用於執行服務器端操作和優化整體工作流程

無論如何,當Spark試圖強制外部系統執行那裏不支持的操作時,沒有任何情況。

不會它導致高效生成的代碼和高效/慢的數據檢索

我們要問這裏的根本問題是爲什麼會事。由於僅僅使用給定的來源進行分析工作,我們隱含地接受這樣一個事實,即我們可能會強調給定的系統,而這種方式並不是典型的日常操作使用。

同時,如果我們使用的系統不支持在我們的數據處理管道中需要的某些操作,我們應該接受執行這些操作的成本可能會比優化後的系統高得多。儘管效率低下的處理需要花費金錢,但在選擇技術堆棧和設計基礎架構時應該考慮這個問題。

最後,如果某些操作具有不可接受的性能影響(是的,聯接很昂貴),它應該反映在數據建模中。

因爲我們幾乎可以直接使用CQL,它不允許你這樣做不被卡桑德拉支持的東西像任命爲Cassandra的設計不支持它

正如已經已經解釋同樣沒有星火SQL。直接提取數據並稍後執行連接不會改變執行模型中的任何內容。

忽略此特定示例中沒有任何內容不能由DataFrame API處理,並且可以使用cassandraTable執行更復雜的檢索。

+0

「好吧,如果你可以使用本地數據結構來處理數據,就像在你的例子中那樣,爲什麼首先使用Spark呢?如果數據可以存儲在一臺機器的內存中,那裏有解決方案,做比Spark更好的工作「 - >我們的數據不能保存在一個spark或cassandra節點中;我們使用大約4個Cassandra節點來並行存儲和讀取2到4個Spark Worker節點的數據。 Spark用於數據的分佈式並行處理。 Spark是非常需要的,否則我們將不得不將我們的基於消息的任務系統與錯誤處理等進行滾動。 –

+0

那麼如何將查詢結果轉換爲本地非惰性結構('resultSets.asScala.toList')適合那個? – zero323

+0

問題是,如圖所示,直接使用Cql的用法,沒有機會使用Join或類似的,使用Spark SQL許可證,我希望你有我的問題 –