2015-09-06 108 views
2

我正在將一些數據加載到sparkR(Spark版本1.4.0,在Fedora21上運行),通過該數據我運行了一些生成三個不同數字的算法。我的算法需要一堆參數,我想在相同的數據上運行不同的參數設置。輸出格式應該是一個數據框(或CSV列表)的列是算法參數和三個數字我的算法中單位計算,即將sparkr收集到數據框中

mypar1, mypar2, mypar3, myres1, myres2, myres3 
    1  1.5  1.2  5.6  8.212 5.9 
    2  1.8  1.7  5.1  7.78 8.34 

將是兩個不同的參數設置的輸出。 我寫低於該腳本parallelises在不同paramater設置運行:它需要與參數值作爲參數,這對於上面的例子是這樣的一個輸入文件:每行

1,1.5,1.2 
2,1.8,1.7 

這樣一個參數的組合。

這是我的問題:不是每個參數設置一個,所有的數字都合併成一個長列表。函數cv_spark返回一個data.frame(基本上是一行)。我怎麼能告訴spark將cv_spark的輸出結合成一個數據框(即做一些像rbind?)或列表列表?

#!/home/myname/Spark/spark-1.4.0/bin/sparkR 

library(SparkR) 

sparkcontext <- sparkR.init("local[3]","cvspark",sparkEnvir=list(spark.executor.memory="1g")) 

cv_spark <- function(indata) { 
    cv_params <- strsplit(indata, split=",")[[1]] 
    param.par1 = as.integer(cv_params[1]) 
    param.par2 = as.numeric(cv_params[2]) 
    param.par3 = as.numeric(cv_params[3]) 
    predictions <- rep(NA, 1) 
    ## here I run some calculation on some data that I load to my SparkR session, 
    ## but for illustration purpose I'm just filling up with some random numbers 
    mypred = base:::sample(seq(5,10,by=0.01),3) 
    predictions <- cbind(param.par1, param.par2, param.par3,mypred[1],mypred[2],mypred[3]) 
    return(as.data.frame(predictions)) 
} 

args <- commandArgs(trailingOnly=TRUE) 
print(paste("args ", args)) 
cvpar = readLines(args[[1]]) 

rdd <- SparkR:::parallelize(sparkcontext, coll=cvpar, numSlices=4) 
myerr <- SparkR:::flatMap(rdd,cv_spark) 
output <- SparkR:::collect(myerr) 
print("final output") 
print(output) 

outfile = "spark_output.csv" 
write.csv(output,outfile,quote=FALSE,row.names=FALSE) 

回答

1

我設法得到我想要的東西用flatMapValues代替flatMap,並通過創建(key, value)對我的各種參數設置(基本關鍵是我的參數輸入文件和值的行號是對的參數線)。然後我打電話給reduceByKey,它基本上每個鍵保持一行。修改後的腳本看起來像這樣:

#!/home/myname/Spark/spark-1.4.0/bin/sparkR 

library(SparkR) 

sparkcontext <- sparkR.init("local[4]","cvspark",sparkEnvir=list(spark.executor.memory="1g")) 

cv_spark <- function(indata) { 
    cv_params <- unlist(strsplit(indata[[1]], split=",")) 
    param.par1 = as.integer(cv_params[1]) 
    param.par2 = as.numeric(cv_params[2]) 
    param.par3 = as.integer(cv_params[3]) 
    predictions <- rep(NA, 1) 
    ## here I run some calculation on some data that I load to my SparkR session, 
    ## but for illustration purpose I'm just filling up with some random numbers 
    mypred = base:::sample(seq(5,10,by=0.01),3) 
    predictions <- cbind(param.par1, param.par2, param.par3,mypred[1],mypred[2],mypred[3]) 
    return(as.data.frame(predictions)) 
} 

args <- commandArgs(trailingOnly=TRUE) 
print(paste("args ", args)) 
cvpar = readLines(args[[1]]) 
## Creates (key, value) pairs 
cvpar <- Map(list,seq(1,length(cvpar)),cvpar) 

rdd <- SparkR:::parallelize(sparkcontext, coll=cvpar, numSlices=1) 
myerr <- SparkR:::flatMapValues(rdd,cv_spark) 
myerr <- SparkR:::reduceByKey(myerr,"c", 2L) 
output <- SparkR:::collect(myerr) 

myres <- sapply(output,`[`,2) 
df_res <- do.call("rbind",myres) 
colnames(df_res) <- c("Element","sigdf","sigq","err","err.sse","err.mse") 

outfile = "spark_output.csv" 
write.csv(df_res,outfile,quote=FALSE,row.names=FALSE) 

這按預期運行,即,輸出是一個數據幀(或CSV文件)與相同的行數作爲輸入文件到上面的腳本(的不同即數參數值配置),但也許有更有效的方法來做到這一點。

+0

嗨,強子,你能提供給我的命令來運行這個程序。 –

+0

@Vijay_Shinde'./myexample.R myparameterfile.txt'其中myexample.R是上面的腳本。確保你在腳本中修復了shebang。 myparameterfile.txt包含每行3個逗號分隔的數字。 – hadron