2012-11-04 79 views
0

我有來自兩個源的輸入:在形式多個輸入和分組比較

  1. 地圖輸出,形式爲

    output.collect(new StockKey(Text(x+" "+id), new Text(id2)), new Text(data)); 
    
  2. 地圖輸出,

    output.collect(new StockKey(new Text(x+" "+id), new Text(1), new Text(data)); 
    

作業conf:

conf.setPartitionerClass(CustomPartitioner.class); 
conf.setValueGroupingComparatorClass(StockKeyGroupingComparator.class); 

其中StockKey是一個自定義類格式(new Text(), new Text())的;

構造:

public StockKey(){ 
    this.symbol = new Text(); 
    this.timestamp = new Text(); 
} 

分組比較:

public class StockKeyGroupingComparator extends WritableComparator { 

    protected StockKeyGroupingComparator() { 
     super(StockKey.class, true); 
    } 

    public int compare(WritableComparable w1, WritableComparable w2){ 
     StockKey k1 = (StockKey)w1; 
     StockKey k2 = (StockKey)w2; 

     Text x1 = new Text(k1.getSymbol()); 
     Text x2 = new Text(k2.getSymbol()); 

     return x1.compareTo(x2); 

    } 

} 

但是

我發現了僅在地圖輸出值達到我沒有從輸入端接收地圖輸出值減速器。我希望將這兩個地圖輸出中常見的符號即new Text(x+" "+id)的記錄歸入同一個縮減器。我感到震驚。

請幫忙!

+0

不清楚輸出結果:您剛看到一個映射器的輸出?他們純粹是「id1」或「id2」? – asksw0rder

+0

id,id2是整數用戶id,比如說id1是:1234和id2是1298 –

+0

我可以讓你發佈你的分區代碼嗎?謝謝。 –

回答

1

要做到這一點,你需要它適合於如下一個分區程序:

  1. 的映射器輸出一堆記錄作爲鍵/值對
  2. 對於每一條記錄,分區傳遞關鍵,價值和減少的數量。該分區決定哪些減速將處理記錄
  3. 記錄被運到各自的分區
  4. 的GroupingComparator運行,以決定哪些鍵值對獲得分成可迭代,用於向減速的單次調用(減速)( )方法
  5. 等等...

我覺得默認的分區是選擇基於密鑰的整個價值(這是默認的行爲),每條記錄的減速分區。但是你想要的記錄只有部分鍵(只是符號而不是符號和時間戳)分組。所以你需要編寫一個分區器,並在驅動程序類中指定/配置它。

一旦你這樣做,你是分組比較應該幫你按照你的意圖分組記錄。

編輯:雜感

  • 您可能會使事情對自己更容易,如果你移動的時間戳值,使鑰匙簡單(只是符號)與價值複合體(時間戳和值)。那麼你不需要分區器或分組比較器。
  • 你沒有說任何一種方式,但你確實使用了MultipleInputs類,對吧?這是爲同一項工作調用兩個或多個映射器的唯一方法。
+0

我已經實現了一個分區器,該分區器基於密鑰的getSymbol進行分組。 BUt仍然不起作用。當我沒有問題時,我在一個不同的程序中實現了自定義分區程序+分組比較器。但是,當我用兩張地圖實現它,但它不在這裏工作時,它不起作用。 :( –

+0

多少個reducer分區正在運行? –

+0

10個分區,並且從分區器爲記錄返回的分區num始終在10 –