2017-07-31 37 views
1

我有一個的大數據幀有222列我想要做像下面的例子認沽值從一行到另一階dataaframe

|id  |day   |col1 |col2 | col3 .................... 
+----------+----------------+-------+-----+ 
|  329|    0| null|2.0 
|  329|    42| null|null 
|  329|    72| 5.55|null 
|  329|    106| null|null 
|  329|    135| null|3.0 
|  329|    168| null|4.0 
|  329|    189| 4.995|null 
|  329|    212| null|6.0 
|  329|    247| null|null 
|  329|    274| null|8.0 



|id  |  day  |col1 |col2 |....................... 
+----------+----------------+-------+-----+ 
|  329|    0| null|2.0 
|  329|    42| null|2.0 
|  329|    72| 5.55|2.0 
|  329|    106| 5.55|2.0 
|  329|    135| 5.55|3.0 
|  329|    168| 5.55|4.0 
|  329|    189| 4.995|4.0 
|  329|    212| 4.995|6.0 
|  329|    247| 4.995|6.0 
|  329|    274| 4.995|8.0 
. 
. 
. 
. 
. 

1.read行1 2.我有85K的唯一ID的和每個ID有10個結果(只有一個ID的示出的示例)3.如果 在第2行的數據不存在,則把它從ID的前一行

我得到導致這樣

id   | day   |original_col1 |Result_col1|prevValue| 
+----------+----------------+--------------+-----------+---------+ 
|  329|    0| null  | null |  null| 
|  329|    42| null  | null |  null| 
|  329|    72| 5.55  | 5.55 |  null| 
|  329|    106| null  | 5.55 |  5.55| 
|  329|    135| null  | null |  null| 
|  329|    168| null  | null |  null| 
|  329|    189| 4.995  | 4.995 |  null| 
|  329|    212| null  | 4.995 | 4.995| 
|  330|.................................................... 
|  330|..................................................... 
     . 
+0

是否有確定性的方式來排序數據,以便能夠使用窗口函數(滯後)?我知道你希望在由col「id」定義的分區中應用上面的邏輯,但是除非你有辦法定義一些排序(假定順序很重要),對於id爲「1」的分區,對於「col1」你可能會在第1/2/3行得到空值,結果會有所不同。如果數據沒有排序,您可以嘗試使用monotonically_increasing_id()函數在從文件/源讀取數據後立即生成order_id。 – Traian

+0

我忘了添加一列請現在檢查,ID是唯一的,強制性ID不過是用戶而且每個ID都少於6條記錄。 –

回答

3

你不能做到這一點與現有的窗函數(例如落後)。您需要使用類似的概念進行分區和排序,但需要使用定製邏輯來滾動非空值。

case class MyRec(id: Integer, day: Integer, col1: Option[Double], col2: Option[Double]) 

defined class MyRec 

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

val ds = Seq(
    MyRec(329, 0, None, Some(2.0)), 
    MyRec(329, 42, None, None), 
    MyRec(329, 72, Some(5.55), None), 
    MyRec(329, 106, None, None), 
    MyRec(329, 135, None, Some(3.0)), 
    MyRec(329, 168, None, Some(4.0)), 
    MyRec(329, 189, Some(4.995), None), 
    MyRec(329, 212, None, Some(6.0)), 
    MyRec(329, 247, None, None), 
    MyRec(329, 274, None, Some(8.0)) 
).toDS() 

ds.printSchema() 
ds.show(false) 

val updated_ds = ds.repartition('id).sortWithinPartitions('id, 'day) 
    .mapPartitions(iter => { 
    var crtId: Integer = null 
    var prevId: Integer = null 
    var rollingVals = collection.mutable.Map[String, Option[Double]]() 
    for (rec <- iter) yield { 
     crtId = rec.id 

     // 1st record for new id 
     if (prevId == null || crtId != prevId) { 
     rollingVals = collection.mutable.Map[String, Option[Double]]() 
     prevId = crtId 
     } 

     rollingVals("col1") = if (rec.col1.isDefined) rec.col1 else rollingVals.getOrElse("col1", None) 
     rollingVals("col2") = if (rec.col2.isDefined) rec.col2 else rollingVals.getOrElse("col2", None) 
     MyRec(rec.id, rec.day, rollingVals("col1"), rollingVals("col2")) 
    } 
    }) 

updated_ds.printSchema() 
updated_ds.show(false) 

// Exiting paste mode, now interpreting. 

root 
|-- id: integer (nullable = true) 
|-- day: integer (nullable = true) 
|-- col1: double (nullable = true) 
|-- col2: double (nullable = true) 

+---+---+-----+----+ 
|id |day|col1 |col2| 
+---+---+-----+----+ 
|329|0 |null |2.0 | 
|329|42 |null |null| 
|329|72 |5.55 |null| 
|329|106|null |null| 
|329|135|null |3.0 | 
|329|168|null |4.0 | 
|329|189|4.995|null| 
|329|212|null |6.0 | 
|329|247|null |null| 
|329|274|null |8.0 | 
+---+---+-----+----+ 

root 
|-- id: integer (nullable = true) 
|-- day: integer (nullable = true) 
|-- col1: double (nullable = true) 
|-- col2: double (nullable = true) 

+---+---+-----+----+ 
|id |day|col1 |col2| 
+---+---+-----+----+ 
|329|0 |null |2.0 | 
|329|42 |null |2.0 | 
|329|72 |5.55 |2.0 | 
|329|106|5.55 |2.0 | 
|329|135|5.55 |3.0 | 
|329|168|5.55 |4.0 | 
|329|189|4.995|4.0 | 
|329|212|4.995|6.0 | 
|329|247|4.995|6.0 | 
|329|274|4.995|8.0 | 
+---+---+-----+----+ 

ds: org.apache.spark.sql.Dataset[MyRec] = [id: int, day: int ... 2 more fields] 
updated_ds: org.apache.spark.sql.Dataset[MyRec] = [id: int, day: int ... 2 more fields] 
+0

它的工作謝謝你 –

1

使用窗函數,然後案例時:

val df2 = df 
    .withColumn("prevValue", lag('col1, 1).over(Window.partitionBy('id).orderBy('day))) 
    .withColumn("col1", when('col1.isNull, 'prevValue).otherwise('col1)) 

進口也spark.implicits._

+0

我是新來的,所以請理解它可能很容易錯誤:重載的方法值滯後與替代品: (電子郵件:org.apache.spark.sql.Column,偏移量:Int,defaultValue:任何)org.apache.spark.sql .COLUMN (COLUMNNAME:字符串,偏移量:中等,默認值:任意)org.apache.spark.sql.Column (COLUMNNAME:字符串,偏移量:智力)org.apache.spark.sql.Column (例如:org.apache.spark.sql.Column,offset:Int)org.apache.spark.sql.Column 不能應用於(String) –

+0

@RahulNirdhar對不起,我忘了一個參數。現在它應該工作:) –

+0

謝謝,但它不工作,因爲我想,它給出了錯誤的結果,請參閱結果的問題 –