2015-10-26 81 views
4

My Spark version is 1.3 and I am using pyspark. A user-defined function is breaking my pyspark dataframe.

I have a large dataframe called df.

from pyspark import SQLContext 
sqlContext = SQLContext(sc) 
df = sqlContext.parquetFile("events.parquet") 

Then I select a few columns of the dataframe and try to count the number of rows. This works fine.

df3 = df.select("start", "end", "mrt") 
print(type(df3)) 
print(df3.count()) 

Then I apply a user-defined function to convert one of the columns from a string to a number; this also works fine.

from pyspark.sql.functions import UserDefinedFunction 
from pyspark.sql.types import LongType 
CtI = UserDefinedFunction(lambda i: int(i), LongType()) 
df4 = df2.withColumn("mrt-2", CtI(df2.mrt)) 

However, when I try to count the number of rows I get an exception, even though the type shows that it is a dataframe just like df3.

print(type(df4)) 
print(df4.count()) 

My error:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-10-53941e183807> in <module>() 
     8 df4 = df2.withColumn("mrt-2", CtI(df2.mrt)) 
     9 print(type(df4)) 
---> 10 print(df4.count()) 
    11 df3 = df4.select("start", "end", "mrt-2").withColumnRenamed("mrt-2", "mrt") 

/data/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/spark/python/pyspark/sql/dataframe.py in count(self) 
    299   2L 
    300   """ 
--> 301   return self._jdf.count() 
    302 
    303  def collect(self): 

/data/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    536   answer = self.gateway_client.send_command(command) 
    537   return_value = get_return_value(answer, self.gateway_client, 
--> 538     self.target_id, self.name) 
    539 
    540   for temp_arg in temp_args: 

/data/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling o152.count. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1379 in stage 12.0 failed 4 times, most recent failure: Lost task 1379.3 in stage 12.0 (TID 27021, va1ccogbds01.lab.ctllabs.io): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/data/0/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/spark-assembly-1.3.0-cdh5.4.7-hadoop2.6.0-cdh5.4.7.jar/pyspark/worker.py", line 101, in main 
    process() 
    File "/data/0/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/spark-assembly-1.3.0-cdh5.4.7-hadoop2.6.0-cdh5.4.7.jar/pyspark/worker.py", line 96, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/data/0/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/spark-assembly-1.3.0-cdh5.4.7-hadoop2.6.0-cdh5.4.7.jar/pyspark/serializers.py", line 236, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/data/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/lib/spark/python/pyspark/sql/functions.py", line 119, in <lambda> 
    File "<ipython-input-10-53941e183807>", line 7, in <lambda> 
TypeError: int() argument must be a string or a number, not 'NoneType' 

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135) 
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:98) 
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:94) 
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) 
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:743) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:127) 
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:124) 
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) 
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
at org.apache.spark.scheduler.Task.run(Task.scala:64) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1210) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1198) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1198) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1400) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1361) 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
--------------------------------------------------------------------------- 

Am I using the user-defined function correctly? Any idea why the dataframe functions don't work on this dataframe?

+0

You've left out the traceback! How are we supposed to figure out an error like this? – eliasah

+0

@eliasah Fixed, the formatting was a pain. – deltap

+0

Can you print the schema? –

Answers

2

From the stack trace, it looks like your column contains a None value which is breaking the int cast; you could try changing your lambda function to lambda i: int(i) if i else None to handle this case.
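
A minimal sketch of that change, reusing the df3 selection and column names from the question (the original snippet references df2, which I'm assuming is the same selection):

from pyspark.sql.functions import UserDefinedFunction 
from pyspark.sql.types import LongType 

# Return None for null (or empty-string) inputs instead of letting int() raise TypeError 
CtI = UserDefinedFunction(lambda i: int(i) if i else None, LongType()) 

df4 = df3.withColumn("mrt-2", CtI(df3.mrt)) 
print(df4.count()) 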

Note that just because df2.withColumn("mrt-2", CtI(df2.mrt)) doesn't throw an error doesn't mean your code is correct: Spark uses lazy evaluation, so it won't actually try to run your code until you call count, collect, or something similar.
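
To illustrate the lazy-evaluation point, assuming CtI is still the original (non null-safe) UDF from the question:

# Building the plan never executes the Python lambda, so this line succeeds 
df4 = df3.withColumn("mrt-2", CtI(df3.mrt)) 
print(type(df4))   # still a DataFrame 

# Only an action (count, collect, show, ...) ships the UDF to the workers; 
# that is when a None value reaches int() and the TypeError surfaces 
print(df4.count()) 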

+0

Thanks, this worked for me. – deltap

0

Are you using a Spark notebook? I once hit the same error in a Spark notebook, but the same code ran fine when submitted with spark-submit:

spark-submit YOURFILE.py
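
A rough sketch of what such a standalone script might look like under Spark 1.3; the file name is a placeholder, and the script creates its own SparkContext instead of relying on the notebook-provided sc:

# YOURFILE.py -- run with: spark-submit YOURFILE.py 
from pyspark import SparkContext 
from pyspark.sql import SQLContext 
from pyspark.sql.functions import UserDefinedFunction 
from pyspark.sql.types import LongType 

sc = SparkContext(appName="mrt-cast") 
sqlContext = SQLContext(sc) 

df = sqlContext.parquetFile("events.parquet") 
df3 = df.select("start", "end", "mrt") 

# Null-safe conversion from string to long, as suggested in the accepted answer 
CtI = UserDefinedFunction(lambda i: int(i) if i else None, LongType()) 
df4 = df3.withColumn("mrt-2", CtI(df3.mrt)) 
print(df4.count()) 

sc.stop() 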