2017-02-13 26 views
1

我有CSV格式的文件,其中包含與列「ID」,「時間戳」,「行動」,「價值」和「位置」的表。 我想一個函數應用於表中的每一行,我已經寫在R上的代碼如下:如何將函數應用於SparkR中的每一行?

user <- read.csv(file_path,sep = ";") 
num <- nrow(user) 
curLocation <- "1" 
for(i in 1:num) { 
    row <- user[i,] 
    if(user$action != "power") 
     curLocation <- row$value 
    user[i,"location"] <- curLocation 
} 

將R腳本正常工作,現在我想將其應用SparkR。但是,我無法直接訪問SparkR中的第i行,並且找不到任何操作SparkR documentation中的每一行的函數。

我應以實現如在R腳本同樣的效果使用哪種方法?

此外,作爲@chateaur建議,我嘗試使用dapply功能如下的代碼:

curLocation <- "1" 
schema <- structType(structField("Sequence","integer"), structField("ID","integer"), structField("Timestamp","timestamp"), structField("Action","string"), structField("Value","string"), structField("Location","string")) 
setLocation <- function(row, curLoc) { 
    if(row$Action != "power|battery|level"){ 
     curLoc <- row$Value 
    } 
    row$Location <- curLoc 
} 
bw <- dapply(user, function(row) { setLocation(row, curLocation)}, schema) 
head(bw) 

然後,我得到了一個錯誤: error message

我擡頭警告消息的條件具有長度> 1且僅第一個元素將被用來和我發現一些https://stackoverflow.com/a/29969702/4942713。這讓我不知道在dapply功能參數是否代表我的數據幀,而不是一個單列的整個分區?可能功能不是一個理想的解決方案?

後來,我試圖通過@chateaur作爲建議修改功能。除了使用dapply的,我用dapplyCollect從而節省了我指定模式的努力。有用!

changeLocation <- function(partitionnedDf) { 
    nrows <- nrow(partitionnedDf) 
    curLocation <- "1" 
    for(i in 1:nrows){ 
     row <- partitionnedDf[i,] 
     if(row$action != "power") { 
      curLocation <- row$value 
     } 
    partitionnedDf[i,"location"] <- curLocation 
    } 
    partitionnedDf 
} 

bw <- dapplyCollect(user, changeLocation) 
+0

您可以使用sparklyr(相同的語法比dplyr ) –

+0

@DimitriPetrenko如果我需要使用SparkR,該怎麼辦? SparkR能達到這個效果嗎? – Scorpion775

回答

2

Scorpion775,

你應該分享您的sparkR代碼。不要忘記,R和sparkR中的數據操作方式不一樣。

來源:http://spark.apache.org/docs/latest/sparkr.html

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA") 

然後你可以看一下dapply功能在這裏:https://spark.apache.org/docs/2.1.0/api/R/dapply.html

這裏是一個工作示例:

changeLocation <- function(partitionnedDf) { 
    nrows <- nrow(partitionnedDf) 
    curLocation <- as.integer(1) 

    # Loop over each row of the partitionned data frame 
    for(i in 1:nrows){ 
     row <- partitionnedDf[i,] 

     if(row[1] != "power") { 
      curLocation <- row[2] 
     } 
     partitionnedDf[i,3] <- curLocation 
    } 

    # Return modified data frame 
    partitionnedDf 
} 

# Load data 
df <- read.df("data.csv", "csv", header="false", inferSchema = "true") 

head(collect(df)) 

# Define schema of dataframe 
schema <- structType(structField("action", "string"), structField("value", "integer"), 
        structField("location", "integer")) 

# Change location of each row      
df2 <- dapply(df, changeLocation, schema) 

head(df2) 
+0

我接過一看dapply功能,並發現它是用於「應用** **功能的SparkDataFrame的每個分區」。根據我的理解,_partition_與_row_無關。我擔心的是,我不知道如何編寫**函數**以應用於SparkDataFrame。目前我只知道如何實現**函數**我想在R中但不在SparkR中。你能給我一些建議嗎? – Scorpion775

+0

我不是一個火花專家,但我認爲分區數據分散到整個集羣中。你可以嘗試一下上面的例子,告訴我它是否適合你的需要? – chateaur

+0

謝謝你的建議。我試圖按照你的指示,但得到了一個錯誤,如問題所示。 – Scorpion775

相關問題