If I understand what you're after correctly, you can do it like this:
import java.time.LocalDate
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

case class Input(customerId: Long, status: String, startDate: LocalDate, endDate: LocalDate)
case class Output(customerId: Long, status: String, date: LocalDate)

val conf = new SparkConf().setMaster("local[2]").setAppName("test")
val sc = new SparkContext(conf)

val input: RDD[Input] = sc.parallelize(Seq(
  Input(1, "Y", LocalDate.of(2014, 1, 1), LocalDate.of(2014, 1, 5)),
  Input(2, "Y", LocalDate.of(2014, 1, 1), LocalDate.of(2014, 1, 3))
))

// Emit one Output row per day from startDate to endDate, both ends inclusive.
val result: RDD[Output] = input.flatMap { record =>
  import record._
  // Step lazily one day at a time and stop once we pass endDate (use LazyList on Scala 2.13+).
  val dates = Stream.iterate(startDate)(_.plusDays(1)).takeWhile(!_.isAfter(endDate))
  dates.map(date => Output(customerId, status, date))
}

result.collect().foreach(println)
sc.stop()
Output:
Output(1,Y,2014-01-01)
Output(1,Y,2014-01-02)
Output(1,Y,2014-01-03)
Output(1,Y,2014-01-04)
Output(1,Y,2014-01-05)
Output(2,Y,2014-01-01)
Output(2,Y,2014-01-02)
Output(2,Y,2014-01-03)
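The per-record expansion doesn't depend on Spark, so it can also be written as a plain function and unit-tested without a `SparkContext`. The sketch below is a variant, not the code from the answer: it swaps the `Stream.iterate`/`takeWhile` approach for `ChronoUnit.DAYS.between` plus a numeric range. The names `DateExpansion` and `expand` are hypothetical. The status field is copied through unchanged, so "Y" and "N" rows are treated the same, and a record whose `endDate` is before its `startDate` produces no rows.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Same shapes as in the answer; repeated here so the sketch compiles on its own.
case class Input(customerId: Long, status: String, startDate: LocalDate, endDate: LocalDate)
case class Output(customerId: Long, status: String, date: LocalDate)

object DateExpansion {
  // Hypothetical helper: one Output per day, startDate and endDate both included.
  // DAYS.between is negative when endDate < startDate, so 0L to days is empty then.
  def expand(in: Input): Seq[Output] = {
    val days = ChronoUnit.DAYS.between(in.startDate, in.endDate)
    (0L to days).map(offset => Output(in.customerId, in.status, in.startDate.plusDays(offset)))
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Input(1, "Y", LocalDate.of(2014, 1, 1), LocalDate.of(2014, 1, 5)),
      Input(2, "N", LocalDate.of(2014, 1, 1), LocalDate.of(2014, 1, 3))
    )
    // Status is passed through as-is, so the "N" row expands exactly like a "Y" row.
    rows.flatMap(DateExpansion.expand).foreach(println)
  }
}
```

With Spark, the same function could be used as `input.flatMap(DateExpansion.expand)`, keeping the RDD code a one-liner and the date logic testable in isolation.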
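So you want one row for every day between the start and end dates, inclusive, for each customer ID. Is the status always Y? If not, what should the output be for non-Y rows? – 2015-04-06 14:22:48
The status can also be 'N'. It doesn't matter whether it's Y or N. – user3279189 2015-04-06 14:35:42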