2016-03-28 58 views
0

我解析.log文件JavaRDD,後整理這JavaRDD,現在我有,例如oldJavaRDD
2016-03-28 | 11:00 | X | object1 | region1
2016-03-28 | 11:01 | Y | object1 | region1
2016-03-28 | 11:05 | X | object1 | region1
2016-03-28 | 11:09 | X | object1 | region1
2016-03-28 | 11:00 | X | object2 | region1
2016-03-28 | 11:01 | Z | object2 | region1如何計算當前和以前的行之間的差異星火JavaRDD

我怎麼能得到newJavaRDD f或保存到數據庫?
新JavaRDD結構必須是:
2016-03-28 | 9 | object1 | region1
2016-03-28 | 1 | object2 | region1
所以,我現在和以前的行之間的時間來計算(也使用標誌X, Y, Z在某些情況下定義,添加時間造成與否),並添加新元素更改爲date, objectNameobjectRegion之後的JavaRDD。

我可以使用這種類型的代碼(地圖)做到這一點,但我認爲這是不好的,不是最快的方式

JavaRDD<NewObject> newJavaRDD = oldJavaRDD.map { r -> 
     String datePrev[] = ... 
     if (datePrev != dateCurr ...) { 
      return newJavaRdd; 
     } else { 
      return null; 
     } 
    } 
+0

你能澄清'X,Y,Z'是什麼意思?不清楚哪些記錄應該包含在輸出中,哪些不應該...... –

+0

僅供示例:前一行包含標誌「X」,當前行包含「Y」,因此我們有轉換「X-> Y」。在這種情況下,我們不能在這些行之間聚合時間,結果'sum(11:01 - 11:00)= 0'。如果'Y-> X',我們必須在行之間聚合時間,結果'sum(11:05 - 11:01)= 4分鐘'。如果'X-> X' - 也聚合,則結果爲'4分鐘+總和(11:09 - 11:05)= 4分鐘+4分鐘= 8分鐘。我還必須認識到其他一些規則,但它們都涉及當前行和預覽行之間的區別。 –

回答

0

首先,你的代碼示例從轉換中引用newJavaRDD創建newJavaRDD - 在幾個不同的水平,是不可能的:

  • 不能引用該變量的德的右手側的可變claration ...
  • 你不能在RDD的轉換中使用RDD(同一個或另一個 - 無關緊要) - 轉換中的任何內容都必須由Spark序列化,並且Spark不能序列化它自己的RDD(這是沒有意義的)

那麼,你應該怎麼做?

假設

  1. 這裏你的目的是要獲得一個紀錄的date + object + region
  2. 不應該有對每一個這樣的組合太多記錄,所以它的每個組合安全groupBy這些領域爲重點

可以groupBy的重點領域,然後mapValues以獲得第一個和最後一個記錄之間的「分鐘距離」(如果我沒有正確理解,傳遞給mapValues的函數可以更改爲包含您的確切邏輯)。我將使用喬達時間庫的時間計算:

public static void main(String[] args) { 
    // some setup code for this test: 
    JavaSparkContext sc = new JavaSparkContext("local", "test"); 

    // input: 
    final JavaRDD<String[]> input = sc.parallelize(Lists.newArrayList(
      //    date  time  ? object  region 
      new String[]{"2016-03-28", "11:00", "X", "object1", "region1"}, 
      new String[]{"2016-03-28", "11:01", "Y", "object1", "region1"}, 
      new String[]{"2016-03-28", "11:05", "X", "object1", "region1"}, 
      new String[]{"2016-03-28", "11:09", "X", "object1", "region1"}, 
      new String[]{"2016-03-28", "11:00", "X", "object2", "region1"}, 
      new String[]{"2016-03-28", "11:01", "Z", "object2", "region1"} 
    )); 

    // grouping by key: 
    final JavaPairRDD<String, Iterable<String[]>> byObjectAndDate = input.groupBy(new Function<String[], String>() { 
     @Override 
     public String call(String[] record) throws Exception { 
      return record[0] + record[3] + record[4]; // date, object, region 
     } 
    }); 

    // mapping each "value" (all record matching key) to result 
    final JavaRDD<String[]> result = byObjectAndDate.mapValues(new Function<Iterable<String[]>, String[]>() { 
     @Override 
     public String[] call(Iterable<String[]> records) throws Exception { 
      final Iterator<String[]> iterator = records.iterator(); 
      String[] previousRecord = iterator.next(); 
      int diffMinutes = 0; 

      for (String[] record : records) { 
       if (record[2].equals("X")) { // if I got your intention right... 
        final LocalDateTime prev = getLocalDateTime(previousRecord); 
        final LocalDateTime curr = getLocalDateTime(record); 
        diffMinutes += Period.fieldDifference(prev, curr).toStandardMinutes().getMinutes(); 
       } 
       previousRecord = record; 
      } 

      return new String[]{ 
        previousRecord[0], 
        Integer.toString(diffMinutes), 
        previousRecord[3], 
        previousRecord[4] 
      }; 
     } 
    }).values(); 

    // do whatever with "result"... 
} 

// extracts a Joda LocalDateTime from a "record" 
static LocalDateTime getLocalDateTime(String[] record) { 
    return LocalDateTime.parse(record[0] + " " + record[1], formatter); 
} 

static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm"); 

附:在斯卡拉這將需要大約8行...:/

+0

對不起,我用僞代碼搞糊塗了,你說的對'newJavaRDD',我的意思是'返回new NewObject(...)'。沒關係,你的回答真的很有幫助和工作(幸運的是,我可以使用java8來減少愚蠢的線條)。 –

相關問題