
I have written the code below. My first problem is forcing data types: how can I cast all columns of the dataset at once, except the timestamp column? My second problem is how to apply the avg function to every column except the timestamp column. Many thanks.

val df = spark.read.option("header",true).option("inferSchema", "true").csv("C:/Users/mhattabi/Desktop/dataTest.csv") 
val result=df.withColumn("new_time",((unix_timestamp(col("time")) /300).cast("long") * 300).cast("timestamp")) 
result("value").cast("float")//here the first question 
val finalresult=result.groupBy("new_time").agg(avg("value")).sort("new_time")//here the second question about avg 
finalresult.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("C:/mydata.csv") 

Can't you just add a 'withColumn' for each column you want to cast, and as many 'avg's in the 'agg' as there are columns? – Mariusz


@Mariusz The dataset is very large and has many columns; I just want to do this for all columns except the time column – user7394882

Answer


This is easy to achieve in pyspark, but I ran into trouble trying to rewrite it in Scala... I hope you will manage to do that somehow.

from pyspark.sql.functions import *
# Toy DataFrame standing in for the real data: keep "new_time" as-is,
# cast every other column to float.
df = spark.createDataFrame([(100, "4.5", "5.6")], ["new_time", "col1", "col2"])
columns = [col(c).cast('float') if c != 'new_time' else col(c) for c in df.columns]
# One avg() expression per column, skipping the grouping column.
aggs = [avg(c) for c in df.columns if c != 'new_time']
finalresult = df.select(columns).groupBy('new_time').agg(*aggs)
finalresult.explain()

*HashAggregate(keys=[new_time#0L], functions=[avg(cast(col1#14 as double)), avg(cast(col2#15 as double))])
+- Exchange hashpartitioning(new_time#0L, 200)
   +- *HashAggregate(keys=[new_time#0L], functions=[partial_avg(cast(col1#14 as double)), partial_avg(cast(col2#15 as double))])
      +- *Project [new_time#0L, cast(col1#1 as float) AS col1#14, cast(col2#2 as float) AS col2#15]
         +- Scan ExistingRDD[new_time#0L,col1#1,col2#2]
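
For reference, the same idea might look like this in Scala (an untested sketch; it assumes the result DataFrame from the question, with the bucketed timestamp column named "new_time" and every other column castable to float):

import org.apache.spark.sql.functions.{avg, col}

// Cast every column except the grouping column "new_time" to float.
val castCols = result.columns.map { c =>
  if (c == "new_time") col(c) else col(c).cast("float").as(c)
}

// Build one avg() expression per non-timestamp column.
val aggExprs = result.columns.filter(_ != "new_time").map(c => avg(c).as(s"avg_$c"))

val finalresult = result
  .select(castCols: _*)
  .groupBy("new_time")
  .agg(aggExprs.head, aggExprs.tail: _*)
  .sort("new_time")

As in the pyspark version, both the casts and the aggregations are built programmatically from result.columns, so no column has to be listed by hand.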