
Adding a date field to an RDD in Spark

I have a very simple RDD called STjoin, over which I map a simple function to parse a string representing a date/time.

The code gets through lazy evaluation without complaint, but if I actually run the last line (STjoinday.take(5)) I get an error.

import dateutil
import dateutil.parser

def parsedate(x):
    try:
        dt = dateutil.parser.parse(x[1]).date()
    except:
        # fall back to a fixed date if parsing fails
        dt = dateutil.parser.parse("01 Jan 1900 00:00:00").date()

    x.append(dt)
    return x

STjoinday = STjoin.map(lambda line: parsedate(line))
#STjoinday.take(5) 

What is going wrong here?

The long error traceback is below:

15/04/27 22:14:02 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 8) 
org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/worker.py", line 79, in main 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 196, in dump_stream 
    self.serializer.dump_stream(self._batched(iterator), stream) 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 127, in dump_stream 
    for obj in iterator: 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 185, in _batched 
    for item in iterator: 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/rdd.py", line 1147, in takeUpToNumLeft 
    yield next(iterator) 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/test3.py", line 72, in parsedate 
    dt=dateutil.parser.parse("01 Jan 1900 00:00:00").date() 
AttributeError: 'module' object has no attribute 'parser' 

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124) 
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    at org.apache.spark.scheduler.Task.run(Task.scala:54) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
15/04/27 22:14:02 ERROR TaskSetManager: Task 0 in stage 6.0 failed 1 times; aborting job 
Traceback (most recent call last): 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/test3.py", line 79, in <module> 
    STjoinday.take(5) 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/rdd.py", line 1152, in take 
    res = self.context.runJob(self, takeUpToNumLeft, p, True) 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/context.py", line 770, in runJob 
    it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 8, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/worker.py", line 79, in main 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 196, in dump_stream 
    self.serializer.dump_stream(self._batched(iterator), stream) 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 127, in dump_stream 
    for obj in iterator: 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 185, in _batched 
    for item in iterator: 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/python/pyspark/rdd.py", line 1147, in takeUpToNumLeft 
    yield next(iterator) 
    File "/home/terrapin/Spark_Hadoop/spark-1.1.1-bin-cdh4/test3.py", line 72, in parsedate 
    dt=dateutil.parser.parse("01 Jan 1900 00:00:00").date() 
AttributeError: 'module' object has no attribute 'parser' 

     org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124) 
     org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154) 
     org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87) 
     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
     org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
     org.apache.spark.scheduler.Task.run(Task.scala:54) 
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) 
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     java.lang.Thread.run(Thread.java:745) 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Looks like the problem is dateutil. Did you import it? I'm not sure, but maybe it also needs to be available on the executors or something? Try writing your own parser with strptime (I have seen machines where that is available but dateutil is not), or swap in a stand-in function to make sure everything _else_ works correctly. – ohruunuruus


Yes, I have imported it (both import dateutil and import dateutil.parser). This is running locally, so there should not be an executor problem. I thought about using strptime, but it is not as flexible if my dates come in many different formats. –
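
For what it is worth, a strptime-based stand-in along the lines suggested above could try a list of known formats in turn. A minimal sketch; the format list here is hypothetical and would need to match the actual data:

import datetime

# Candidate formats; extend this list to cover whatever appears in the data.
FORMATS = ["%d %b %Y %H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d"]

def parsedate_strptime(x):
    dt = datetime.date(1900, 1, 1)  # sentinel when nothing matches
    for fmt in FORMATS:
        try:
            dt = datetime.datetime.strptime(x[1], fmt).date()
            break
        except ValueError:
            continue
    x.append(dt)
    return x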

Answers


Looks like dateutil is not part of the Python standard library. You need to distribute it to every worker node. Can you post what happens when you import dateutil from a plain python shell? You may be missing some entries in PYTHONPATH.
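
If the package really were missing on the workers, the usual remedy is to ship it with the job, either via the --py-files flag of spark-submit or from the driver with sc.addPyFile. A sketch, assuming the package exists as a zip at the (hypothetical) path shown:

from pyspark import SparkContext

sc = SparkContext(appName="dateparse")
# Ship the package to every worker; the path is hypothetical.
sc.addPyFile("/path/to/python-dateutil.zip")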


If I just run pyspark and type 'import dateutil' or 'import dateutil.parser', I get no error message. I can then run 'dateutil.parser.parse("01 Jan 1900 00:00:00").date()' and get the correct result. –
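
One way to check whether the executors see the same dateutil as the driver's shell is to perform the import inside a task and report where the module was loaded from. A small diagnostic sketch, assuming an active SparkContext sc:

# Runs the import on an executor and returns the module's file path.
print(sc.parallelize([0], 1)
        .map(lambda _: __import__("dateutil").__file__)
        .first())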


I found an approach that works and posted it as an answer. Thanks. –


As pointed out in the other answer and the comments, the problem lies in importing dateutil. I found a way that works, although I am not sure why the other forms fail. Instead of the imports above, use:

from dateutil.parser import parse as parse_date 

Then use:

dt = parse_date("01 Jan 1900 00:00:00").date()
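
Putting it together, the mapping step from the question then becomes the following. A sketch of the fix, assuming STjoin is the RDD defined above:

from dateutil.parser import parse as parse_date

def parsedate(x):
    try:
        dt = parse_date(x[1]).date()
    except:
        # fall back to a fixed date if parsing fails
        dt = parse_date("01 Jan 1900 00:00:00").date()
    x.append(dt)
    return x

STjoinday = STjoin.map(parsedate)
STjoinday.take(5)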