2016-06-20 48 views
0

我剛開始編寫 Hadoop MR 作業。希望我們很快能切換到 Spark，但目前仍得繼續使用 MR。

Hadoop MapReduce —— 如何將分組和排序分開？

我想依據記錄值的散列來對記錄進行分組，但想依據一個與分組完全無關的屬性來排序——記錄值中的時間戳。我不確定最好的做法是什麼。我想到兩個方案：

1）第一個 MR 作業在映射器（mapper）中計算每個值的散列，然後在歸約器（reducer）中把同一散列的所有記錄按需要歸約成一個值（這部分我其實已經按我們目前的需求做好了）。接著串接第二個 MR 作業，依據值中的時間戳對上一步 reducer 的輸出重新排序。這樣效率會不會很低？

2）我讀過一些介紹如何使用複合鍵的博客/文章，所以也許可以一步完成？我會在映射器中建立一種複合鍵，其中包含用於分組的散列和用於排序的時間戳。但我不清楚這是否可行：如果排序與分組完全無關，它還能正確分組嗎？我也不確定需要實現哪些介面、建立哪些類，以及該如何配置。

我說的不是二次排序。我不在乎每次 reduce 調用的 Iterator 中物件的順序；我在乎的是 reducer 輸出的順序——它需要按時間戳進行全局排序。

做這類事情的推薦方式是什麼？

+0

除了按散列聚合、排序和輸出之外，你的 reducer 還需要做其他事情嗎？另外，你不在乎不同散列之間的順序，但同一散列的所有記錄必須排好序，對嗎？ – Jedi

+0

我的 reduce() 只需要按散列進行聚合，但我希望所有 reduce 調用的輸出都按時間戳排序。我不需要對同一散列的記錄排序；我需要的是 reducer 的全部累積輸出按時間戳排序。這個時間戳可能來自 reduce() 迭代器中的任何一個值。 – medloh

回答

1

絕對可行——只要在 reduce 之前能有一個同時封裝分組屬性和排序屬性的複合鍵即可。

假設你需要一個同時保存 int 散列碼和 long 時間戳的鍵，那麼你需要實現一個 Writable 元組（例如 IntLongPair），並在其中定義你的用例所需的各種比較器和分區器。

因此，你可以像下面這樣配置作業（稍後我會附上一個可用的 IntLongPair 實現）：

// Shuffle configuration for the composite IntLongPair key (int part = hash, long part = timestamp).
// NOTE(review): with this setup each reduce() call receives one hash group, and within each
// reducer the keys arrive ordered by hash (descending) first, not by timestamp. A global
// timestamp order across all hashes, as asked in the question, is not produced by this
// configuration alone - confirm against requirements.
job.setPartitionerClass(IntLongPair.IntOnlyPartitioner.class); // partition by the hash code stored in the int part of the pair, so one hash always goes to one reducer 
job.setGroupingComparatorClass(IntLongPair.IntAscComparator.class); // group reduce() calls by hash code only (the long part is ignored); direction does not matter for grouping 
job.setSortComparatorClass(IntLongPair.IntDescLongAscComparator.class); // sort by hash descending, then timestamp ascending (oldest first) within each hash 

以下是可供使用的 IntLongPair 實現：

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.RawComparator; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 
import org.apache.hadoop.mapreduce.Partitioner; 

public class IntLongPair implements WritableComparable<IntLongPair> { 

    private IntWritable intVal = new IntWritable(); 
    private LongWritable longVal = new LongWritable(); 

    public void write(DataOutput d) throws IOException { 
     intVal.write(d); 
     longVal.write(d); 
    } 

    public void readFields(DataInput di) throws IOException { 
     intVal.readFields(di); 
     longVal.readFields(di); 
    } 

    /** 
    * Natural order is int first, long next 
    * @param o 
    * @return 
    */ 
    public int compareTo(IntLongPair o) { 
     int diff = intVal.compareTo(o.intVal); 
     if (diff != 0) { 
      return diff; 
     } 
     return longVal.compareTo(o.longVal); 
    } 

    public IntWritable getInt() { 
     return intVal; 
    } 

    public void setInt(IntWritable intVal) { 
     this.intVal = intVal; 
    } 

    public void setInt(int intVal) { 
     this.intVal.set(intVal); 
    } 

    public LongWritable getLong() { 
     return longVal; 
    } 

    public void setLong(LongWritable longVal) { 
     this.longVal = longVal; 
    } 

    public void setLong(long longVal) { 
     this.longVal.set(longVal); 
    } 

    @Override 
    public boolean equals(Object obj) { 
     if (obj == null) { 
      return false; 
     } 
     if (getClass() != obj.getClass()) { 
      return false; 
     } 
     final IntLongPair other = (IntLongPair) obj; 
     if (this.intVal != other.intVal && (this.intVal == null || !this.intVal.equals(other.intVal))) { 
      return false; 
     } 
     if (this.longVal != other.longVal && (this.longVal == null || !this.longVal.equals(other.longVal))) { 
      return false; 
     } 
     return true; 
    } 

    @Override 
    public int hashCode() { 
     int hash = 3; 
     hash = 47 * hash + (this.intVal != null ? this.intVal.hashCode() : 0); 
     hash = 47 * hash + (this.longVal != null ? this.longVal.hashCode() : 0); 
     return hash; 
    } 

    @Override 
    public String toString() { 
     return "IntLongPair{" + intVal + ',' + longVal + '}'; 
    } 

    public IntWritable getFirst() { 
     return intVal; 
    } 

    public LongWritable getSecond() { 
     return longVal; 
    } 

    public void setFirst(IntWritable value) { 
     intVal.set(value.get()); 
    } 

    public void setSecond(LongWritable value) { 
     longVal.set(value.get()); 
    } 


    public static class Comparator extends WritableComparator { 

     public Comparator() { 
      super(IntLongPair.class); 
     } 

     @Override 
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      return compareBytes(b1, s1, l1, b2, s2, l2); 
     } 
    } 

    static {          // register this comparator 
     WritableComparator.define(IntLongPair.class, new Comparator()); 
    } 

    public static class IntDescLongAscComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      int comp = IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
      if (comp != 0) { 
       return -comp; 
      } 
      return LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      int comp = o1.getInt().compareTo(o2.getInt()); 
      if (comp != 0) { 
       return -comp; 
      } 
      return o1.getLong().compareTo(o2.getLong()); 
     } 
    } 

    public static class LongAscIntAscComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
      if (comp != 0) { 
       return comp; 
      } 
      return IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      int comp = o1.getLong().compareTo(o2.getLong()); 
      if (comp != 0) { 
       return comp; 
      } 
      return o1.getInt().compareTo(o2.getInt()); 
     } 
    } 

    public static class LongAscIntDescComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
      if (comp != 0) { 
       return comp; 
      } 
      return -IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      int comp = o1.getLong().compareTo(o2.getLong()); 
      if (comp != 0) { 
       return comp; 
      } 
      return -o1.getInt().compareTo(o2.getInt()); 
     } 
    } 

    public static class LongDescIntAscComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
      if (comp != 0) { 
       return -comp; 
      } 
      return IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      int comp = o1.getLong().compareTo(o2.getLong()); 
      if (comp != 0) { 
       return -comp; 
      } 
      return o1.getInt().compareTo(o2.getInt()); 
     } 
    } 

    public static class LongDescIntDescComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      int comp = LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
      if (comp != 0) { 
       return -comp; 
      } 
      return -IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      int comp = o1.getLong().compareTo(o2.getLong()); 
      if (comp != 0) { 
       return -comp; 
      } 
      return -o1.getInt().compareTo(o2.getInt()); 
     } 
    } 

    public static class IntAscComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      return IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      return o1.getInt().compareTo(o2.getInt()); 
     } 
    } 

    public static class IntDescComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      return -IntWritable.Comparator.compareBytes(b1, s1, 4, b2, s2, 4); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      return -o1.getInt().compareTo(o2.getInt()); 
     } 
    } 

    public static class LongAscComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      return LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      return o1.getLong().compareTo(o2.getLong()); 
     } 
    } 

    public static class LongDescComparator implements RawComparator<IntLongPair> { 

     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 
      return -LongWritable.Comparator.compareBytes(b1, s1 + 4, 8, b2, s2 + 4, 8); 
     } 

     public int compare(IntLongPair o1, IntLongPair o2) { 
      return -o1.getLong().compareTo(o2.getLong()); 
     } 
    } 

    /** 
    * Partition based on the long part of the pair. 
    */ 
    public static class LongOnlyPartitioner extends Partitioner<IntLongPair, Writable> { 

     @Override 
     public int getPartition(IntLongPair key, Writable value, 
       int numPartitions) { 
      return Math.abs(key.getLong().hashCode() & Integer.MAX_VALUE) % numPartitions; 
     } 
    } 

    /** 
    * Partition based on the int part of the pair. 
    */ 
    public static class IntOnlyPartitioner extends Partitioner<IntLongPair, Writable> { 

     @Override 
     public int getPartition(IntLongPair key, Writable value, 
       int numPartitions) { 
      return Math.abs(key.getInt().hashCode() & Integer.MAX_VALUE) % numPartitions; 
     } 
    } 
} 
+0

在深入研究複合鍵之前，我只是想先確認這是可行的。等我把它跑通之後，會回來把這個答案標記爲已採納。 – medloh

+0

沒問題——我手邊剛好有以前的項目中寫的這個類，覺得值得分享，因爲它包含原始二進制比較所需的一些相當繁瑣的代碼。 – yurgis

+0

讀了那些博客之後，我擔心如果把排序改成按時間戳，就無法再按散列正確分組了；但實際上並非如此，對嗎？順便說一句，非常感謝你的代碼。 – medloh