2017-03-02 33 views
2

我一直在使用sparklyr將大卡桑德拉表放入火花中,將它們註冊到R並對它們執行dplyr操作。通過sparklyr將cassandra表導入spark中 - 只能選擇一些列?

我已經成功導入卡桑德拉表,看起來像這樣的代碼:

# import cassandra table into spark 

cass_df <- sparklyr:::spark_data_read_generic(
    sc, "org.apache.spark.sql.cassandra", "format", 
    list(keyspace = "cass_keyspace", table = "cass_table") 
) %>% 
    invoke("load") 


# register table in R 

cass_tbl <- sparklyr:::spark_partition_register_df(
     sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE) 
     ) 

一些卡桑德拉表都相當大(> 85億行),並需要一段時間才能導入/註冊和有些會導致內存溢出,即使有6個節點運行總共60個內核和192GB內存。但是,我通常只需要每個cassandra數據庫中的一些列。

我的問題是:

  1. 是否可以過濾進口/註冊卡桑德拉數據庫,以便只進口一些列或使上過濾主鍵(通過傳遞SQL/CQL即類型查詢,如SELECT name FROM cass_table WHERE id = 5)?
  2. 這樣的查詢將放在上面的代碼中,語法採用何種形式?

我曾嘗試加入這樣一個查詢的選項列表中的一個附加選項,即:

list(. . . , select = "id") 

以及調用它作爲一個單獨的管%>% invoke("load")之前,即:

invoke("option", "select", "id") %>% 

# OR 

invoke("option", "query", s"select id from cass_table") %>% 

但這些不起作用。有什麼建議麼?

回答

3

可以跳過渴望緩存和選擇感興趣的列:

session <- spark_session(sc) 

# Some columns to select 
cols <- list("x", "y", "z") 

cass_df <- session %>% 
    invoke("read") %>% 
    invoke("format", "org.apache.spark.sql.cassandra") %>% 
    invoke("options", as.environment(list(keyspace="test"))) %>% 
    invoke("load") %>% 
    # We use select(col: String, cols* String) so the first column 
    # has to be used separately. If you want only one column the third argument 
    # has to be an empty list 
    invoke("select", cols[[1]], cols[2:length(cols)]) %>% 
    # Standard lazy cache if you need one 
    invoke("cache") 

如果使用謂詞可以顯著降低獲取的數據量設爲pushdown選項"true"(默認)和前緩存使用filter

如果你想通過更復雜的查詢您註冊臨時視圖和sql方法:

session %>% 
    invoke("read") %>% 
    ... 
    invoke("load") %>% 
    invoke("createOrReplaceTempView", "some_name") 

cass_df <- session %>% 
    invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>% 
    invoke("cache") 
+0

精彩,這個帖子已經幫了我這麼多...我已經做了,從這個靈感的東西,但裝載了正確的來自CSV文件的列。 我想添加一個可能想註冊'case_df',因此可以使用dplyr動詞(因爲sparklyr帶有dplyr後端)。 'R_cass_df = sdf_register(cass_df 「spark_cass_df」)' 然後dplyr動詞可以應用於,例如:: 登記用做 '庫( 「dplyr」); R_cass_df%>%filter(foo ==「bar」)%>%select(id)' – Raphvanns