我有一定的ID與各自多年的經營中的每一行:如何找出最大連續多年使用Scala的每個ID /星火
例子:
ID YEAR
A1 1999
A2 2000
A1 2000
B1 1998
A1 2002
現在,我需要確定的連續多年用於每個ID 結果號,
A1 : 2 because[1999, 2000 ]
等,
我有一定的ID與各自多年的經營中的每一行:如何找出最大連續多年使用Scala的每個ID /星火
例子:
ID YEAR
A1 1999
A2 2000
A1 2000
B1 1998
A1 2002
現在,我需要確定的連續多年用於每個ID 結果號,
A1 : 2 because[1999, 2000 ]
等,
如果你想要一個Spark
解決方案,我會選擇一個DataFrame
。它會混亂,但它是一個有趣的問題:
val testDf = Seq(
("A1", 1999),
("A2", 2000),
("A1", 2000),
("A1", 1998),
("A1", 2002),
("B1", 1998)
).toDF("ID", "YEAR")
然後,我會進行自連接(第兩,實際上):
val selfJoined = testDf.orderBy($"YEAR").join(
testDf.orderBy($"YEAR").toDF("R_ID", "R_YEAR"),
$"R_ID" === $"ID" && $"YEAR" === ($"R_YEAR" - 1),
"full_outer"
).filter($"ID".isNull || $"R_ID".isNull)
selfJoined.show
+----+----+----+------+
| ID|YEAR|R_ID|R_YEAR|
+----+----+----+------+
|null|null| A2| 2000|
| A2|2000|null| null|
|null|null| B1| 1998|
| B1|1998|null| null|
|null|null| A1| 1998|
| A1|2000|null| null|
|null|null| A1| 2002|
| A1|2002|null| null|
+----+----+----+------+
正如你可以從上面看到,我們現在有連續幾年的開始和結束日期。 R_YEAR
,當不是null
時,包含連續年份的「運行」開始。接下來的一行,YEAR
是該年的結束。如果我更熟練地使用Window
功能,我可能會使用lag
將記錄縫合在一起,但我並不這樣做。我會做另一個自連接,然後groupBy
,然後一些數學在select
,然後又groupBy
:
selfJoined.filter($"ID".isNull).as("a").join(
selfJoined.filter($"R_ID".isNull).as("b"),
$"a.R_ID" === $"b.ID" && $"a.R_YEAR" <= $"b.YEAR"
).groupBy($"a.R_ID", $"a.R_YEAR").agg(min($"b.YEAR") as "last_YEAR")
.select($"R_ID" as "ID", $"last_YEAR" - $"R_YEAR" + 1 as "inarow")
.groupBy($"ID").agg(max($"inarow") as "MAX").show
+---+---+
| ID|MAX|
+---+---+
| B1| 1|
| A1| 3|
| A2| 1|
+---+---+
Wheee!
我會嘗試的東西沿着這些路線:
scala> case class DataRow(id: String, year: Int)
defined class DataRow
scala> val data = Seq(
DataRow("A1", 1999),
DataRow("A2", 2000),
DataRow("A1", 2000),
DataRow("B1", 1998),
DataRow("A1", 2002)
)
data: Seq[DataRow] = List(DataRow("A1", 1999), DataRow("A2", 2000), DataRow("A1", 2000), DataRow("B1", 1998), DataRow("A1", 2002))
scala> data.groupBy(_.id).mapValues { rows =>
val years = rows.map(_.year)
val firstYear = years.head
years.zipWithIndex.takeWhile { case (y, i) => y == firstYear + i }.size
}
res1: Map[String, Int] = Map("B1" -> 1, "A2" -> 1, "A1" -> 2)
這個計算的連續多年的每個ID的最大數,假設它看到的第一年,是罷工的最早日期。在val years
行插入.sorted
是不是這種情況。
那不使用'Spark'。 –
哦對,我完全不在話題上,對不起! –
如果你不想與星火SQL打擾(在我看來,這是矯枉過正的任務),你可以簡單地使用groupByKey(而每個ID可能年數是合理的)
val rdd = sc.parallelize(Seq(
("A1", 1999),
("A2", 2000),
("A1", 2000),
("A1", 1998),
("A1", 2002),
("B1", 1998)
))
def findMaxRange(l: Iterable[Int]) = {
val ranges = mutable.ArrayBuffer[Int](1)
l.toSeq.sorted.distinct.sliding(2).foreach { case y1 :: tail =>
if (tail.nonEmpty) {
val y2 = tail.head
if (y2 - y1 == 1) ranges(ranges.size - 1) += 1
else ranges += 1
}
}
ranges.max
}
rdd1.groupByKey.map(r => (r._1, findMaxRange(r._2))).collect()
res7: Array[(String, Int)] = Array((A1,3), (A2,1), (B1,1))
感謝您的時間和步驟的詳細解釋,真的很感謝解決方案!但是,我期望的輸出應該有A1 - > 2,因爲A1有1999,2000,2002(這裏2002年不是連續年)。 – Mithunram
看看我的數據,它是不同的。爲了更好的情況,我又增加了A1的一年。 –
大衛格里芬你可以解釋邏輯部分,多個自連接和它背後的算術。厭倦了我的最好把握它看起來乏味,並想確認我的解釋是否正確。 – Mithunram