
Error in PySpark with a timestamp dataset: AttributeError: 'NoneType' object has no attribute '_jvm'

I have a dataset of timestamps in the format shown below, and I have written a UDF in pyspark to process it and return a map of key/value pairs. But I am getting the error message below.

Dataset: df_ts_list

+--------------------+ 
|    ts_list| 
+--------------------+ 
|[1477411200, 1477...| 
|[1477238400, 1477...| 
|[1477022400, 1477...| 
|[1477224000, 1477...| 
|[1477256400, 1477...| 
|[1477346400, 1476...| 
|[1476986400, 1477...| 
|[1477321200, 1477...| 
|[1477306800, 1477...| 
|[1477062000, 1477...| 
|[1477249200, 1477...| 
|[1477040400, 1477...| 
|[1477090800, 1477...| 
+--------------------+ 

PySpark UDF:

>>> def on_time(ts_list): 
...  import sys 
...  import os 
...  sys.path.append('/usr/lib/python2.7/dist-packages') 
...  os.system("sudo apt-get install python-numpy -y") 
...  import numpy as np 
...  import datetime 
...  import time 
...  from datetime import timedelta 
...  ts = np.array(ts_list) 
...  if ts.size == 0: 
...    count = 0 
...    duration = 0 
...    st = time.mktime(datetime.now()) 
...    ymd = str(datetime.fromtimestamp(st).date()) 
...  else: 
...    ts.sort() 
...    one_tag = [] 
...    start = float(ts[0]) 
...    for i in range(len(ts)): 
...      if i == (len(ts)) - 1: 
...        end = float(ts[i]) 
...        a_round = [start, end] 
...        one_tag.append(a_round) 
...      else: 
...        diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i]))) 
...        if abs(diff.total_seconds()) > 3600: 
...          end = float(ts[i]) 
...          a_round = [start, end] 
...          one_tag.append(a_round) 
...          start = float(ts[i+1]) 
...    one_tag = [u for u in one_tag if u[1] - u[0] > 300] 
...    count = int(len(one_tag)) 
...    duration = int(np.diff(one_tag).sum()) 
...    ymd = str(datetime.datetime.fromtimestamp(time.time()).date()) 
...  return {'count':count,'duration':duration, 'ymd':ymd} 

PySpark code:

>>> on_time=udf(on_time, MapType(StringType(),StringType())) 
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show() 

Error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main 
    process() 
    File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda> 
    func = lambda _, it: map(mapper, it) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda> 
    mapper = lambda a: udf(*a) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda> 
    return lambda *a: f(*a) 
    File "<stdin>", line 27, in on_time 
    File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _ 
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col) 
AttributeError: 'NoneType' object has no attribute '_jvm' 

Any help would be appreciated!

Answers


The error message says that on line 27 of the udf you are calling some pyspark sql function. That matches the line with abs(), so I suppose that somewhere above you called from pyspark.sql.functions import *, which overrides Python's built-in abs() with Spark's column function of the same name. Spark's abs() builds a JVM expression through the SparkContext, and on an executor that context is None, hence the 'NoneType' object has no attribute '_jvm' error.
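A minimal sketch of the shadowing and of a safer import style (the DataFrame df and its column name are illustrative):

# With a star import, Spark's column functions replace Python built-ins
# such as abs(), sum(), min(), and max():
from pyspark.sql.functions import *

# Inside a UDF, abs() is now pyspark.sql.functions.abs. It tries to build
# a JVM column expression through the SparkContext, which is None on the
# executor, so it fails with: 'NoneType' object has no attribute '_jvm'

# Safer: import the module under an alias so the built-ins stay intact:
from pyspark.sql import functions as F

F.abs(df["some_col"])  # Spark's abs, applied explicitly to a Column
abs(-42)               # Python's built-in abs, safe to use inside a UDF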


Mariusz's answer (above) didn't really help me. So if you found this like I did, because it is the only result on Google, and you are new to pyspark (and to Spark in general), here is what worked for me.

In my case, I was getting the error because I tried to execute pyspark code before the pyspark environment had been set up.

Making sure that pyspark was available and set up before making any calls that depend on pyspark.sql.functions fixed the issue for me.
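A minimal sketch of that ordering, assuming a Spark 2.x SparkSession (the app name and sample data are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create the session first; this starts the JVM gateway that
# pyspark.sql.functions relies on.
spark = SparkSession.builder.appName("on_time_example").getOrCreate()

df = spark.createDataFrame([([1477411200, 1477411500],)], ["ts_list"])

# Only now is it safe to evaluate functions that go through the JVM:
df.select(F.size("ts_list")).show()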
