2015-04-06 109 views
0

這裏的數據集是我的要求腹脹火花和斯卡拉

輸入

customer_id status start_date end_date 
1 Y 20140101 20140105 
2 Y 20140201 20140203 

輸出

customer_id status date 
1 Y 20140101 
1 Y 20140102 
1 Y 20140103 
1 Y 20140104 
1 Y 20140105 
2 Y 20140201 
2 Y 20140202 
2 Y 20140202 

我想在火花笛卡爾乘積來實現這一點,它看起來非常低效。我的數據集太大了。我正在尋找更好的選擇。

+0

所以,你要開始和結束之間每天的線路,包括爲每一個客戶ID。狀態總是Y?如果不是,非Y的輸出應該是什麼? – 2015-04-06 14:22:48

+0

狀態也可以是'N'。不管是Y還是N – user3279189 2015-04-06 14:35:42

回答

1

如果我正確地得到你的想法,你可以這樣來做:

val conf = new SparkConf().setMaster("local[2]").setAppName("test") 
    val sc = new SparkContext(conf) 

    case class Input(customerId: Long, status: String, startDate: LocalDate, endDate: LocalDate) 
    case class Output(customerId: Long, status: String, date: LocalDate) 

    val input: RDD[Input] = sc.parallelize(Seq(
    Input(1, "Y", LocalDate.of(2014, 1, 1), LocalDate.of(2014, 1, 5)), 
    Input(2, "Y", LocalDate.of(2014, 1, 1), LocalDate.of(2014, 1, 3)) 
)) 

    val result: RDD[Output] = input flatMap { input => 
    import input._ 
    val dates = Stream.iterate(startDate)(_.plusDays(1)).takeWhile(!_.isAfter(endDate)) 
    dates.map(date => Output(customerId, status, date)) 
    } 

    result.collect().foreach(println) 

輸出:

Output(1,Y,2014-01-01) 
Output(1,Y,2014-01-02) 
Output(1,Y,2014-01-03) 
Output(1,Y,2014-01-04) 
Output(1,Y,2014-01-05) 
Output(2,Y,2014-01-01) 
Output(2,Y,2014-01-02) 
Output(2,Y,2014-01-03) 
+0

日期都將成爲正常收集,但不是RDD。所以可能不適合任何一個節點。 – 2015-04-06 16:28:54

+0

我不認爲日期集合可以大於1兆字節,所以它不應該是一個問題 – 2015-04-06 16:30:05

+0

是的,對不起,我錯過了輸入flatMap,這應該意味着你最終得到一個RDD。 – 2015-04-06 16:32:38