I have a dataset and I need to compute the consecutiveness of the data when it matches certain states. A sample dataset is below. The use case: if an exchange id has a RISKY or UNSTABLE status in consecutive weeks, increase the count by 1 for that week and merge the result back with the dataset. I am trying to do this with Spark. How can I calculate consecutiveness in the dataset?
Date       Exchange Id  Status    Consecutiveness
5/05/2017  a            RISKY     0
5/05/2017  b            Stable    0
5/05/2017  c            Stable    0
5/05/2017  d            UNSTABLE  0
5/05/2017  e            UNKNOWN   0
5/05/2017  f            UNKNOWN   0
6/05/2017  a            RISKY     1
6/05/2017  b            Stable    0
6/05/2017  c            Stable    0
6/05/2017  d            UNSTABLE  1
6/05/2017  e            UNSTABLE  1
6/05/2017  f            UNKNOWN   0
My approach is as follows.
- Create a dataframe of exchanges with RISKY or UNSTABLE status for the current date
- Create a dataframe of exchanges with RISKY or UNSTABLE status for the previous date
- Join the two dataframes to get the exchanges that fail the criteria in both weeks
- Using the previous date's exchanges, build another dataframe that updates the current date's consecutiveness
- Merge it back with the original dataset.
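Setting Spark aside, the counting rule itself can be sketched in plain Python. This is a minimal illustration under one assumption about the rule: the counter increments only when the exchange was also RISKY/UNSTABLE in the previous week, and resets to 0 otherwise (under that reading, 'e' in the sample table would stay 0 in the second week). The function name `consecutiveness` is hypothetical.

```python
from collections import defaultdict

# States that count as "failing" for the streak, per the sample table.
RISKY_STATES = {"RISKY", "UNSTABLE"}

def consecutiveness(rows):
    """rows: iterable of (date, exchange, status) tuples.
    Returns {(date, exchange): consecutive-failure count}."""
    by_exchange = defaultdict(list)
    for date, exchange, status in rows:
        by_exchange[exchange].append((date, status))

    counts = {}
    for exchange, obs in by_exchange.items():
        obs.sort()                 # assumes the dates sort correctly as strings
        prev_bad, count = False, 0
        for date, status in obs:
            bad = status.upper() in RISKY_STATES
            # increment only on the second (or later) failing week in a row
            count = count + 1 if bad and prev_bad else 0
            counts[(date, exchange)] = count
            prev_bad = bad
    return counts
```

In Spark the same per-exchange, date-ordered scan is what a window partitioned by exchange id and ordered by date would give you.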
I am trying the commands below, but there are problems and I cannot get steps 3, 4 and 5 to work.
case class Telecom(Date: String, Exchange: String, Stability: String, Consecutive: Int)

// split(",") yields 4 fields, indexed 0..3 (emp1(4) was out of bounds)
val emp1 = sc.textFile("file:///Filename")
  .map(_.split(","))
  .map(f => Telecom(f(0), f(1), f(2), f(3).trim.toInt))
  .toDF()

// register the temp table before querying it
emp1.registerTempTable("T1")
val PreviousWeek = sqlContext.sql("select * from T1 limit 10")

// the column is Consecutive, not Count
val FailPreviousWeek = sqlContext.sql("select Exchange, Consecutive from T1 where Date = '5/05/2017' and Stability in ('RISKY','UNSTABLE')")
val FailCurrentWeek = sqlContext.sql("select Exchange, Consecutive from T1 where Date = '6/05/2017' and Stability in ('RISKY','UNSTABLE')")

// exchanges that fail the criteria in both weeks
val FailBothWeeks = FailCurrentWeek.join(FailPreviousWeek, FailCurrentWeek("Exchange") === FailPreviousWeek("Exchange"))

// increment the previous week's count; alias the key to avoid a column clash
val UpdateCurrentWeek = FailBothWeeks.select(
  FailCurrentWeek("Exchange").alias("ExchangeId"),
  (FailPreviousWeek("Consecutive") + 1).alias("Consecutive"))

// val, not Val; without a join condition this was a cross join
val UpdateDataSet = emp1.join(UpdateCurrentWeek, emp1("Exchange") === UpdateCurrentWeek("ExchangeId"), "left_outer")
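The two-week filter-and-join that the code above attempts can also be checked in isolation with a small Spark-free Python sketch (the helper name `update_week` and the dict-based rows are hypothetical; the column names follow the sample table):

```python
RISKY_STATES = {"RISKY", "UNSTABLE"}

def update_week(rows, prev_date, cur_date):
    """rows: list of dicts with Date/Exchange/Stability/Consecutive keys.
    Returns a new list with the current week's counters updated."""
    fail_prev = {r["Exchange"] for r in rows
                 if r["Date"] == prev_date and r["Stability"].upper() in RISKY_STATES}
    fail_cur = {r["Exchange"] for r in rows
                if r["Date"] == cur_date and r["Stability"].upper() in RISKY_STATES}
    both = fail_prev & fail_cur      # the inner join on exchange id

    out = []
    for r in rows:
        r = dict(r)
        if r["Date"] == cur_date:
            # previous week's count for this exchange, defaulting to 0
            prev_count = next((p["Consecutive"] for p in rows
                               if p["Date"] == prev_date
                               and p["Exchange"] == r["Exchange"]), 0)
            # increment only when failing two weeks in a row, else reset
            r["Consecutive"] = prev_count + 1 if r["Exchange"] in both else 0
        out.append(r)
    return out
```

This mirrors steps 1-5 of the approach: filter both weeks, intersect on exchange id, bump the counter for the current week, and merge back.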
Why is 'e' '1' for the week '6/05/2017'? –