我有一個如何使用spark來操作/迭代/掃描cassandra的多個表的問題。我們的項目使用spark & spark-cassandra-connector連接到cassandra來掃描多個表,嘗試在不同的表中匹配相關值,如果匹配,則執行額外的操作,如表插入。使用情況如下圖所示:使用spark來掃描多個cassandra表使用spark-cassandra-connector
sc.cassandraTable(KEYSPACE, "table1").foreach(
row => {
val company_url = row.getString("company_url")
sc.cassandraTable(keyspace, "table2").foreach(
val url = row.getString("url")
val value = row.getString("value")
if (company_url == url) {
sc.saveToCassandra(KEYSPACE, "target", SomeColumns(url, value))
}
)
})
的問題是
火花RDD是不可序列化,原因sc.cassandraTable返回一個RDD嵌套搜索將失敗。我知道要解決的唯一方法是使用sc.broadcast(sometable.collect())。但是如果sometable很大,collect會消耗所有的內存。而且,如果在使用情況下,多個表使用廣播,則會消耗內存。
RDD.persist可以處理這種情況,而不是廣播嗎?在我的情況下,我使用sc.cassandraTable來讀取RDD中的所有表,並將其保存回磁盤,然後檢索數據以便處理。如果它有效,我怎麼保證rdd的讀取是由塊完成的?
除了火花,還有其他工具(如hadoop等??)可以優雅地處理案件嗎?
我無法進行連接,因爲對於我的大多數情況,我必須使用string.contains來比較相關列,而不是字符串等於運算符。 – user8053367
這將需要一個笛卡兒連接,除非你有像Solr這樣的二級索引。 – RussS
謝謝。如果我做笛卡爾連接,結果會很大,可能會耗盡內存? 以及如何使用二級索引來做的東西? – user8053367