2013-07-25 84 views
1

我正在處理hadoop中輸入的日誌文件,其中的密鑰不均勻分佈。這意味着減值者的價值分佈不均衡。例如,key1有1個值,key2有1000個值。處理Hadoop中密鑰間值的不均勻分佈mapreduce

有沒有辦法做一個相同的鍵關聯的值的負載均衡[我不想改變我的鑰匙也如果你知道哪些鍵將會有一個不同尋常的大量的

+1

你能從算法的角度描述你的工作嗎 - 一旦他們進入reducer(例如它是一個總和/分鐘/最大/平均計算或類似的 - 你可以做什麼?部分計算會被移植到組合器中,以減少映射器和縮減器之間數據流的偏斜鍵?) –

回答

0

值,你可以使用下面的技巧。

您可以實現自定義Partitioner這將確保每個歪斜鍵進入到一個分區,然後一切就由獲得分配給其餘的分區的hashCode(這是默認HashPartitioner一樣)。

您可以通過實現此接口創建自定義Partitioner

public interface Partitioner<K, V> extends JobConfigurable { 
    int getPartition(K key, V value, int numPartitions); 
} 

然後你就可以告訴Hadoop的使用您的Partitioner有:

conf.setPartitionerClass(CustomPartitioner.class); 
+0

非常感謝@charles。不幸的是,我不知道哪個鍵會有大量的值。同樣在你的解決方案中,這種方法會導致一個特定的reducer [接收1000個值的那個]來處理大量的數據。我擔心的原因是因爲對於屬於特定鍵的每個值,我都進行了大量計算[可以說某個鍵將有75000個值,並且我將遍歷reducer中的值並進行一些每次需要2分鐘的計算] – udag

0

也許你能擊中之前使用組合減速?這是相當推測...

想法是將每組密鑰分區成預設最大大小的分區,然後將這些分區的k/v對輸出到reducer。這段代碼假設你已經在你的配置中設置了這個大小。

public static class myCombiner extends Reducer<Text, Text, Text, Text> { 
    public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException { 

     List<Text> textList = new ArrayList<Text>(); 
     int part = 0; 

     while (values.iterator().hasNext()) { 
      if (textList.size() <= Integer.parseInt(context.getConfiguration().get("yourMaxSize"))) { 
       textList.add(values.iterator().next()); 

      } else { 
       for(Text t : textList) { 
        //essentially partitioning each key... 
        context.write(new Text(key.toString() + "_" + Integer.toString(part)), t); 
       } 
       textList.clear(); 
      } 
      part += 1; 
     } 
     //output any stragglers ... 
     for(Text t : textList) { 
      context.write(new Text(key.toString() + "_" + Integer.toString(part)), t); 
     } 

    } 
}