2017-05-24 125 views
0

我有一個如何使用spark來操作/迭代/掃描cassandra的多個表的問題。我們的項目使用spark & spark-cassandra-connector連接到cassandra來掃描多個表,嘗試在不同的表中匹配相關值,如果匹配,則執行額外的操作,如表插入。使用情況如下圖所示:使用spark來掃描多個cassandra表使用spark-cassandra-connector

sc.cassandraTable(KEYSPACE, "table1").foreach(
    row => { 
    val company_url = row.getString("company_url") 

    sc.cassandraTable(keyspace, "table2").foreach(
     val url = row.getString("url") 
     val value = row.getString("value") 
     if (company_url == url) { 
      sc.saveToCassandra(KEYSPACE, "target", SomeColumns(url, value)) 
     } 
    ) 
}) 

的問題是

  1. 火花RDD是不可序列化,原因sc.cassandraTable返回一個RDD嵌套搜索將失敗。我知道要解決的唯一方法是使用sc.broadcast(sometable.collect())。但是如果sometable很大,collect會消耗所有的內存。而且,如果在使用情況下,多個表使用廣播,則會消耗內存。

  2. RDD.persist可以處理這種情況,而不是廣播嗎?在我的情況下,我使用sc.cassandraTable來讀取RDD中的所有表,並將其保存回磁盤,然後檢索數據以便處理。如果它有效,我怎麼保證rdd的讀取是由塊完成的?

  3. 除了火花,還有其他工具(如hadoop等??)可以優雅地處理案件嗎?

回答

0

它看起來像你實際上試圖做一系列的內部聯接。見

joinWithCassandraTable方法

這可以讓你使用一個RDD的元素做一個卡桑德拉表直接查詢。根據您從Cassandra讀取的數據部分,這可能是您最好的選擇。如果分數太大,儘管您最好單獨閱讀兩個表,然後使用RDD.join方法排列行。

如果一切都失敗了,你總是可以手動使用CassandraConnector對象來直接訪問Java驅動程序,並使用分佈式環境中的原始請求。

+0

我無法進行連接,因爲對於我的大多數情況,我必須使用string.contains來比較相關列,而不是字符串等於運算符。 – user8053367

+0

這將需要一個笛卡兒連接,除非你有像Solr這樣的二級索引。 – RussS

+0

謝謝。如果我做笛卡爾連接,結果會很大,可能會耗盡內存? 以及如何使用二級索引來做的東西? – user8053367