關於第一個問題:
你或許應該整條生產線傳遞給映射器和只保留第三令牌映射和地圖(user
,1)每次。
public class AnalyzeLogs
{
public static class FindFriendMapper extends Mapper<Object, Text, Text, IntWritable> {
public void map(Object, Text value, Context context) throws IOException, InterruptedException
{
String tempStrings[] = value.toString().split(",");
context.write(new Text(tempStrings[2]), new IntWritable(1));
}
}
對於第二個問題,我相信你不能避免在第二個MR Job之後(我想不出任何其他方式)。所以第一份工作的縮減器只會聚合這些值並給出每個鍵的總和,按鍵排序。這還不是你需要的。
因此,您將此作業的輸出作爲輸入傳遞給第二個MR作業。這項工作的目標是在傳遞給reducer之前按價值做一些特殊的分類(這絕對不會)。
我們的映射器的第二件事會有如下:
public static class SortLogsMapper extends Mapper<Object, Text, Text, NullWritable> {
public void map(Object, Text value, Context context) throws IOException, InterruptedException
{
context.write(value, new NullWritable());
}
正如你所看到的,我們不爲這個映射器使用價值可言。相反,我們創建了一個密鑰,包含我們的值(我們的密鑰是key1 value1
格式)。 現在還有什麼要做,是要指定框架,它應該基於value1
而不是整個key1 value1
進行排序。因此,我們將實現一個自定義的SortComparator
:
public static class LogDescComparator extends WritableComparator
{
protected LogDescComparator()
{
super(Text.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2)
{
Text t1 = (Text) w1;
Text t2 = (Text) w2;
String[] t1Items = t1.toString().split(" "); //probably it's a " "
String[] t2Items = t2.toString().split(" ");
String t1Value = t1Items[1];
String t2Value = t2Items[1];
int comp = t2Value.compareTo(t1Value); // We compare using "real" value part of our synthetic key in Descending order
return comp;
}
}
您可以設置自定義的比較如下:job.setSortComparatorClass(LogDescComparator.class);
作業的減速應該什麼都不做。但是,如果我們不設置縮減器,則映射器鍵的排序將不會完成(我們需要這樣做)。因此,您需要將IdentityReducer
設置爲第二個MR作業的Reducer,以便不減少但仍然確保映射器的合成鍵按照我們指定的方式排序。
非常感謝您的詳細解答。這真的很有幫助! – Searene
@MarkZar不客氣:) –