這絕對是可以做到的:只要使用一個複合鍵(composite key),在進入 reduce 階段之前就把分組和排序所需的屬性都封裝在這個鍵裡。
假設您需要一個同時保存 int 散列碼(hash code)和 long 時間戳的鍵,那麼您需要實現一個 Writable 元組(例如 IntLongPair),並在其中定義您的用例所需的各種比較器(comparator)和分區器(partitioner)。
接著,您可以像下面這樣設定您的 job(稍後會給出一個可能的 IntLongPair 實現):
// Route every record with the same hash code (the int part of the pair) to the same reducer.
job.setPartitionerClass(IntLongPair.IntOnlyPartitioner.class);
// Group reduce() calls by the hash code (int part only). Grouping only checks whether adjacent
// keys compare equal, so ascending vs. descending should not matter here.
job.setGroupingComparatorClass(IntLongPair.IntAscComparator.class);
// Sort keys by int part descending, then long part (timestamp) ascending, i.e. smallest timestamp
// first within a hash. NOTE(review): if newest-first is wanted, the secondary order must be long
// descending instead.
job.setSortComparatorClass(IntLongPair.IntDescLongAscComparator.class);
大號
以下是一個可以使用的 IntLongPair 實現:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Composite Hadoop key made of an {@code int} part and a {@code long} part (for example a hash
 * code and a timestamp). It comes with raw comparators and partitioners, so a job can partition,
 * group and sort on either part independently.
 *
 * <p>Serialized layout, as produced by {@link #write(DataOutput)}: the int part (4 bytes,
 * {@link IntWritable} encoding) followed by the long part (8 bytes, {@link LongWritable}
 * encoding), 12 bytes in total.
 *
 * <p>Every raw comparator decodes this fixed layout and compares the decoded values as signed
 * numbers. Raw (byte-level) ordering therefore agrees with the object ordering, including for
 * negative values such as negative hash codes. A plain byte-wise compare would not agree.
 */
public class IntLongPair implements WritableComparable<IntLongPair> {

    /** Serialized size of the int part; the long part starts at this byte offset. */
    private static final int INT_BYTES = 4;

    // Never null and never shared with a caller's object: the Writable-taking setters copy values.
    private final IntWritable intVal = new IntWritable();
    private final LongWritable longVal = new LongWritable();

    /**
     * Serializes the int part followed by the long part.
     *
     * @param d the output to write to
     * @throws IOException if writing to {@code d} fails
     */
    @Override
    public void write(DataOutput d) throws IOException {
        intVal.write(d);
        longVal.write(d);
    }

    /**
     * Deserializes the int part followed by the long part, overwriting this pair's values.
     *
     * @param di the input to read from
     * @throws IOException if reading from {@code di} fails
     */
    @Override
    public void readFields(DataInput di) throws IOException {
        intVal.readFields(di);
        longVal.readFields(di);
    }

    /**
     * Natural order: by the int part ascending, then by the long part ascending.
     *
     * @param o the pair to compare with
     * @return a negative number, zero, or a positive number as this pair is less than, equal to,
     *     or greater than {@code o}
     */
    @Override
    public int compareTo(IntLongPair o) {
        int diff = compareIntParts(this, o);
        return diff != 0 ? diff : compareLongParts(this, o);
    }

    /** Returns the (mutable) int part backing this pair. */
    public IntWritable getInt() {
        return intVal;
    }

    /**
     * Sets the int part by copying the value of {@code intVal}. The argument is not retained, so
     * later changes to it and to this pair do not affect each other.
     *
     * @param intVal the value to copy
     * @throws NullPointerException if {@code intVal} is null
     */
    public void setInt(IntWritable intVal) {
        this.intVal.set(intVal.get());
    }

    /** Sets the int part. */
    public void setInt(int intVal) {
        this.intVal.set(intVal);
    }

    /** Returns the (mutable) long part backing this pair. */
    public LongWritable getLong() {
        return longVal;
    }

    /**
     * Sets the long part by copying the value of {@code longVal}. The argument is not retained, so
     * later changes to it and to this pair do not affect each other.
     *
     * @param longVal the value to copy
     * @throws NullPointerException if {@code longVal} is null
     */
    public void setLong(LongWritable longVal) {
        this.longVal.set(longVal.get());
    }

    /** Sets the long part. */
    public void setLong(long longVal) {
        this.longVal.set(longVal);
    }

    /** Two pairs are equal when they are of the same class and both parts are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        IntLongPair other = (IntLongPair) obj;
        return intVal.equals(other.intVal) && longVal.equals(other.longVal);
    }

    @Override
    public int hashCode() {
        // 3/47 polynomial over both parts; kept stable since key hashes may be relied upon
        // (e.g. by a hash-based partitioner).
        int hash = 3;
        hash = 47 * hash + intVal.hashCode();
        hash = 47 * hash + longVal.hashCode();
        return hash;
    }

    @Override
    public String toString() {
        return "IntLongPair{" + intVal + ',' + longVal + '}';
    }

    /** Alias of {@link #getInt()}. */
    public IntWritable getFirst() {
        return intVal;
    }

    /** Alias of {@link #getLong()}. */
    public LongWritable getSecond() {
        return longVal;
    }

    /** Copies {@code value} into the int part. */
    public void setFirst(IntWritable value) {
        intVal.set(value.get());
    }

    /** Copies {@code value} into the long part. */
    public void setSecond(LongWritable value) {
        longVal.set(value.get());
    }

    /** Decodes the int part of a serialized pair that starts at {@code start}. */
    private static int readIntPart(byte[] bytes, int start) {
        return WritableComparator.readInt(bytes, start);
    }

    /** Decodes the long part of a serialized pair that starts at {@code start}. */
    private static long readLongPart(byte[] bytes, int start) {
        return WritableComparator.readLong(bytes, start + INT_BYTES);
    }

    /** Signed ascending comparison of the int parts of two serialized pairs. */
    private static int compareIntParts(byte[] b1, int s1, byte[] b2, int s2) {
        return Integer.compare(readIntPart(b1, s1), readIntPart(b2, s2));
    }

    /** Signed ascending comparison of the long parts of two serialized pairs. */
    private static int compareLongParts(byte[] b1, int s1, byte[] b2, int s2) {
        return Long.compare(readLongPart(b1, s1), readLongPart(b2, s2));
    }

    /** Ascending comparison of the int parts of two pairs. */
    private static int compareIntParts(IntLongPair a, IntLongPair b) {
        return Integer.compare(a.intVal.get(), b.intVal.get());
    }

    /** Ascending comparison of the long parts of two pairs. */
    private static int compareLongParts(IntLongPair a, IntLongPair b) {
        return Long.compare(a.longVal.get(), b.longVal.get());
    }

    /**
     * Raw comparator for the natural order (int ascending, then long ascending). It is registered
     * as the default comparator for this class.
     */
    public static class Comparator extends WritableComparator {
        public Comparator() {
            super(IntLongPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // Decode and compare signed values. An unsigned byte-wise compare would sort negative
            // numbers after positive ones and disagree with compareTo().
            int comp = compareIntParts(b1, s1, b2, s2);
            return comp != 0 ? comp : compareLongParts(b1, s1, b2, s2);
        }
    }

    static { // register this comparator
        WritableComparator.define(IntLongPair.class, new Comparator());
    }

    /** Orders by int part descending, then long part ascending. */
    public static class IntDescLongAscComparator implements RawComparator<IntLongPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int comp = compareIntParts(b2, s2, b1, s1); // operands swapped: descending
            return comp != 0 ? comp : compareLongParts(b1, s1, b2, s2);
        }

        @Override
        public int compare(IntLongPair o1, IntLongPair o2) {
            int comp = compareIntParts(o2, o1); // operands swapped: descending
            return comp != 0 ? comp : compareLongParts(o1, o2);
        }
    }

    /** Orders by long part ascending, then int part ascending. */
    public static class LongAscIntAscComparator implements RawComparator<IntLongPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int comp = compareLongParts(b1, s1, b2, s2);
            return comp != 0 ? comp : compareIntParts(b1, s1, b2, s2);
        }

        @Override
        public int compare(IntLongPair o1, IntLongPair o2) {
            int comp = compareLongParts(o1, o2);
            return comp != 0 ? comp : compareIntParts(o1, o2);
        }
    }

    /** Orders by long part ascending, then int part descending. */
    public static class LongAscIntDescComparator implements RawComparator<IntLongPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int comp = compareLongParts(b1, s1, b2, s2);
            return comp != 0 ? comp : compareIntParts(b2, s2, b1, s1);
        }

        @Override
        public int compare(IntLongPair o1, IntLongPair o2) {
            int comp = compareLongParts(o1, o2);
            return comp != 0 ? comp : compareIntParts(o2, o1);
        }
    }

    /** Orders by long part descending, then int part ascending. */
    public static class LongDescIntAscComparator implements RawComparator<IntLongPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int comp = compareLongParts(b2, s2, b1, s1);
            return comp != 0 ? comp : compareIntParts(b1, s1, b2, s2);
        }

        @Override
        public int compare(IntLongPair o1, IntLongPair o2) {
            int comp = compareLongParts(o2, o1);
            return comp != 0 ? comp : compareIntParts(o1, o2);
        }
    }

    /** Orders by long part descending, then int part descending. */
    public static class LongDescIntDescComparator implements RawComparator<IntLongPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int comp = compareLongParts(b2, s2, b1, s1);
            return comp != 0 ? comp : compareIntParts(b2, s2, b1, s1);
        }

        @Override
        public int compare(IntLongPair o1, IntLongPair o2) {
            int comp = compareLongParts(o2, o1);
            return comp != 0 ? comp : compareIntParts(o2, o1);
        }
    }

    /** Orders by the int part ascending only (the long part is ignored). */
    public static class IntAscComparator implements RawComparator<IntLongPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return compareIntParts(b1, s1, b2, s2);
        }

        @Override
        public int compare(IntLongPair o1, IntLongPair o2) {
            return compareIntParts(o1, o2);
        }
    }

    /** Orders by the int part descending only (the long part is ignored). */
    public static class IntDescComparator implements RawComparator<IntLongPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return compareIntParts(b2, s2, b1, s1);
        }

        @Override
        public int compare(IntLongPair o1, IntLongPair o2) {
            return compareIntParts(o2, o1);
        }
    }

    /** Orders by the long part ascending only (the int part is ignored). */
    public static class LongAscComparator implements RawComparator<IntLongPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return compareLongParts(b1, s1, b2, s2);
        }

        @Override
        public int compare(IntLongPair o1, IntLongPair o2) {
            return compareLongParts(o1, o2);
        }
    }

    /** Orders by the long part descending only (the int part is ignored). */
    public static class LongDescComparator implements RawComparator<IntLongPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return compareLongParts(b2, s2, b1, s1);
        }

        @Override
        public int compare(IntLongPair o1, IntLongPair o2) {
            return compareLongParts(o2, o1);
        }
    }

    /**
     * Partition based on the long part of the pair.
     */
    public static class LongOnlyPartitioner extends Partitioner<IntLongPair, Writable> {
        @Override
        public int getPartition(IntLongPair key, Writable value, int numPartitions) {
            // Masking the sign bit already yields a non-negative value; no Math.abs needed.
            return (key.getLong().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    /**
     * Partition based on the int part of the pair.
     */
    public static class IntOnlyPartitioner extends Partitioner<IntLongPair, Writable> {
        @Override
        public int getPartition(IntLongPair key, Writable value, int numPartitions) {
            // Masking the sign bit already yields a non-negative value; no Math.abs needed.
            return (key.getInt().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
}
除了按散列值聚合、排序並輸出之外,您的 reducer 還需要做其他事情嗎?另外,您並不關心各個散列值之間的順序,但同一散列值的所有記錄必須排好序,這樣理解對嗎? – Jedi
我的 reduce() 只需要按散列值進行聚合,但我希望所有 reduce 調用的輸出都按時間戳排序。我不需要對同一散列值的所有記錄排序;我需要的是 reducer 的累積輸出按時間戳排序。這個時間戳可能來自 reduce() 迭代器中的任意一個值。 – medloh