
Classifying data based on counts of occurrences in Spark

The best way to describe my problem is to give an example of the input and the output I want.

Input

+---+----------+-----+ 
| id| timestamp|count| 
+---+----------+-----+ 
|  1|2017-06-22|    1| 
|  1|2017-06-23|    0| 
|  1|2017-06-24|    1| 
|  2|2017-06-22|    0| 
|  2|2017-06-23|    1| 
+---+----------+-----+ 

The logic would be something like this:

if (the total number of 1s in count over the last X days is greater than or equal to Y)

code = True 

else 

code = False 

Let's say X = 5 and Y = 2. For id 1 the 1s in count over the last 5 days add up to 2, which meets Y, so code = True; for id 2 the total is only 1, so code = False. The output should then look like:

Output

+---+-----+ 
| id| code| 
+---+-----+ 
|  1| True| 
|  2|False| 
+---+-----+ 

The input is a Spark SQL DataFrame (org.apache.spark.sql.DataFrame).

This doesn't sound like a very complicated problem, but I am stuck at the first step: so far I have only managed to load the data into a DataFrame.
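
That step looks roughly like this (a minimal sketch; the file name data.csv and the explicit schema are just placeholders):

    import org.apache.spark.sql.types._

    // placeholder schema matching the sample data above
    val schema = new StructType()
      .add("id", IntegerType)
      .add("timestamp", StringType)
      .add("count", IntegerType)

    // hypothetical source file; any reader producing these columns works
    val df = spark.read
      .schema(schema)
      .csv("data.csv")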

Any ideas?

Answer


Looking at your requirements, a UDAF (user-defined aggregate function) fits best. You can check out the databricks and ragrawal write-ups for a better understanding.

Based on my understanding of your problem, I have put together the following guidance; I hope it is helpful.

First of all, you need to define the UDAF. Once you have read through the links above, you will be able to do this.

import java.time.LocalDate 
import java.time.format.DateTimeFormatter 

import org.apache.spark.sql.Row 
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} 
import org.apache.spark.sql.types._ 

class ManosAggregateFunction(daysToCheck: Int, countsToCheck: Int) extends UserDefinedAggregateFunction { 

    // input rows carry the timestamp (as a string) and the count for that day 
    def inputSchema: StructType = new StructType().add("timestamp", StringType).add("count", IntegerType) 
    // the buffer keeps the last timestamp seen, the running total of counts, 
    // and the number of days covered so far 
    def bufferSchema: StructType = new StructType().add("timestamp", StringType).add("count", IntegerType).add("days", IntegerType) 
    // the final result is a Boolean: whether the threshold was reached 
    def dataType: DataType = BooleanType 
    // always gets the same result for the same input 
    def deterministic: Boolean = true 

    def initialize(buffer: MutableAggregationBuffer): Unit = { 
     buffer.update(0, "") 
     buffer.update(1, 0) 
     buffer.update(2, 0) 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = { 
     // store the row's timestamp and count; the day-window logic happens in merge 
     buffer.update(0, input.getString(0)) 
     buffer.update(1, input.getInt(1)) 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { 
     val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd") 
     val previousDate = buffer1.getString(0) 
     val nowDate = buffer2.getString(0) 
     if (previousDate != "") { 
      // accumulate the number of days between the previous and current timestamp 
      // (the rows arrive newest first because of the orderBy before the groupBy) 
      val oldDate = LocalDate.parse(previousDate, formatter) 
      val newDate = LocalDate.parse(nowDate, formatter) 
      buffer1.update(2, buffer1.getInt(2) + (oldDate.toEpochDay() - newDate.toEpochDay()).toInt) 
     } 
     buffer1.update(0, buffer2.getString(0)) 
     // only add counts that still fall within the daysToCheck window 
     if (buffer1.getInt(2) < daysToCheck) { 
      buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1)) 
     } 
    } 

    def evaluate(buffer: Row): Any = { 
     // true when the accumulated count reaches the threshold 
     countsToCheck <= buffer.getInt(1) 
    } 
} 

In the UDAF above, daysToCheck and countsToCheck are the X and Y from your question.

You can call the defined UDAF as follows:

import org.apache.spark.sql.functions.col 
import spark.implicits._ // for the $ column syntax 

val manosAgg = new ManosAggregateFunction(5, 2) 
df.orderBy($"timestamp".desc).groupBy("id").agg(manosAgg(col("timestamp"), col("count")).as("code")).show 
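
Note the orderBy before the groupBy: the merge step accumulates the day window by assuming the timestamps arrive newest first, so the descending sort is what feeds the rows to the UDAF in that order.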

Final output:

+---+-----+ 
| id| code| 
+---+-----+ 
| 1| true| 
| 2|false| 
+---+-----+ 

Given the input:

val df = Seq(
    (1, "2017-06-22", 1), 
    (1, "2017-06-23", 0), 
    (1, "2017-06-24", 1), 
    (2, "2017-06-28", 0), 
    (2, "2017-06-29", 1) 
).toDF("id","timestamp","count") 
+---+----------+-----+ 
|id |timestamp |count| 
+---+----------+-----+ 
|1 |2017-06-22|1 | 
|1 |2017-06-23|0 | 
|1 |2017-06-24|1 | 
|2 |2017-06-28|0 | 
|2 |2017-06-29|1 | 
+---+----------+-----+ 
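
For comparison, the same result can also be computed without a UDAF, using the built-in window and date functions. This is only a sketch, under the assumption that "the last X days" is measured back from each id's most recent timestamp (the names X, Y, byId, and result are mine):

    import org.apache.spark.sql.expressions.Window 
    import org.apache.spark.sql.functions._ 

    val X = 5 // days to look back 
    val Y = 2 // minimum number of 1s required 

    val byId = Window.partitionBy("id") 

    val result = df 
      // latest date per id, used as the reference point of the window 
      .withColumn("maxDate", max(to_date(col("timestamp"))).over(byId)) 
      // keep only the rows that fall within the last X days 
      .where(datediff(col("maxDate"), to_date(col("timestamp"))) < X) 
      // code is true when the 1s in that window reach Y 
      .groupBy("id") 
      .agg((sum(col("count")) >= Y).as("code")) 

    result.show() 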

I hope this gives you the idea you needed for your problem. :)
