In Spark SQL, how can I get the first non-null value (or first value not matching some placeholder text such as 'N/A') within a group? In the example below, a user is watching TV channels. The first record for channel 100 has SIGNAL_STRENGHT of N/A, and the next record's value is Good, so I want to use that one — i.e. take the first non-null value in the group.
I tried window functions, but those only give me aggregates like MAX, MIN, etc.
If I use lead I only get the next row, and with an unbounded frame I don't see anything like a firstNotNull method. Please advise.
Input:
CUSTOMER_ID || TV_CHANNEL_ID || TIME || SIGNAL_STRENGHT
1 || 100 || 0 || N/A
1 || 100 || 1 || Good
1 || 100 || 2 || Meduim
1 || 100 || 3 || N/A
1 || 100 || 4 || Poor
1 || 100 || 5 || Meduim
1 || 200 || 6 || N/A
1 || 200 || 7 || N/A
1 || 200 || 8 || Poor
1 || 300 || 9 || Good
1 || 300 || 10 || Good
1 || 300 || 11 || Good
Expected output:
CUSTOMER_ID || TV_CHANNEL_ID || TIME || SIGNAL_STRENGHT
1 || 100 || 0 || Good
1 || 100 || 1 || Good
1 || 100 || 2 || Meduim
1 || 100 || 3 || Poor
1 || 100 || 4 || Poor
1 || 100 || 5 || Meduim
1 || 200 || 6 || Poor
1 || 200 || 7 || Poor
1 || 200 || 8 || Poor
1 || 300 || 9 || Good
1 || 300 || 10 || Good
1 || 300 || 11 || Good
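The fill rule implied by the tables above — within each CUSTOMER_ID + TV_CHANNEL_ID group, ordered by TIME, replace each N/A with the next non-N/A value, falling back to the previous non-N/A value for trailing gaps — can be sketched in plain Java, outside Spark. The class and method names here are just for illustration, with N/A modeled as null:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FillSketch {

    // Replace each null with the next non-null value in the list;
    // trailing nulls fall back to the last non-null value before them.
    static List<String> fillWithinGroup(List<String> values) {
        List<String> out = new ArrayList<>(values);
        // Backward pass: carry the next non-null value toward the front.
        String next = null;
        for (int i = out.size() - 1; i >= 0; i--) {
            if (out.get(i) != null) next = out.get(i);
            else out.set(i, next);   // still null for trailing gaps
        }
        // Forward pass: fill any remaining trailing nulls from the left.
        String prev = null;
        for (int i = 0; i < out.size(); i++) {
            if (out.get(i) != null) prev = out.get(i);
            else out.set(i, prev);
        }
        return out;
    }

    public static void main(String[] args) {
        // Channel 100 from the example input.
        List<String> ch100 = Arrays.asList(null, "Good", "Meduim", null, "Poor", "Meduim");
        System.out.println(fillWithinGroup(ch100));
        // [Good, Good, Meduim, Poor, Poor, Meduim]
    }
}
```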
Actual code:
package com.ganesh.test;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChannelLoader {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelLoader.class);

    public static void main(String[] args) throws AnalysisException {
        String master = "local[*]";
        //region
        SparkSession sparkSession = SparkSession
                .builder()
                .appName(ChannelLoader.class.getName())
                .master(master).getOrCreate();
        SparkContext context = sparkSession.sparkContext();
        context.setLogLevel("ERROR");
        SQLContext sqlCtx = sparkSession.sqlContext();
        Dataset<Row> rawDataset = sparkSession.read()
                .format("com.databricks.spark.csv")
                .option("delimiter", ",")
                .option("header", "true")
                .load("sample_channel.csv");
        rawDataset.printSchema();
        rawDataset.createOrReplaceTempView("channelView");
        //endregion
        WindowSpec windowSpec = Window.partitionBy("CUSTOMER_ID").orderBy("TV_CHANNEL_ID");
        // This does not work: isNan is a plain scalar function, not an
        // aggregate, so it cannot be used with an OVER clause.
        rawDataset = sqlCtx.sql("select * ," +
                " (isNan(SIGNAL_STRENGHT) over (partition by CUSTOMER_ID, TV_CHANNEL_ID order by TIME ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) ) as updatedStren " +
                " from channelView " +
                " order by CUSTOMER_ID, TV_CHANNEL_ID, TIME "
        );
        rawDataset.show();
        sparkSession.close();
    }
}
UPDATE
I looked at many possible approaches with no luck, so I brute-forced it and got the expected result: I computed a few helper columns and derived the answer from them. I decided to convert N/A to null so that it would not appear in the collect_list results.
rawDataset = sqlCtx.sql("select * " +
        " , (collect_list(SIGNAL_STRENGTH) " +
        "    over (partition by CUSTOMER_ID, TV_CHANNEL_ID order by TIME ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) )" +
        "   as fwdValues " +
        " , (collect_list(SIGNAL_STRENGTH) " +
        "    over (partition by CUSTOMER_ID, TV_CHANNEL_ID order by TIME ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) )" +
        "   as bkwdValues " +
        " , (row_number() over (partition by CUSTOMER_ID, TV_CHANNEL_ID order by TIME)) as rank_fwd " +
        " , (row_number() over (partition by CUSTOMER_ID, TV_CHANNEL_ID order by TIME DESC)) as rank_bkwd " +
        " from channelView " +
        " order by CUSTOMER_ID, TV_CHANNEL_ID, TIME "
);
rawDataset.show();
rawDataset.createOrReplaceTempView("updatedChannelView");
sqlCtx.sql("select * " +
        " , SIGNAL_STRENGTH " +
        " , (case " +
        "      when (SIGNAL_STRENGTH IS NULL AND rank_bkwd = 1) then bkwdValues[size(bkwdValues)-1] " +
        "      when (SIGNAL_STRENGTH IS NULL) then fwdValues[0] " +
        "      else SIGNAL_STRENGTH " +
        "    end) as NEW_SIGNAL_STRENGTH" +
        " from updatedChannelView "
).show();
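For reference, Spark SQL's `first`/`last` aggregates accept an ignore-nulls flag, so the collect_list workaround above can likely be replaced by a single window expression. This is an untested sketch against the same `channelView` (with N/A already converted to null), not verified on the asker's Spark version:

```sql
SELECT *,
       coalesce(
         -- next non-null value at or after the current row
         first(SIGNAL_STRENGTH, true) OVER (
           PARTITION BY CUSTOMER_ID, TV_CHANNEL_ID ORDER BY TIME
           ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING),
         -- fall back to the previous non-null value for trailing nulls
         last(SIGNAL_STRENGTH, true) OVER (
           PARTITION BY CUSTOMER_ID, TV_CHANNEL_ID ORDER BY TIME
           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
       ) AS NEW_SIGNAL_STRENGTH
FROM channelView
ORDER BY CUSTOMER_ID, TV_CHANNEL_ID, TIME
```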
Output:
+-----------+-------------+----+---------------+--------------------+--------------------+--------+---------+---------------+-------------------+
|CUSTOMER_ID|TV_CHANNEL_ID|TIME|SIGNAL_STRENGTH| fwdValues| bkwdValues|rank_fwd|rank_bkwd|SIGNAL_STRENGTH|NEW_SIGNAL_STRENGTH|
+-----------+-------------+----+---------------+--------------------+--------------------+--------+---------+---------------+-------------------+
| 1| 100| 0| null|[Good, Meduim, Poor]| []| 1| 6| null| Good|
| 1| 100| 1| Good|[Good, Meduim, Poor]| [Good]| 2| 5| Good| Good|
| 1| 100| 2| Meduim| [Meduim, Poor]| [Good, Meduim]| 3| 4| Meduim| Meduim|
| 1| 100| 3| null| [Poor]| [Good, Meduim]| 4| 3| null| Poor|
| 1| 100| 4| Poor| [Poor]|[Good, Meduim, Poor]| 5| 2| Poor| Poor|
| 1| 100| 5| null| []|[Good, Meduim, Poor]| 6| 1| null| Poor|
| 1| 200| 6| null| [Poor]| []| 1| 3| null| Poor|
| 1| 200| 7| null| [Poor]| []| 2| 2| null| Poor|
| 1| 200| 8| Poor| [Poor]| [Poor]| 3| 1| Poor| Poor|
| 1| 300| 10| null| [Good]| []| 1| 3| null| Good|
| 1| 300| 11| null| [Good]| []| 2| 2| null| Good|
| 1| 300| 9| Good| [Good]| [Good]| 3| 1| Good| Good|
+-----------+-------------+----+---------------+--------------------+--------------------+--------+---------+---------------+-------------------+
What is the grouping key? customer_id + tv_channel_id? And on which field do you define the order (since getting the "first non-null" implies an order)? – jeanr
I have added sample code – Manjesh
I don't quite understand the output you are expecting; could you provide a simple example? @Manjesh – jeanr