2017-06-13 54 views
2

是否有方法使用sparklyr/dplyr的函數複製Spark數據幀的行?R - 如何使用sparklyr複製火花數據幀中的行

sc <- spark_connect(master = "spark://####:7077") 

df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df") 

這是所需的輸出,保存到一個新的火花TBL:

> df2_tbl 
    row1 row2 
    <int> <chr> 
1  1  A 
2  1  A 
3  1  A 
4  2  B 
5  2  B 
6  2  B 
7  3  C 
8  3  C 
9  3  C 

回答

2

隨着sparklyr可以使用arrayexplode通過@Oli的建議:

df_tbl %>% 
    mutate(arr = explode(array(1, 1, 1))) %>% 
    select(-arr) 

# # Source: lazy query [?? x 2] 
# # Database: spark_connection 
# row1 row2 
# <int> <chr> 
# 1  1 A  
# 2  1 A  
# 3  1 A  
# 4  2 B  
# 5  2 B  
# 6  2 B  
# 7  3 C  
# 8  3 C  
# 9  3 C  

和廣義

library(rlang) 

df_tbl %>% 
    mutate(arr = !!rlang::parse_quo(
    paste("explode(array(", paste(rep(1, 3), collapse = ","), "))") 
)) %>% select(-arr) 

# # Source: lazy query [?? x 2] 
# # Database: spark_connection 
# row1 row2 
# <int> <chr> 
# 1  1 A  
# 2  1 A  
# 3  1 A  
# 4  2 B  
# 5  2 B  
# 6  2 B  
# 7  3 C  
# 8  3 C  
# 9  3 C 

在這裏你可以輕鬆地調整行數。

1

我想到的第一是使用explode功能(它到底是什麼,是指在理念火花)。然而,在SparkR中似乎並沒有支持數組(據我所知)。

> structField("a", "array") 
Error in checkType(type) : Unsupported type for SparkDataframe: array 

不過,我可以提出其他兩種方法:

  1. 一個簡單但並不是很優雅的一個:

    head(rbind(df, df, df), n=30) 
    # row1 row2 
    # 1 1 A 
    # 2 2 B 
    # 3 3 C 
    # 4 1 A 
    # 5 2 B 
    # 6 3 C 
    # 7 1 A 
    # 8 2 B 
    # 9 3 C 
    

    或用一個for循環更多的通用性:

    df2 = df 
    for(i in 1:2) df2=rbind(df, df2) 
    

    請注意,這也可以與union一起使用。

  2. 第二,更優雅方法(因爲它僅意味着一個火花操作)是基於與尺寸3的數據幀交叉聯接(笛卡爾積)(或任何其它數目):

    j <- as.DataFrame(data.frame(s=1:3)) 
    head(drop(crossJoin(df, j), "s"), n=100) 
    # row1 row2 
    # 1 1 A 
    # 2 1 A 
    # 3 1 A 
    # 4 2 B 
    # 5 2 B 
    # 6 2 B 
    # 7 3 C 
    # 8 3 C 
    # 9 3 C 
    
+0

它應該是'array '而不是'array',例如'structField(「a」,「array 」)'。 – user8371915

0

我不知道R的rep函數的集羣端版本。然而,我們可以使用連接模擬羣集。

df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df") 

replyr <- function(data, n, sc){ 
    joiner_frame <- copy_to(sc, data.frame(joiner_index = rep(1,n)), "tmp_joining_frame", overwrite = TRUE) 

    data %>% 
    mutate(joiner_index = 1) %>% 
    left_join(joiner_frame) %>% 
    select(-joiner_index) 

} 

df_tbl2 <- replyr(df_tbl, 3, sc) 
# row1 row2 
# <int> <chr> 
# 1  1 A  
# 2  1 A  
# 3  1 A  
# 4  2 B  
# 5  2 B  
# 6  2 B  
# 7  3 C  
# 8  3 C  
# 9  3 C 

它能夠完成任務,但它是一個有點髒,因爲tmp_joining_frame將持續存在。我不確定這樣做多好,因爲對函數的多次調用進行了懶惰評估。