2017-09-25 54 views
1

我試圖使用spark_apply在Spark表上運行下面的R函數。這工作得很好,如果我的輸入表小(例如5000行),但約30分鐘拋出一個錯誤後,當該表是中等大小(例如500萬行): sparklyr worker rscript failure, check worker logs for detailsSparklyr的spark_apply函數似乎在單個執行程序上運行,並在中等大小的數據集上失敗

望着星火UI顯示有隻有一個任務正在創建,並且一個執行者被應用於這個任務。

任何人都可以提供建議,爲什麼這個函數失敗的500萬行數據集?難道問題是單個遺囑執行者是否正在做所有的工作,並失敗?

# Create data and copy to Spark 
testdf <- data.frame(string_id=rep(letters[1:5], times=1000), # 5000 row table 
       string_categories=rep(c("", "1", "2 3", "4 5 6", "7"), times=1000)) 
testtbl <- sdf_copy_to(sc, testdf, overwrite=TRUE, repartition=100L, memory=TRUE) 

# Write function to return dataframe with strings split out 
myFunction <- function(inputdf){ 
    inputdf$string_categories <- as.character(inputdf$string_categories) 
    inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories)) 
    stringCategoriesList <- strsplit(inputdf$string_categories, ' ') 
    outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))), 
        string_categories=unlist(stringCategoriesList)) 
return(outDF) 
} 

# Use spark_apply to run function in Spark 
outtbl <- testtbl %>% 
    spark_apply(myFunction, 
      names=c('string_id', 'string_categories')) 
outtbl 

回答

2
  1. sparklyr worker rscript failure, check worker logs for details錯誤是由駕駛員節點寫入,並指出需要在工人日誌中找到真正的錯誤。通常,可以通過在Spark UI中執行程序的選項卡中打開stdout來訪問工作日誌;日誌應該包含RScript:條目,描述執行程序正在處理的內容以及錯誤的具體內容。

  2. 關於單個任務創建,當未在spark_apply()與類型指定columns,它需要計算結果的一個子集來猜測列類型,以避免這種情況,通過明確的列類型如下:

    outtbl <- testtbl %>% spark_apply( myFunction, columns=list( string_id = "character", string_categories = "character"))

  3. 如果使用sparklyr 0.6.3,更新到sparklyr 0.6.4devtools::install_github("rstudio/sparklyr"),由於sparklyr 0.6.3包含在某些情況下,邊緣的不正確的等待時間,其中數據包分發被啓用和多於一個的執行器中的每個節點上運行。

  4. 在高負載下,內存不足常見。增加分區數量可以解決此問題,因爲它會減少處理此數據集所需的總內存。嘗試運行此爲:

    testtbl %>% sdf_repartition(1000) %>% spark_apply(myFunction, names=c('string_id', 'string_categories'))

  5. 它也可能是功能拋出一個異常,對於一些由於功能邏輯分區的情況下,你可以看到,如果是這樣的話,通過使用tryCatch()忽略這些錯誤,然後找出哪些是缺失的值以及爲什麼函數會失敗這些值。我會的東西開始,如:

    myFunction <- function(inputdf){ tryCatch({ inputdf$string_categories <- as.character(inputdf$string_categories) inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories)) stringCategoriesList <- strsplit(inputdf$string_categories, ' ') outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))), string_categories=unlist(stringCategoriesList)) return(outDF) }, error = function(e) { return( data.frame(string_id = c(0), string_categories = c("error")) ) }) }

+0

感謝您對這樣一個全面的答案!增加分區的數量解決了問題,但也有很多額外的信息可以幫助我繼續前進。 – jay

相關問題