2017-07-03 94 views
1

我有以下POJO的:如何按天彙總?

public class MyPojo { 
    Date startDate; 
    Double usageAMount; 
    // ... bla bla bla 
} 

所以我有MyPojo對象的列表,傳遞給函數作爲參數:

public Map<Date, Double> getWeeklyCost(@NotNull List<MyPojo> reports) { 
     JavaRDD<MyPojo> rdd = context.parallelize(reports); 
     JavaPairRDD<Date, Double> result = rdd.mapToPair(
       (PairFunction<MyPojo, Date, Double>) x -> 
         new Tuple2<>(x.getStartDate(), x.getUsageAmount())) 
       .reduceByKey((Function2<Double, Double, Double>) (x, y) -> x + y); 

     return result.collectAsMap(); 
} 

然而,我返回類似:

"2017-06-28T22:00:00.000+0000": 0.02916666, 
"2017-06-29T16:00:00.000+0000": 0.02916666, 
"2017-06-27T13:00:00.000+0000": 0.03888888, 
"2017-06-26T05:00:00.000+0000": 0.05833332000000001, 
"2017-06-28T21:00:00.000+0000": 0.03888888, 
"2017-06-27T02:00:00.000+0000": 0.03888888, 
"2017-06-28T03:00:00.000+0000": 0.07777776000000002, 
"2017-06-28T20:00:00.000+0000": 0.01944444, 
"2017-06-30T04:00:00.000+0000": 0.00972222, 
"2017-06-28T02:00:00.000+0000": 0.05833332000000001, 
"2017-06-29T21:00:00.000+0000": 0.03888888, 
"2017-06-29T23:00:00.000+0000": 0.06805554000000001, 
"2017-06-27T00:00:00.000+0000": 0.05833332000000001, 
"2017-06-26T06:00:00.000+0000": 0.03888888, 
"2017-06-28T01:00:00.000+0000": 0.09722220000000002, 
"2017-06-29T22:00:00.000+0000": 0.01944444, 
"2017-06-28T00:00:00.000+0000": 0.11666664000000003, 
"2017-06-27T12:00:00.000+0000": 0.01944444, 
"2017-06-26T11:00:00.000+0000": 0.01944444, 
"2017-06-29T03:00:00.000+0000": 0.01944444, 
"2017-06-26T04:00:00.000+0000": 0.07777776000000002, 
"2017-06-27T19:00:00.000+0000": 0.01944444, 
"2017-06-29T20:00:00.000+0000": 0.048611100000000004, 
"2017-06-29T02:00:00.000+0000": 0.02916666, 
"2017-06-29T15:00:00.000+0000": 0.01944444, 
"2017-06-27T17:00:00.000+0000": 0.01944444, 
"2017-06-29T14:00:00.000+0000": 0.02916666, 
"2017-06-30T01:00:00.000+0000": 0.02916666, 
"2017-06-29T00:00:00.000+0000": 0.01944444, 
"2017-06-27T18:00:00.000+0000": 0.03888888, 
"2017-06-26T03:00:00.000+0000": 0.07777776000000002, 
"2017-06-28T05:00:00.000+0000": 0.05833332000000001, 
"2017-06-29T13:00:00.000+0000": 0.01944444, 
"2017-06-30T03:00:00.000+0000": 0.00972222, 
"2017-06-27T11:00:00.000+0000": 0.01944444, 
"2017-06-28T04:00:00.000+0000": 0.05833332000000001, 
"2017-06-29T12:00:00.000+0000": 0.00972222, 
"2017-06-30T02:00:00.000+0000": 0.06805554000000001, 
"2017-06-27T23:00:00.000+0000": 0.09722220000000002, 
"2017-06-27T16:00:00.000+0000": 0.01944444, 
"2017-06-26T15:00:00.000+0000": 0.01944444, 
"2017-06-29T06:00:00.000+0000": 0.00972222, 
"2017-06-30T07:00:00.000+0000": 0.00138889, 
"2017-06-30T00:00:00.000+0000": 0.01944444, 
"2017-06-27T21:00:00.000+0000": 0.01944444, 
"2017-06-26T02:00:00.000+0000": 0.07777776000000002, 
"2017-06-29T19:00:00.000+0000": 0.00972222, 
"2017-06-27T03:00:00.000+0000": 0.03888888, 
"2017-06-27T20:00:00.000+0000": 0.01944444, 
"2017-06-30T05:00:00.000+0000": 74.1458333, 
"2017-06-29T18:00:00.000+0000": 0.00972222, 
"2017-06-29T17:00:00.000+0000": 0.01944444, 
"2017-06-28T23:00:00.000+0000": 0.00972222, 
"2017-06-27T01:00:00.000+0000": 0.01944444, 
"2017-06-27T22:00:00.000+0000": 0.05833332000000001 

我想在當天彙總它,按日期降序排列。 例如:

"2017-06-28T03:00:00.000+0000": 0.07777776000000002, 
"2017-06-28T20:00:00.000+0000": 0.01944444, 

是在同一天,所以它們的值(usageAmount)應增加。我只關心一天,而不是一個小時。我如何減少或彙總我的RDD以獲得理想的結果?

** **更新答案一定是火花RDD解決方案......

+1

你可以使用SQL星火的DataFrames?這太容易了,以後再寫和理解。 –

+0

@JacekLaskowski數據來自MongoDB .... – cybertextron

+0

沒有接受的答案? – c0der

回答

0

相對容易(即使它會是一個很大的代碼)

讓我們先從一個POJO實現:

static class Record 
{ 
    private Date date; 
    private double amount; 
    public Record(Date d, double a) 
    { 
     this.date = d; 
     this.amount = a; 
    } 
    @Override 
    public String toString() { 
     return date.toString() + "\t" + amount; 
    } 
} 

現在一個實用的方法來檢查兩個記錄是否在同一天:

private static boolean sameDay(Record r0, Record r1) 
{ 
    Date d0 = r0.date; 
    Date d1 = r1.date; 

    Calendar cal = new GregorianCalendar(); 
    cal.setTime(d0); 

    int[] dateParts0 = {cal.get(Calendar.DAY_OF_MONTH), cal.get(Calendar.MONTH), cal.get(Calendar.YEAR)}; 

    cal.setTime(d1); 

    return cal.get(Calendar.DAY_OF_MONTH) == dateParts0[0] && 
      cal.get(Calendar.MONTH) == dateParts0[1] && 
      cal.get(Calendar.YEAR) == dateParts0[2]; 
} 

現在我們已經有了,我們可以開始研究算法的主要部分。 這裏的想法是按天排序輸入列表。然後遍歷列表。 對於我們正在處理的每個條目,我們檢查它是否與我們聚合數據集的最後一個已知日期相同。如果是,我們添加記錄的數量,如果不是,我們添加一個新條目。

public static List<Record> aggregate(Collection<Record> rs) 
{ 
    List<Record> tmp = new ArrayList<>(rs); 
    java.util.Collections.sort(tmp, new Comparator<Record>() { 
     @Override 
     public int compare(Record o1, Record o2) { 
      return o1.date.compareTo(o2.date); 
     } 
    }); 

    List<Record> out = new ArrayList<>(); 
    out.add(new Record(tmp.get(0).date, 0)); 
    for(int i=0;i<tmp.size();i++) 
    { 
     Record last = out.get(out.size() - 1); 
     Record recordBeingProcessed = tmp.get(i); 
     if(sameDay(last, recordBeingProcessed)) 
     { 
      last.amount += recordBeingProcessed.amount; 
     } 
     else 
     { 
      out.add(recordBeingProcessed); 
     } 
    } 

    return out; 
} 

最後,一個漂亮的主要方法來測試一切:

public static void main(String[] args) throws ParseException { 
    DateFormat format = new SimpleDateFormat("MMMM d, yyyy", Locale.ENGLISH); 
    String[] dateStrings = {"January 2, 2010", "January 2, 2010", "January 3, 2010"}; 
    List<Record> rs = new ArrayList<>(); 
    for(int i=0;i<dateStrings.length;i++) 
    { 
     rs.add(new Record(format.parse(dateStrings[i]), 1)); 
    } 
    for(Record r : aggregate(rs)) 
    { 
     System.out.println(r); 
    } 
} 

打印出:

Sat Jan 02 00:00:00 CET 2010 2.0 
Sun Jan 03 00:00:00 CET 2010 1.0 
0
public class MyPojo { 

     Date startDate; 
     Double usageAMount; 
     static DateFormat format = new SimpleDateFormat("yyyy-mm-dd:hh"); 

    MyPojo(Date startDate, Double usageAMount) { 

     this.startDate = startDate; 
     this.usageAMount = usageAMount; 
    } 

    Date getStrartDate() { return startDate;} 
    Double getUsage() { return usageAMount;} 

    public static void main(String[] args) throws ParseException { 

     List<MyPojo> reports = getReports(); 

     //sort by date 
     reports = reports.stream().sorted(getComperator()).collect(Collectors.toList()); 
     output(reports); 

     //you can collect to map but map keys are not sorted 
     //and keys (dates) must be unique 
     Map<Date, Double> result = reports.stream().sorted(getComperator()).collect(Collectors 
       .toMap(e-> e.startDate , e-> e.usageAMount)); 
    } 

    private static List<MyPojo> getReports() throws ParseException { 

     List<MyPojo> reports = new ArrayList<>(); 

     reports.add(new MyPojo(format.parse("2017-06-28:01"), 0.02916666)); 
     reports.add(new MyPojo(format.parse("2017-06-29:01"), 0.02916666)); 
     reports.add(new MyPojo(format.parse("2017-06-27:01"), 0.03888888)); 
     reports.add(new MyPojo(format.parse("2017-06-26:01"), 0.05833332000000001)); 
     reports.add(new MyPojo(format.parse("2017-06-28:02"), 0.03888888)); 
     reports.add(new MyPojo(format.parse("2017-06-27:02"), 0.03888888)); 
     reports.add(new MyPojo(format.parse("2017-06-28:03"), 0.07777776000000002)); 
     reports.add(new MyPojo(format.parse("2017-06-28:04"), 0.01944444)); 
     reports.add(new MyPojo(format.parse("2017-06-30:01"), 0.00972222)); 

     return reports; 
    } 

    private static Comparator<? super MyPojo> getComperator() { 

     Comparator<? super MyPojo> comperator = new Comparator<MyPojo>() { 

      @Override 
      public int compare(MyPojo o1, MyPojo o2) { 

       if((o1 == o2) || ((o1 == null) && (o2 == null))) { 
        return 0; 
       } 
       if(o1 == null) { 
        return -1; 
       } 
       if(o2 == null) { 
        return 1; 
       } 

       return (o1).startDate.compareTo((o2).startDate); 
      } 

     }; 
     return comperator; 
    } 

    static void output(List<MyPojo> reports) { 

     for(MyPojo p : reports) { 
      System.out.println(format.format(p.startDate) +" - "+ p.usageAMount); 
     } 
    } 
} 

輸出:


2017-06-27:01 - 0.03888888
2017-06-27:02 - 0.03888888
2017-06-28:01 - 0.02916666
2017-06-28:02 - 0.03888888
2017-06-28:03 - 0.07777776000000002
2017年6月28日:04 - 0.01944444
2017年6月29日:01 - 0.02916666
二零一七年六月三十零日:01 - 0.00972222