2016-05-31 69 views
2

我已經使用spark-ec2創建了Spark集羣。Spark不會在集羣中分配提供的驅動程序

現在我想提出的是從Postgres的獲取一些數據的任務,豐富它,並轉儲它放回新表,所以我嘗試這樣做,用下面的命令:

PYSPARK_PYTHON=/usr/bin/python2.7 ./spark/bin/spark-submit --jars=/root/jars/postgresql-9.4.1208.jre7.jar --py-files=serializers.py parse_pageviews.py s3n://logs 

,但我得到以下異常

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393) 
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.IllegalStateException: Did not find registered driver with class org.postgresql.Driver 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2$$anonfun$3.apply(JdbcUtils.scala:58) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2$$anonfun$3.apply(JdbcUtils.scala:58) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:57) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:52) 
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(JDBCRDD.scala:347) 
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:339) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    ... 1 more 

據我瞭解,--jarsoption should serve the following driver across the cluster,但由於某些原因,工人不能找到它。 (如果我沒有記錯)

下面是一些代碼片段上,我訪問Postgres的

# get `attribution_orders` df and register it as table so we can query it 
orders = sqlc.read.jdbc(postgres_url, "attribution_orders") 
sqlc.registerDataFrameAsTable(orders, "attribution_orders") 

# processing... 

# write processed data back to postgres table 
sales_cycles = sqlc.createDataFrame(mapped) 
sales_cycles.write.jdbc(postgres_url, 'scs', mode='overwrite') 

所以我做錯了什麼?我如何分配postgres驅動程序並在集羣中訪問它?謝謝!

回答

0

我相信這是一個類加載器問題,而不是使用--jars添加jar通過yarn-site.xml中的yarn.application.classpath添加它。

相關問題