I want to set the value of a column based on the value of that column in the previous row of the group. The updated value should then be used for the next row. How do I iterate over each row of a DataFrame/RDD in PySpark?

I have the following dataframe:

  id | start_date | sort_date | A | B |
      ----------------------------------- 
      1 | 1/1/2017 | 31-01-2015 | 1 | 0 | 
      1 | 1/1/2017 | 28-02-2015 | 0 | 0 | 
      1 | 1/1/2017 | 31-03-2015 | 1 | 0 | 
      1 | 1/1/2017 | 30-04-2015 | 1 | 0 | 
      1 | 1/1/2017 | 31-05-2015 | 1 | 0 | 
      1 | 1/1/2017 | 30-06-2015 | 1 | 0 | 
      1 | 1/1/2017 | 31-07-2015 | 1 | 0 | 
      1 | 1/1/2017 | 31-08-2015 | 1 | 0 | 
      1 | 1/1/2017 | 30-09-2015 | 0 | 0 | 
      2 | 1/1/2017 | 31-10-2015 | 1 | 0 | 
      2 | 1/1/2017 | 30-11-2015 | 0 | 0 | 
      2 | 1/1/2017 | 31-12-2015 | 1 | 0 | 
      2 | 1/1/2017 | 31-01-2016 | 1 | 0 | 
      2 | 1/1/2017 | 28-02-2016 | 1 | 0 | 
      2 | 1/1/2017 | 31-03-2016 | 1 | 0 | 
      2 | 1/1/2017 | 30-04-2016 | 1 | 0 | 
      2 | 1/1/2017 | 31-05-2016 | 1 | 0 | 
      2 | 1/1/2017 | 30-06-2016 | 0 | 0 | 

Output:

  id | start_date | sort_date | A | B | C
      --------------------------------------- 
      1 | 1/1/2017 | 31-01-2015 | 1 | 0 | 1 
      1 | 1/1/2017 | 28-02-2015 | 0 | 0 | 0 
      1 | 1/1/2017 | 31-03-2015 | 1 | 0 | 1 
      1 | 1/1/2017 | 30-04-2015 | 1 | 0 | 2 
      1 | 1/1/2017 | 31-05-2015 | 1 | 0 | 3 
      1 | 1/1/2017 | 30-06-2015 | 1 | 0 | 4 
      1 | 1/1/2017 | 31-07-2015 | 1 | 0 | 5 
      1 | 1/1/2017 | 31-08-2015 | 1 | 0 | 6 
      1 | 1/1/2017 | 30-09-2015 | 0 | 0 | 0 
      2 | 1/1/2017 | 31-10-2015 | 1 | 0 | 1 
      2 | 1/1/2017 | 30-11-2015 | 0 | 0 | 0 
      2 | 1/1/2017 | 31-12-2015 | 1 | 0 | 1 
      2 | 1/1/2017 | 31-01-2016 | 1 | 0 | 2 
      2 | 1/1/2017 | 28-02-2016 | 1 | 0 | 3 
      2 | 1/1/2017 | 31-03-2016 | 1 | 0 | 4 
      2 | 1/1/2017 | 30-04-2016 | 1 | 0 | 5 
      2 | 1/1/2017 | 31-05-2016 | 1 | 0 | 6 
      2 | 1/1/2017 | 30-06-2016 | 0 | 0 | 0 

The grouping is on id and date.

Column C is derived based on columns A and B.

If A == 1 and B == 0 then C is derived from the previous row's C + 1.
There are a few other conditions as well, but this is the part I am struggling with.
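
Ignoring those other conditions, the per-row rule could be sketched roughly as a plain function (illustrative names only):

def nextC(prevC: Int, a: Int, b: Int): Int =
  if (a == 1 && b == 0) prevC + 1 else 0 // other conditions omitted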

Assume we have a sort_date column in the dataframe.

I tried the following query:

SELECT 
id, 
date, 
sort_date, 
lag(A) OVER (PARTITION BY id, date ORDER BY sort_date) as prev, 
CASE 
    WHEN A=1 AND B= 0 THEN 1 
    WHEN A=1 AND B> 0 THEN prev +1 
    ELSE 0 
END AS A 
FROM 
Table 

This is what I did with the UDAF:

val myFunc = new MyUDAF 
val w = Window.partitionBy(col("ID"), col("START_DATE")).orderBy(col("SORT_DATE")) 
val df = df.withColumn("C", myFunc(col("START_DATE"), col("X"), 
    col("Y"), col("A"), 
    col("B")).over(w)) 

PS: I am using Spark 1.6

You can use **Window functions** with Spark SQL. – mrsrinivas

Can you add the code you have tried? – mrsrinivas

Please improve the question: can you explain a bit more what you are trying to achieve, what you have done so far, what your input is and what your expected output is? Do you want to do this on an RDD, as the title says, or on a DataFrame, as the column wording suggests? What do you mean by a group? Do you mean a groupBy? How do you want to sort? –

Answer

First define a window:

import org.apache.spark.sql.expressions.Window 
val winspec = Window.partitionBy("id","start_date").orderBy("sort_date") 

Next create a UDAF which receives A and B and basically calculates C by starting from 0, increasing it by 1 whenever the condition (A=1, B=0) occurs and resetting it to 0 at any other time. To learn how to write a UDAF see here, here and here.

EDIT: Here is a sample implementation of such a UDAF (not really tested, so there may be typos):

import org.apache.spark.sql.Row 
import org.apache.spark.sql.expressions.{MutableAggregationBuffer,UserDefinedAggregateFunction} 
import org.apache.spark.sql.types._ 

class myFunc() extends UserDefinedAggregateFunction { 

    // Input Data Type Schema 
    def inputSchema: StructType = StructType(Array(StructField("A", IntegerType), StructField("B", IntegerType)))

    // Intermediate Schema 
    def bufferSchema = StructType(Array(StructField("C", IntegerType))) 

    // Returned Data Type . 
    def dataType: DataType = IntegerType 

    // Self-explaining 
    def deterministic = true 

    // This function is called whenever key changes 
    def initialize(buffer: MutableAggregationBuffer) = { 
    buffer(0) = 0 // set number of items to 0 
    } 

    // Iterate over each entry of a group 
    def update(buffer: MutableAggregationBuffer, input: Row) = { 
    buffer(0) = if (input.getInt(0) == 1 && input.getInt(1) == 0) buffer.getInt(0) + 1 else 0 
    } 

    // Merge two partial aggregates - doesn't really matter because the window will make sure the buffer remains in a 
    // single partition 
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { 
    buffer1(0) = buffer1.getInt(0) + buffer2.getInt(0) 
    } 

    // Called after all the entries are exhausted. 
    def evaluate(buffer: Row) = { 
    buffer.getInt(0) 
    } 

} 

Finally, apply it to your dataframe. Assuming you named your UDAF myFunc:

val f = new myFunc() 
val newDF = df.withColumn("newC", f($"A",$"B").over(winspec)) 
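
As a quick sanity check, a minimal sketch on a toy subset of the question's data (hypothetical values; dates rewritten in ISO form so that plain string ordering matches chronological order):

import sqlContext.implicits._ // for the $"col" syntax

val toy = sqlContext.createDataFrame(Seq(
  (1, "1/1/2017", "2015-01-31", 1, 0),
  (1, "1/1/2017", "2015-02-28", 0, 0),
  (1, "1/1/2017", "2015-03-31", 1, 0),
  (1, "1/1/2017", "2015-04-30", 1, 0)
)).toDF("id", "start_date", "sort_date", "A", "B")

// Expected newC per the rules in the question: 1, 0, 1, 2
toy.withColumn("newC", f($"A", $"B").over(winspec)).show()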

C is a derived column. Initially every row has C = 0. I need to calculate the value of C for the current row based on the calculated value of C from the previous row. In that case the previous C would always be 0. –

I have changed my question to reflect this. That was my mistake. –

I just called it newC here. The part that creates the calculation is the UDAF. The UDAF starts newC at 0 and increases it by 1 when A=1 and B=0, otherwise it resets it to 0. What the window does is make sure the input to the UDAF is partitioned correctly and ordered correctly –
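
To make that last point concrete: because the window specifies an orderBy, the aggregate is evaluated over a running frame from the start of each (id, start_date) partition up to the current row. In Spark 1.6 that frame can also be written out explicitly; a sketch (Long.MinValue stands for "unbounded preceding", 0 for "current row"):

import org.apache.spark.sql.expressions.Window

// Same window as above, with the running frame spelled out explicitly.
// ROWS vs RANGE only differs when sort_date has ties, which it does not within a group here.
val runningWin = Window
  .partitionBy("id", "start_date")
  .orderBy("sort_date")
  .rowsBetween(Long.MinValue, 0)

val withC = df.withColumn("C", f($"A", $"B").over(runningWin))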