2016-11-03 42 views

Reduce by key with an ordered list of values

I have the following RDD:

| Key | Value | Date  | 
|-----|-------|------------| 
| 1 | A  | 10/30/2016 | 
| 1 | B  | 10/31/2016 | 
| 1 | C  | 11/1/2016 | 
| 1 | D  | 11/2/2016 | 
| 2 | A  | 11/2/2016 | 
| 2 | B  | 11/2/2016 | 
| 2 | C  | 11/2/2016 | 
| 3 | A  | 10/30/2016 | 
| 3 | B  | 10/31/2016 | 
| 3 | C  | 11/1/2016 | 
| 3 | D  | 11/2/2016 | 

And I would like to transform it into the following RDD:

| Key | List   | 
|-----|--------------| 
| 1 | (A, B, C, D) | 
| 2 | (A, B, C) | 
| 3 | (A, B, C, D) | 

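That is, key, list(value), where the list of values is ordered by the corresponding date. Obviously, all keys are unique, but not all values are necessarily unique. I would still like to list all values. How can I accomplish this?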


You should try it yourself before asking the question on Stack Overflow – pamu

Answer


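Create a model to represent the data. (You could use tuples as well, but code written with tuples quickly gets ugly. It is always good to have names for the fields.)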

case class DataItem(key: Int, value: String, timeInMillis: Long) 

Then parse the data (you can use Joda's DateTimeFormat to parse the dates) and create your RDD:

val rdd = sc.parallelize(List(DataItem(1, "A", 123), DataItem(2, "B", 1234), DataItem(2, "C", 12345))) 
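The date parsing mentioned above is not shown in the answer. A minimal sketch of it follows, using the JDK's `java.time` instead of Joda so that it needs no extra dependency. The `DateParsing` object, the `toMillis` helper, the `M/d/yyyy` pattern and the choice of midnight UTC are all assumptions made for illustration, based on the dates in the question's table.

```scala
import java.time.LocalDate
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

// Hypothetical helper, not part of the original answer.
object DateParsing {
  // "M/d/yyyy" accepts one- or two-digit months and days,
  // so both "11/2/2016" and "10/30/2016" parse.
  private val formatter = DateTimeFormatter.ofPattern("M/d/yyyy")

  // Assumption: each date is interpreted as midnight UTC.
  def toMillis(date: String): Long =
    LocalDate.parse(date, formatter)
      .atStartOfDay(ZoneOffset.UTC)
      .toInstant
      .toEpochMilli
}
```

With this in place, a row of the question's table could be built as `DataItem(1, "A", DateParsing.toMillis("10/30/2016"))`.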

And then, as the last step, groupBy key and sortBy time:

rdd.groupBy(_.key).map { case (k, v) => k -> v.toList.sortBy(_.timeInMillis)} 

Scala REPL

scala> case class DataItem(key: Int, value: String, timeInMillis: Long) 
defined class DataItem 

scala> sc.parallelize(List(DataItem(1, "A", 123), DataItem(2, "B", 1234), DataItem(2, "C", 12345))) 
res10: org.apache.spark.rdd.RDD[DataItem] = ParallelCollectionRDD[12] at parallelize at <console>:36 

scala> val rdd = sc.parallelize(List(DataItem(1, "A", 123), DataItem(2, "B", 1234), DataItem(2, "C", 12345))) 
rdd: org.apache.spark.rdd.RDD[DataItem] = ParallelCollectionRDD[13] at parallelize at <console>:35 

scala> rdd.groupBy(_.key).map { case (k, v) => k -> v.toList.sortBy(_.timeInMillis)} 
res11: org.apache.spark.rdd.RDD[(Int, List[DataItem])] = MapPartitionsRDD[16] at map at <console>:38 

scala> rdd.groupBy(_.key).map { case (k, v) => k -> v.toList.sortBy(_.timeInMillis)}.foreach(println) 
(1,List(DataItem(1,A,123))) 
(2,List(DataItem(2,B,1234), DataItem(2,C,12345))) 

scala> rdd.groupBy(_.key).map { case (k, v) => k -> v.toList.sortBy(_.timeInMillis)}.map { case (k, v) => (k, v.map(_.value)) }.foreach(println) 
(1,List(A)) 
(2,List(B, C))
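To check the grouping and sorting logic against data shaped like the question's, the same lambdas can be run on a local `Seq` without a Spark context. This is only a sketch: the `OrderedValues` object, the `orderedValuesByKey` function and the small integer timestamps are hypothetical stand-ins, not part of the original answer.

```scala
// Mirrors the answer's case class.
case class DataItem(key: Int, value: String, timeInMillis: Long)

// Hypothetical wrapper for illustration only.
object OrderedValues {
  // Same group-then-sort lambdas as the RDD version, applied to a local Seq.
  def orderedValuesByKey(items: Seq[DataItem]): Map[Int, List[String]] =
    items.groupBy(_.key).map { case (k, v) =>
      k -> v.toList.sortBy(_.timeInMillis).map(_.value)
    }

  // Deliberately out of order; the timestamps are placeholder day numbers.
  // All items of key 2 share one timestamp, like key 2 in the question.
  val sample: Seq[DataItem] = Seq(
    DataItem(1, "C", 3), DataItem(1, "A", 1), DataItem(2, "A", 4),
    DataItem(1, "B", 2), DataItem(1, "D", 4), DataItem(2, "B", 4),
    DataItem(2, "C", 4)
  )
}
```

Here `OrderedValues.orderedValuesByKey(OrderedValues.sample)` yields `(A, B, C, D)` for key 1 and `(A, B, C)` for key 2. In this local sketch, items with equal timestamps keep their input order because `sortBy` is stable. On an RDD, the order of elements inside a group after the shuffle should not be relied on, so ties such as key 2's may come out in any order unless a secondary sort field is added.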