2017-08-23 130 views
0

我的數據集是這樣的:(前三列輸入,我添加的列4-6,最後一列代表所需的輸出)計算運行時間

+-------------------+------+----+-------------------+-------------------+-------------------+---+----+ 
|   timestamp|status| msg|  end_timestamp|   start_eng|   stop_eng| --|res | 
+-------------------+------+----+-------------------+-------------------+-------------------+---+----+ 
|2017-01-01 06:15:00| ASC_a|nice|2017-01-01 07:00:00|    null|    null|-->| 0 | 
|2017-01-01 07:00:00| ASC_a|nice|2017-01-01 07:15:00|    null|    null|-->| 0 | 
|2017-01-01 07:15:00| start|nice|2017-01-01 08:00:00|2017-01-01 07:15:00|    null|-->| 45 | 
|2017-01-01 08:00:00| start|nice|2017-01-01 08:22:00|2017-01-01 08:00:00|    null|-->| 22 | 
|2017-01-01 08:22:00| ASC_b|init|2017-01-01 09:00:00|    null|    null|-->| 38 | 
|2017-01-01 09:00:00| ASC_b|init|2017-01-01 09:30:00|    null|    null|-->| 30 | 
|2017-01-01 09:30:00| end| bla|2017-01-01 10:00:00|    null|2017-01-01 09:30:00|-->| 0 | 
|2017-01-01 10:00:00| end| bla|2017-01-01 10:45:00|    null|2017-01-01 10:00:00|-->| 0 | 
|2017-01-01 10:45:00| ASC_a|meas|2017-01-01 11:00:00|    null|    null|-->| 0 | 
|2017-01-01 11:00:00| ASC_a|meas|2017-01-01 12:00:00|    null|    null|-->| 0 | 
|2017-01-01 12:00:00| ASC_a|meas|2017-01-01 12:15:00|    null|    null|-->| 0 | 
|2017-01-01 12:15:00| start|meas|2017-01-01 13:00:00|2017-01-01 12:15:00|    null|-->| 45 | 
|2017-01-01 13:00:00| start|meas|2017-01-01 13:22:00|2017-01-01 13:00:00|    null|-->| 22 | 
|2017-01-01 13:22:00| ASC_c|init|2017-01-01 14:00:00|    null|    null|-->| 38 | 
|2017-01-01 14:00:00| ASC_c|init|2017-01-01 14:31:00|    null|    null|-->| 31 | 
|2017-01-01 14:31:00| end|meas|    null|    null|2017-01-01 14:31:00|-->| 0 | 
+-------------------+------+----+-------------------+-------------------+-------------------+---+----+ 

我要計算的從狀態啓動的第一次發生到狀態結束的第一次發生時的引擎運行時間。 (狀態的開始和結束都出現在隨後的列中,因爲我添加了具有爆炸功能的行,但我仍然必須稍後將它們更改爲合理的值)

問題是我不知道如何計算引擎運行時開始和結束之間既不包含開始也不包含結束的行。
我想過使用窗口函數進行計算,但我不知道如何爲此指定窗口。

+0

你還可以分享到目前爲止嘗試過的代碼嗎? –

+0

spark rdd是完全分佈式的問題,你可以在不同的分區中找到'start',行和'end'之間的所有行,你不能在開始和結束之間有一個特定的鍵字段? – Mehrez

+0

如果我找到一種方法來擺脫重複的開始結束條目,並填寫他們的狀態與最後的ASC狀態,我想我可以做幾乎完全一樣的方式,我做到了這裏https://stackoverflow.com/questions/ 45815464 /正確的方法對填充數據集,以數據爲基礎的上窗口。 – user2811630

回答

0

我終於得到它爲小數據集工作。仍然必須對它進行測試。

//get tempstat column 
    val ds3 = ds2.withColumn("tempstat", when($"status".contains("ASC"), $"status").otherwise(null)) 
     .withColumn("tempstat_final", last($"tempstat", true).over(window)) 

    //remove duplicate status 
    val ds5 = ds3.withColumn("new_status", when(!$"status".contains("ASC") && lag($"status", 1).over(window) =!= $"status", $"status").otherwise($"tempstat_final")) 

    //get column that provides window for calculation 
    val ds6 = ds5.withColumn("startFlag", when($"new_status" === "start", 1).otherwise(0)) 
     .withColumn("stopFlag", when($"new_status" === "end", -1).otherwise(0)) 
     .withColumn("bothFlags", $"startFlag" + $"stopFlag") 
     .withColumn("engineFlag", sum($"bothFlags").over(Window.orderBy("timestamp"))) 

    //calculate runtime 
    val ds7 = ds6.withColumn("runtime", when($"engineFlag" === 1, 
     ((unix_timestamp(lead($"timestamp", 1).over(Window.orderBy($"timestamp"))) - unix_timestamp($"timestamp"))/60) 
    ).otherwise(lit(0))) 

輸出和級數如下所示。

+-------------------+------+----+--------+--------------+----------+---------+--------+---------+----------+-------+ 
|   timestamp|status| msg|tempstat|tempstat_final|new_status|startFlag|stopFlag|bothFlags|engineFlag|runtime| 
+-------------------+------+----+--------+--------------+----------+---------+--------+---------+----------+-------+ 
|2017-01-01 06:15:00| ASC_a|nice| ASC_a|   ASC_a|  ASC_a|  0|  0|  0|   0| 0.0| 
|2017-01-01 07:00:00| ASC_a|nice| ASC_a|   ASC_a|  ASC_a|  0|  0|  0|   0| 0.0| 
|2017-01-01 07:15:00| start|nice| null|   ASC_a|  start|  1|  0|  1|   1| 45.0| 
|2017-01-01 08:00:00| start|nice| null|   ASC_a|  ASC_a|  0|  0|  0|   1| 22.0| 
|2017-01-01 08:22:00| ASC_b|init| ASC_b|   ASC_b|  ASC_b|  0|  0|  0|   1| 38.0| 
|2017-01-01 09:00:00| ASC_b|init| ASC_b|   ASC_b|  ASC_b|  0|  0|  0|   1| 30.0| 
|2017-01-01 09:30:00| end| bla| null|   ASC_b|  end|  0|  -1|  -1|   0| 0.0| 
|2017-01-01 10:00:00| end| bla| null|   ASC_b|  ASC_b|  0|  0|  0|   0| 0.0| 
|2017-01-01 10:45:00| ASC_a|meas| ASC_a|   ASC_a|  ASC_a|  0|  0|  0|   0| 0.0| 
|2017-01-01 11:00:00| ASC_a|meas| ASC_a|   ASC_a|  ASC_a|  0|  0|  0|   0| 0.0| 
|2017-01-01 12:00:00| ASC_a|meas| ASC_a|   ASC_a|  ASC_a|  0|  0|  0|   0| 0.0| 
|2017-01-01 12:15:00| start|meas| null|   ASC_a|  start|  1|  0|  1|   1| 45.0| 
|2017-01-01 13:00:00| start|meas| null|   ASC_a|  ASC_a|  0|  0|  0|   1| 60.0| 
|2017-01-01 14:00:00| start|meas| null|   ASC_a|  ASC_a|  0|  0|  0|   1| 60.0| 
|2017-01-01 15:00:00| start|meas| null|   ASC_a|  ASC_a|  0|  0|  0|   1| 22.0| 
|2017-01-01 15:22:00| ASC_c|init| ASC_c|   ASC_c|  ASC_c|  0|  0|  0|   1| 38.0| 
|2017-01-01 16:00:00| ASC_c|init| ASC_c|   ASC_c|  ASC_c|  0|  0|  0|   1| 31.0| 
|2017-01-01 16:31:00| end|meas| null|   ASC_c|  end|  0|  -1|  -1|   0| 0.0| 
+-------------------+------+----+--------+--------------+----------+---------+--------+---------+----------+-------++ 

我總是樂於獲得有關改進或其他解決方案建議的提示,因爲我仍然對spark/scala很陌生。