2017-04-12 53 views
0

我有一個像火花SQL - 聚合數據幀到一系列

+---------------+------+ 
|id    | value| 
+---------------+------+ 
|    1|118.0| 
|    2|109.0| 
|    3|113.0| 
|    4| 82.0| 
|    5| 60.0| 
|    6|111.0| 
|    7|107.0| 
|    8| 84.0| 
|    9| 91.0| 
|    10|118.0| 
+---------------+------+ 

ANS表想骨料或斌值的範圍0,10,20,30,40,...80,90,100,110,120我如何在SQL或多個特定的火花SQL執行此?

目前我有一個橫向視圖連接的範圍,但這似乎相當笨拙/低效率。

離散化的分位數並不是我想要的,而是具有此範圍的CUT

編輯

https://github.com/collectivemedia/spark-ext/blob/master/sparkext-mllib/src/main/scala/org/apache/spark/ml/feature/Binning.scala將執行動態垃圾桶,但我寧願需要這個指定的範圍。

+0

我已更新答案。那是你在找什麼? –

+0

差不多。乍一看看起來相當不錯。謝謝。 –

+0

'org.apache.spark.ml.feature.Bucketizer'採用明確提供的分割點數組。那麼你應該能夠在輸出列上進行分組。 –

回答

1

我只是一個新手。但我有一個建議。嘗試,如果它會奏效。

SELECT id, (value DIV 10)*10 FROM table_name ; 
+0

你知道如何在scala sql DSL api中工作嗎? –

+0

我不知道這件事。 :-( –

+1

順便說一下:'df.select(('value divide 10).cast(「int」)* 10).show'將會在sql api中工作 –

5

在一般情況下,可使用org.apache.spark.ml.feature.Bucketizer來執行靜態合併:

val data = Array(
    (1, 118.0), (2, 109.0), (3, 113.0), (4, 82.0), (5, 60.0), 
    (6, 111.0), (7, 107.0), (8, 84.0), (9, 91.0), (10, 118.0) 
) 
val df = spark.createDataFrame(data).toDF("id", "value") 

import org.apache.spark.ml.feature.Bucketizer 

val splits = (0 to 12).map(_ * 10.0).toArray 

val bucketizer = new Bucketizer() 
    .setInputCol("value") 
    .setOutputCol("bucket") 
    .setSplits(splits) 

val bucketed = bucketizer.transform(df) 

bucketed.groupBy($"bucket").agg(count($"id").as("count")).show() 

結果:

+------+-----+                
|bucket|count| 
+------+-----+ 
| 8.0| 2| 
| 11.0| 4| 
| 10.0| 2| 
| 6.0| 1| 
| 9.0| 1| 
+------+-----+ 

的bucketizer引發錯誤時值位於所定義的二進制位的外側。可以將分割點定義爲Double.NegativeInfinityDouble.PositiveInfinity來捕獲異常值。

Bucketizer被設計爲通過執行右側桶的二分查找來有效地進行任意拆分。在普通垃圾箱像你這樣的情況下,可以簡單地這樣做:

val binned = df.withColumn("bucket", (($"value" - bin_min)/bin_width).cast("int")) 

其中bin_minbin_width是最低斌左側間隔分別分箱寬度。

+0

但是假設一個存儲桶是空的,那麼這個分組將不會返回任何結果,所以如果我想查看所有存儲桶的列表(以及計數爲0的空列表),那麼可以在沒有連接的情況下執行? –

+0

裝箱後執行與範圍的連接應該非常有效。 –