首先,你的代碼示例從轉換中引用newJavaRDD
是創建newJavaRDD
- 在幾個不同的水平,是不可能的:
- 不能引用該變量的德的右手側的可變claration ...
- 你不能在RDD的轉換中使用RDD(同一個或另一個 - 無關緊要) - 轉換中的任何內容都必須由Spark序列化,並且Spark不能序列化它自己的RDD(這是沒有意義的)
那麼,你應該怎麼做?
假設:
- 這裏你的目的是要獲得一個紀錄的
date
+ object
+ region
- 不應該有對每一個這樣的組合太多記錄,所以它的每個組合安全
groupBy
這些領域爲重點
可以groupBy
的重點領域,然後mapValues
以獲得第一個和最後一個記錄之間的「分鐘距離」(如果我沒有正確理解,傳遞給mapValues
的函數可以更改爲包含您的確切邏輯)。我將使用喬達時間庫的時間計算:
public static void main(String[] args) {
// some setup code for this test:
JavaSparkContext sc = new JavaSparkContext("local", "test");
// input:
final JavaRDD<String[]> input = sc.parallelize(Lists.newArrayList(
// date time ? object region
new String[]{"2016-03-28", "11:00", "X", "object1", "region1"},
new String[]{"2016-03-28", "11:01", "Y", "object1", "region1"},
new String[]{"2016-03-28", "11:05", "X", "object1", "region1"},
new String[]{"2016-03-28", "11:09", "X", "object1", "region1"},
new String[]{"2016-03-28", "11:00", "X", "object2", "region1"},
new String[]{"2016-03-28", "11:01", "Z", "object2", "region1"}
));
// grouping by key:
final JavaPairRDD<String, Iterable<String[]>> byObjectAndDate = input.groupBy(new Function<String[], String>() {
@Override
public String call(String[] record) throws Exception {
return record[0] + record[3] + record[4]; // date, object, region
}
});
// mapping each "value" (all record matching key) to result
final JavaRDD<String[]> result = byObjectAndDate.mapValues(new Function<Iterable<String[]>, String[]>() {
@Override
public String[] call(Iterable<String[]> records) throws Exception {
final Iterator<String[]> iterator = records.iterator();
String[] previousRecord = iterator.next();
int diffMinutes = 0;
for (String[] record : records) {
if (record[2].equals("X")) { // if I got your intention right...
final LocalDateTime prev = getLocalDateTime(previousRecord);
final LocalDateTime curr = getLocalDateTime(record);
diffMinutes += Period.fieldDifference(prev, curr).toStandardMinutes().getMinutes();
}
previousRecord = record;
}
return new String[]{
previousRecord[0],
Integer.toString(diffMinutes),
previousRecord[3],
previousRecord[4]
};
}
}).values();
// do whatever with "result"...
}
// extracts a Joda LocalDateTime from a "record"
static LocalDateTime getLocalDateTime(String[] record) {
return LocalDateTime.parse(record[0] + " " + record[1], formatter);
}
static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm");
附:在斯卡拉這將需要大約8行...:/
你能澄清'X,Y,Z'是什麼意思?不清楚哪些記錄應該包含在輸出中,哪些不應該...... –
僅供示例:前一行包含標誌「X」,當前行包含「Y」,因此我們有轉換「X-> Y」。在這種情況下,我們不能在這些行之間聚合時間,結果'sum(11:01 - 11:00)= 0'。如果'Y-> X',我們必須在行之間聚合時間,結果'sum(11:05 - 11:01)= 4分鐘'。如果'X-> X' - 也聚合,則結果爲'4分鐘+總和(11:09 - 11:05)= 4分鐘+4分鐘= 8分鐘。我還必須認識到其他一些規則,但它們都涉及當前行和預覽行之間的區別。 –