2016-04-21 97 views

回答

1

如果你想要一個Spark解決方案,我會選擇一個DataFrame。它會混亂,但它是一個有趣的問題:

val testDf = Seq(
    ("A1", 1999), 
    ("A2", 2000), 
    ("A1", 2000), 
    ("A1", 1998), 
    ("A1", 2002), 
    ("B1", 1998) 
).toDF("ID", "YEAR") 

然後,我會進行自連接(第兩,實際上):

val selfJoined = testDf.orderBy($"YEAR").join(
    testDf.orderBy($"YEAR").toDF("R_ID", "R_YEAR"), 
    $"R_ID" === $"ID" && $"YEAR" === ($"R_YEAR" - 1), 
    "full_outer" 
).filter($"ID".isNull || $"R_ID".isNull) 

selfJoined.show 
+----+----+----+------+ 
| ID|YEAR|R_ID|R_YEAR| 
+----+----+----+------+ 
|null|null| A2| 2000| 
| A2|2000|null| null| 
|null|null| B1| 1998| 
| B1|1998|null| null| 
|null|null| A1| 1998| 
| A1|2000|null| null| 
|null|null| A1| 2002| 
| A1|2002|null| null| 
+----+----+----+------+ 

正如你可以從上面看到,我們現在有連續幾年的開始和結束日期。 R_YEAR,當不是null時,包含連續年份的「運行」開始。接下來的一行,YEAR是該年的結束。如果我更熟練地使用Window功能,我可能會使用lag將記錄縫合在一起,但我並不這樣做。我會做另一個自連接,然後groupBy,然後一些數學在select,然後又groupBy

selfJoined.filter($"ID".isNull).as("a").join(
    selfJoined.filter($"R_ID".isNull).as("b"), 
    $"a.R_ID" === $"b.ID" && $"a.R_YEAR" <= $"b.YEAR" 
).groupBy($"a.R_ID", $"a.R_YEAR").agg(min($"b.YEAR") as "last_YEAR") 
.select($"R_ID" as "ID", $"last_YEAR" - $"R_YEAR" + 1 as "inarow") 
.groupBy($"ID").agg(max($"inarow") as "MAX").show 
+---+---+ 
| ID|MAX| 
+---+---+ 
| B1| 1| 
| A1| 3| 
| A2| 1| 
+---+---+ 

Wheee!

+0

感謝您的時間和步驟的詳細解釋,真的很感謝解決方案!但是,我期望的輸出應該有A1 - > 2,因爲A1有1999,2000,2002(這裏2002年不是連續年)。 – Mithunram

+0

看看我的數據,它是不同的。爲了更好的情況,我又增加了A1的一年。 –

+0

大衛格里芬你可以解釋邏輯部分,多個自連接和它背後的算術。厭倦了我的最好把握它看起來乏味,並想確認我的解釋是否正確。 – Mithunram

0

我會嘗試的東西沿着這些路線:

scala> case class DataRow(id: String, year: Int) 
defined class DataRow 
scala> val data = Seq(
      DataRow("A1", 1999), 
      DataRow("A2", 2000), 
      DataRow("A1", 2000), 
      DataRow("B1", 1998), 
      DataRow("A1", 2002) 
     ) 
data: Seq[DataRow] = List(DataRow("A1", 1999), DataRow("A2", 2000), DataRow("A1", 2000), DataRow("B1", 1998), DataRow("A1", 2002)) 
scala> data.groupBy(_.id).mapValues { rows => 
      val years = rows.map(_.year) 
      val firstYear = years.head 
      years.zipWithIndex.takeWhile { case (y, i) => y == firstYear + i }.size 
     } 
res1: Map[String, Int] = Map("B1" -> 1, "A2" -> 1, "A1" -> 2) 

這個計算的連續多年的每個ID的最大數,假設它看到的第一年,是罷工的最早日期。在val years行插入.sorted是不是這種情況。

+0

那不使用'Spark'。 –

+0

哦對,我完全不在話題上,對不起! –

2

如果你不想與星火SQL打擾(在我看來,這是矯枉過正的任務),你可以簡單地使用groupByKey(而每個ID可能年數是合理的)

val rdd = sc.parallelize(Seq(
    ("A1", 1999), 
    ("A2", 2000), 
    ("A1", 2000), 
    ("A1", 1998), 
    ("A1", 2002), 
    ("B1", 1998) 
)) 

def findMaxRange(l: Iterable[Int]) = { 
    val ranges = mutable.ArrayBuffer[Int](1) 
    l.toSeq.sorted.distinct.sliding(2).foreach { case y1 :: tail => 
    if (tail.nonEmpty) { 
     val y2 = tail.head 
     if (y2 - y1 == 1) ranges(ranges.size - 1) += 1 
     else ranges += 1 
    } 
    } 
    ranges.max 
} 

rdd1.groupByKey.map(r => (r._1, findMaxRange(r._2))).collect() 

res7: Array[(String, Int)] = Array((A1,3), (A2,1), (B1,1))