
Not able to retrieve data from a SparkR-created DataFrame

I have the following simple SparkR program, which creates a SparkR DataFrame and then retrieves/collects data from it.

Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn") 
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6") 
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library(SparkR) 
sc <- sparkR.init(master="yarn-client",sparkEnvir = list(spark.shuffle.service.enabled=TRUE,spark.dynamicAllocation.enabled=TRUE,spark.dynamicAllocation.initialExecutors="40")) 
hiveContext <- sparkRHive.init(sc) 

n = 1000 
x = data.frame(id = 1:n, val = rnorm(n)) 
xs <- createDataFrame(hiveContext, x) 

xs 

head(xs) 
collect(xs) 

I am able to create the DataFrame and view it successfully, but any operation that actually fetches its data throws an error.

16/07/25 16:33:59 WARN TaskSetManager: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out 
 at java.net.PlainSocketImpl.socketAccept(Native Method) 
 at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398) 
 at java.net.ServerSocket.implAccept(ServerSocket.java:530) 
 at java.net.ServerSocket.accept(ServerSocket.java:498) 
 at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432) 
 at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63) 
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
 at org.apache.spark.scheduler.Task.run(Task.scala:89) 
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
 at java.lang.Thread.run(Thread.java:745)

16/07/25 16:33:59 ERROR TaskSetManager: Task 0 in stage 17.0 failed 4 times; aborting job 
16/07/25 16:33:59 ERROR RBackendHandler: dfToCols on org.apache.spark.sql.api.r.SQLUtils failed 
Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out 
 at java.net.PlainSocketImpl.socketAccept(Native Method) 
 at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398) 
 at java.net.ServerSocket.implAccept(ServerSocket.java:530) 
 at java.net.ServerSocket.accept(ServerSocket.java:498) 
 at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432) 
 at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63) 
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPar

If I run it from the sparkR command line as shown below, it executes fine.

~/Downloads/spark-1.6.1-bin-hadoop2.6/bin/sparkR --master yarn-client 

But when I run it from a plain R session with sparkR.init(master="yarn-client"), it throws the errors above.

Can someone please help resolve these errors?

I have the same problem. How did you fix it? –

Answer


Adding this line is what made the difference:

Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell") 

Here is the complete code:

Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn") 
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6") 
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library(SparkR) 
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell") 
sc <- sparkR.init(sparkEnvir = list(spark.shuffle.service.enabled=TRUE,spark.dynamicAllocation.enabled=TRUE,spark.dynamicAllocation.initialExecutors="40")) 
hiveContext <- sparkRHive.init(sc) 

n = 1000 
x = data.frame(id = 1:n, val = rnorm(n)) 
xs <- createDataFrame(hiveContext, x) 

xs 

head(xs) 
collect(xs)
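
As a quick sanity check (a minimal sketch using only the xs DataFrame created above, not part of the original answer), the following calls should now succeed instead of hitting the socket timeout:

printSchema(xs)    # print the schema of the distributed DataFrame 
count(xs)          # should return 1000, the number of rows in xs 
df <- collect(xs)  # collect() should now return a local data.frame 
head(df) 
sparkR.stop()      # shut down the SparkR backend and release the YARN application 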