2017-07-28 38 views
1

我在pyspark 1.6.1下表:如何在Pyspark 1.6.1中集成/計算點積?

+--------+-----+--------------------+ 
|  key|carid|    data| 
+--------+-----+--------------------+ 
| time| 1|[0.2, 0.4, 0.5, 0...| 
|velocity| 1|[2.0, 2.1, 2.3, 0...| 
| time| 2|[0.1, 0.35, 0.4, 0..| 
|velocity| 2|[1.0, 1.1, 3.3, 0...| 
| time| 3|[0.3, 0.6, 0.7, 0...| 
|velocity| 3|[2.3, 2.1, 2.3, 0...| 
+--------+-----+--------------------+ 

即我有汽車數量和每個車廂我有非等距離的時間戳的陣列,並用速度值的數組。我想計算的距離每輛車驅動:

+-----+------ -+ 
|carid|distance| 
+-----+--------+ 
| 1|  100| 
| 2|  102| 
| 3|  85| 
+-----+--------+ 

我想梯形數值積分(或簡單地內積(DIFF(時間戳),速度)來計算這個我怎樣才能做到這一點pyspark 1.6.1 ?

回答

0

您可以嘗試在真實數據驗證碼,讓我們知道是否能解決問題了嗎?

import numpy as np 
import pyspark.sql.functions as f 
from pyspark.sql.types import FloatType 

df = sc.parallelize([ 
    ['time', 1, [0.2, 0.4, 0.5 ]], 
    ['velocity',1, [2.0, 2.1, 2.3 ]], 
    ['time', 2, [0.1, 0.35, 0.4]], 
    ['velocity',2, [1.0, 1.1, 3.3 ]] 
]).toDF(('key', 'carid', 'data')) 
df.show() 

df1 = df.sort('carid','key').groupby("carid").agg(f.collect_list("data").alias("timeVelocityPair")) 

def modify_values(l): 
    val = np.trapz(l[1], x=l[0]) 
    return float(val) 
modified_val = f.udf(modify_values, FloatType()) 
final_df = df1.withColumn("distance", modified_val("timeVelocityPair")).drop("timeVelocityPair") 
final_df.show() 
+0

當我執行「DF1 = ...」我總是得到錯誤信息「不處理程序蜂房udf class org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList,因爲:只有原始類型參數可接受D但數組作爲參數1傳遞;「.. – Mandy

+0

對我來說,它的作品,但解決這個錯誤,我會建議遵循這個[鏈接](https://stackoverflow.com/questions/38117360/aggregate-array -type-in​​-spark-dataframe)或者可能會升級到下一個版本(查看這個[bug](https://issues.apache.org/jira/browse/HIVE-10427))。現在,您可以在獨立系統而不是羣集上運行此代碼,並查看該解決方案是否適合您? – Prem

+0

謝謝,但我正在尋找一個解決方案在pyspark 1.6.1中,而不是scala,升級對我來說不是選擇。 – Mandy