2017-08-11 61 views
2

In Spark SQL, how do I get the first non-null value in a group (or the first value that does not match a placeholder text such as 'N/A')? In the example below a user is watching TV channels; the first 3 records are for channel 100 with a SIGNAL_STRENGHT of N/A, and the next record's value is Good, so that is the value I want to use.

I tried window functions, but all I have there are aggregates like MAX, MIN, etc.

If I use lead I only get the next row, and with an unbounded frame I cannot find anything like a firstNotNull method. Please advise.
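
The lead attempt I mean is roughly the sketch below (simplified, and assuming the sqlCtx handle and the channelView temp view that the full code further down registers); lead only ever looks at the immediately following row, so a run of several N/A rows is not bridged.

    // Rough sketch of the lead attempt: every row only sees the value of the
    // row right after it within the same customer/channel, which is not enough
    // when two or more N/A rows occur back to back.
    Dataset<Row> withNext = sqlCtx.sql("select *, " +
            " lead(SIGNAL_STRENGHT, 1) over (partition by CUSTOMER_ID, TV_CHANNEL_ID order by TIME) as NEXT_STRENGTH " +
            " from channelView ");
    withNext.show();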

Input:

CUSTOMER_ID || TV_CHANNEL_ID || TIME || SIGNAL_STRENGHT
1           || 100           || 0    || N/A
1           || 100           || 1    || Good
1           || 100           || 2    || Meduim
1           || 100           || 3    || N/A
1           || 100           || 4    || Poor
1           || 100           || 5    || Meduim
1           || 200           || 6    || N/A
1           || 200           || 7    || N/A
1           || 200           || 8    || Poor
1           || 300           || 9    || Good
1           || 300           || 10   || Good
1           || 300           || 11   || Good

Expected output:

CUSTOMER_ID || TV_CHANNEL_ID || TIME || SIGNAL_STRENGHT
1           || 100           || 0    || Good
1           || 100           || 1    || Good
1           || 100           || 2    || Meduim
1           || 100           || 3    || Poor
1           || 100           || 4    || Poor
1           || 100           || 5    || Meduim
1           || 200           || 6    || Poor
1           || 200           || 7    || Poor
1           || 200           || 8    || Poor
1           || 300           || 9    || Good
1           || 300           || 10   || Good
1           || 300           || 11   || Good

Actual code:

package com.ganesh.test;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChannelLoader {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelLoader.class);

    public static void main(String[] args) throws AnalysisException {
        String master = "local[*]";
        //region
        SparkSession sparkSession = SparkSession
                .builder()
                .appName(ChannelLoader.class.getName())
                .master(master).getOrCreate();
        SparkContext context = sparkSession.sparkContext();
        context.setLogLevel("ERROR");

        SQLContext sqlCtx = sparkSession.sqlContext();

        Dataset<Row> rawDataset = sparkSession.read()
                .format("com.databricks.spark.csv")
                .option("delimiter", ",")
                .option("header", "true")
                .load("sample_channel.csv");

        rawDataset.printSchema();

        rawDataset.createOrReplaceTempView("channelView");
        //endregion

        WindowSpec windowSpec = Window.partitionBy("CUSTOMER_ID").orderBy("TV_CHANNEL_ID");

        rawDataset = sqlCtx.sql("select * ," +
                " (isNan(SIGNAL_STRENGHT) over (partition by CUSTOMER_ID, TV_CHANNEL_ID order by TIME ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) ) as updatedStren " +
                " from channelView " +
                " order by CUSTOMER_ID, TV_CHANNEL_ID, TIME "
        );

        rawDataset.show();

        sparkSession.close();
    }
}

UPDATE

I looked at many possible approaches but had no luck, so I brute-forced it and got the expected result: I computed a few helper columns and derived the answer from them. I decided to convert N/A to null so that it does not show up when I use collect_list.
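
The N/A-to-null conversion itself is not shown in the snippet below; one way to do it is roughly the following sketch (it reuses the channelView temp view and the misspelled SIGNAL_STRENGHT source column from the code above, and exposes the cleaned column as SIGNAL_STRENGTH):

    // Map the literal text 'N/A' to a real NULL (and fix the column name on
    // the way), so that collect_list simply skips those rows later on.
    Dataset<Row> cleaned = sqlCtx.sql("select CUSTOMER_ID, TV_CHANNEL_ID, TIME, " +
            " case when SIGNAL_STRENGHT = 'N/A' then null else SIGNAL_STRENGHT end as SIGNAL_STRENGTH " +
            " from channelView ");
    cleaned.createOrReplaceTempView("channelView");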

rawDataset = sqlCtx.sql("select * " + 
      " , (collect_list(SIGNAL_STRENGTH) " + 
      " over (partition by CUSTOMER_ID, TV_CHANNEL_ID order by TIME ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) )" + 
      " as fwdValues " + 
      " , (collect_list(SIGNAL_STRENGTH) " + 
      " over (partition by CUSTOMER_ID, TV_CHANNEL_ID order by TIME ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) )" + 
      " as bkwdValues " + 
      " , (row_number() over (partition by CUSTOMER_ID, TV_CHANNEL_ID order by TIME)) as rank_fwd " + 
      " , (row_number() over (partition by CUSTOMER_ID, TV_CHANNEL_ID order by TIME DESC)) as rank_bkwd " + 
      " from channelView " + 
      " order by CUSTOMER_ID, TV_CHANNEL_ID, TIME " 
    ); 
    rawDataset.show(); 
    rawDataset.createOrReplaceTempView("updatedChannelView"); 
    sqlCtx.sql("select * " + 
      " , SIGNAL_STRENGTH " + 
      ", (case " + 
      " when (SIGNAL_STRENGTH IS NULL AND rank_bkwd = 1) then bkwdValues[size(bkwdValues)-1] " + 
      " when (SIGNAL_STRENGTH IS NULL) then fwdValues[0] " + 
      " else SIGNAL_STRENGTH " + 
      " end) as NEW_SIGNAL_STRENGTH" + 
      " from updatedChannelView " + 
      "" 
    ).show(); 

Output:

+-----------+-------------+----+---------------+--------------------+--------------------+--------+---------+---------------+-------------------+
|CUSTOMER_ID|TV_CHANNEL_ID|TIME|SIGNAL_STRENGTH|           fwdValues|          bkwdValues|rank_fwd|rank_bkwd|SIGNAL_STRENGTH|NEW_SIGNAL_STRENGTH|
+-----------+-------------+----+---------------+--------------------+--------------------+--------+---------+---------------+-------------------+
|          1|          100|   0|           null|[Good, Meduim, Poor]|                  []|       1|        6|           null|               Good|
|          1|          100|   1|           Good|[Good, Meduim, Poor]|              [Good]|       2|        5|           Good|               Good|
|          1|          100|   2|         Meduim|      [Meduim, Poor]|      [Good, Meduim]|       3|        4|         Meduim|             Meduim|
|          1|          100|   3|           null|              [Poor]|      [Good, Meduim]|       4|        3|           null|               Poor|
|          1|          100|   4|           Poor|              [Poor]|[Good, Meduim, Poor]|       5|        2|           Poor|               Poor|
|          1|          100|   5|           null|                  []|[Good, Meduim, Poor]|       6|        1|           null|               Poor|
|          1|          200|   6|           null|              [Poor]|                  []|       1|        3|           null|               Poor|
|          1|          200|   7|           null|              [Poor]|                  []|       2|        2|           null|               Poor|
|          1|          200|   8|           Poor|              [Poor]|              [Poor]|       3|        1|           Poor|               Poor|
|          1|          300|  10|           null|              [Good]|                  []|       1|        3|           null|               Good|
|          1|          300|  11|           null|              [Good]|                  []|       2|        2|           null|               Good|
|          1|          300|   9|           Good|              [Good]|              [Good]|       3|        1|           Good|               Good|
+-----------+-------------+----+---------------+--------------------+--------------------+--------+---------+---------------+-------------------+
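
For what it's worth, the same forward/backward fill can probably be written more compactly with first()/last() and their ignoreNulls flag (available as window functions in Spark 2.x); this is only a sketch that I have not verified against the approach above:

    // Sketch: take the first non-null value looking forward; for trailing
    // NULLs fall back to the last non-null value looking backward.
    sqlCtx.sql("select CUSTOMER_ID, TV_CHANNEL_ID, TIME, " +
            " coalesce(SIGNAL_STRENGTH, " +
            "   first(SIGNAL_STRENGTH, true) over (partition by CUSTOMER_ID, TV_CHANNEL_ID " +
            "     order by TIME rows between current row and unbounded following), " +
            "   last(SIGNAL_STRENGTH, true) over (partition by CUSTOMER_ID, TV_CHANNEL_ID " +
            "     order by TIME rows between unbounded preceding and current row)) " +
            " as NEW_SIGNAL_STRENGTH " +
            " from channelView " +
            " order by CUSTOMER_ID, TV_CHANNEL_ID, TIME ").show();
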
+0

What is the grouping key? customer_id + tv_channel_id? And on which field do you define the order (since getting the "first non-null" implies an ordering)? – jeanr

+0

I have added the sample code – Manjesh

+0

I don't quite understand the output you are expecting, could you provide a simple example? @Manjesh – jeanr

Answer

1

Hope this helps!

[Edit note: the solution was modified after the original question was updated]

import pyspark.sql.functions as f 

df = sc.parallelize([ 
    [1, 100, 0, None], 
    [1, 100, 1, 'Good'], 
    [1, 100, 2, 'Medium'], 
    [1, 100, 3, None], 
    [1, 100, 4, 'Poor'], 
    [1, 100, 5, 'Medium'], 
    [1, 200, 6, None], 
    [1, 200, 7, None], 
    [1, 200, 8, 'Poor'], 
    [1, 300, 9, 'Good'], 
    [1, 300, 10, 'Good'],
    [1, 300, 11, 'Good']
]).toDF(('customer_id', 'tv_channel_id', 'time', 'signal_strength')) 
df.show() 

#convert to pandas dataframe and fill NA as per the requirement then convert it back to spark dataframe 
df1 = df.sort('customer_id', 'tv_channel_id','time').select('customer_id', 'tv_channel_id', 'signal_strength') 
p_df = df1.toPandas() 
p_df["signal_strength"] = p_df.groupby(["customer_id","tv_channel_id"]).transform(lambda x: x.fillna(method='bfill')) 
df2= sqlContext.createDataFrame(p_df).withColumnRenamed("signal_strength","signal_strength_new") 

#replace 'signal_strength' column of original dataframe with the column of above pandas dataframe 
df=df.withColumn('row_index', f.monotonically_increasing_id()) 
df2=df2.withColumn('row_index', f.monotonically_increasing_id()) 
final_df = df.join(df2, on=['customer_id', 'tv_channel_id','row_index']).drop("row_index","signal_strength").\ 
    withColumnRenamed("signal_strength_new","signal_strength").\ 
    sort('customer_id', 'tv_channel_id','time') 
final_df.show() 

The output is:

+-----------+-------------+----+---------------+
|customer_id|tv_channel_id|time|signal_strength|
+-----------+-------------+----+---------------+
|          1|          100|   0|           Good|
|          1|          100|   1|           Good|
|          1|          100|   2|         Medium|
|          1|          100|   3|           Poor|
|          1|          100|   4|           Poor|
|          1|          100|   5|         Medium|
|          1|          200|   6|           Poor|
|          1|          200|   7|           Poor|
|          1|          200|   8|           Poor|
|          1|          300|   9|           Good|
|          1|          300|  10|           Good|
|          1|          300|  11|           Good|
+-----------+-------------+----+---------------+
+0

I have updated my example; I need the first non-null value from the window below the current row (looking forward). In the channel 100 example you can see that I introduced two null values with a Good value between them... – Manjesh

+0

Updated my answer, you should now get the desired output. – Prem