2017-05-28 27 views
1

我有數據集,我需要計算數據的連續性,如果它符合某些狀態。示例數據集如下。用例是,如果交換ID連續具有風險和不穩定狀態,那麼在該星期內將計數增加1,並與數據集合並。我正在嘗試使用Spark。如何計算數據集中的連續性?

Date Exchange Id Status Consecutiveness 
5/05/2017 a RISKY 0 
5/05/2017 b Stable 0 
5/05/2017 c Stable 0 
5/05/2017 d UNSTABLE 0 
5/05/2017 e UNKNOWN 0 
5/05/2017 f UNKNOWN 0 
6/05/2017 a RISKY 1 
6/05/2017 b Stable 0 
6/05/2017 c Stable 0 
6/05/2017 d UNSTABLE 1 
6/05/2017 e UNSTABLE 1 
6/05/2017 f UNKNOWN 0 

我的方法如下。

  1. 創建具有當前日期匯率風險和不穩定 條件
  2. 據幀具有 風險而且不穩定
  3. 加入2個dataframes並獲得交流不符合標準
  4. 以前日期的交流創建另一個數據框更新當前日期的連續性
  5. 與原始數據集合並。

我正在嘗試以下命令。但是,有問題,無法與3,4,5

case class Telecom(Date: String, Exchange: String, Stability: String, Cosecutive: Int) 

val emp1 = sc.textFile("file:/// Filename").map(_.split(",")).map(emp1=>Telecom(emp1(0),emp1(1),emp1(2),emp1(4).trim.toInt)).toDF() 

val PreviousWeek = sqlContext.sql("select * from T1 limit 10") 

emp1.registerTempTable("T1") 

val FailPreviousWeek = sqlContext.sql("Select Exchange, Count from T1 where Date = '5/05/2017' and Stability in ('RISKY','UNSTABLE')") 

val FailCurrentWeek = sqlContext.sql("Select Exchange, Count from T1 where Date = '6/05/2017' and Stability in ('RISKY','UNSTABLE')") 

FailCurrentWeek.join(FailPreviousWeek, FailCurrentWeek("Exchange") === FailPreviousWeek("Exchange")) 

val UpdateCurrentWeek = FailCurrentWeek.select($"Exchange",$"Count" +1) 

Val UpdateDataSet = emp1.join(UpdateCurrentWeek) 

    val UpdateCurrentWeek = FailCurrentWeek.select($"Exchange".alias("Exchangeid"),$"Count" +1) 
+0

爲什麼本週'6/05/2017'的'e'是'1'? –

回答

0

這對我心愛窗口集合函數一個完美的情況下進行。

我認爲lag(帶when)功能可以這樣做:

滯後(列名:字符串,偏移量爲:int):列返回一個當前行前行偏移值,null如果在當前行之前的行數少於偏移量。

import org.apache.spark.sql.expressions.Window 
val exchIds = Window.partitionBy("Exchange_Id").orderBy("Date") 

val cc = when(lower($"Status") === "risky" && $"lag" === $"Status", 1). 
    when(lower($"Status") === "unstable" && $"lag" === $"Status", 1). 
    otherwise(0) 

val solution = input. 
    withColumn("lag", lag("Status", 1) over exchIds). 
    withColumn("Consecutiveness", cc). 
    orderBy("Date", "Exchange_Id"). 
    select("Date", "Exchange_Id", "Status", "Consecutiveness") 
scala> solution.show 
+---------+-----------+--------+---------------+ 
|  Date|Exchange_Id| Status|Consecutiveness| 
+---------+-----------+--------+---------------+ 
|5/05/2017|   a| RISKY|    0| 
|5/05/2017|   b| Stable|    0| 
|5/05/2017|   c| Stable|    0| 
|5/05/2017|   d|UNSTABLE|    0| 
|5/05/2017|   e| UNKNOWN|    0| 
|5/05/2017|   f| UNKNOWN|    0| 
|6/05/2017|   a| RISKY|    1| 
|6/05/2017|   b| Stable|    0| 
|6/05/2017|   c| Stable|    0| 
|6/05/2017|   d|UNSTABLE|    1| 
|6/05/2017|   e|UNSTABLE|    0| 
|6/05/2017|   f| UNKNOWN|    0| 
+---------+-----------+--------+---------------+ 
+0

嗨Jacek。非常感謝。知道這個功能是奇怪的。 –

+0

但有一個問題 - 如果下一個日期的狀態(07/05/2017)變穩定,那麼它應該再次變爲0,然後在08/05/2017再次變得有風險,那麼它應該從1再次開始,而不是2你可以用這段代碼試試這個,讓我知道。謝謝。 –

0

我終於用蜂巢窗口分區功能與多重循環。

  1. 第一步轉換標誌,以布爾
  2. 在一個循環中與標誌值計算連續性
  3. 然後再次具有考慮到循環 連續性多一個窗口劃分。

這可以使用Spark SQL來完成。