2017-02-25 51 views
1

我正在編寫一個mapreduce程序,其中reduce函數接收作爲輸入值的PageRankNode(具有兩個字段)對象的迭代並將它添加到優先級隊列中。在遍歷每個對象並將其添加到優先級隊列時,結果優先級隊列只包含我添加的最後一個對象。 但是,當我創建一個相同類型的新對象並添加到優先級隊列中時,它似乎能夠按預期工作。優先級隊列添加新對象vs添加已創建

我想知道爲什麼會發生這種情況? 下面的示例作品。然而,而不是「topPages.add(新PageRankNode(pageNode.pageName,pageNode.pageRank))」,我使用「topPages.add(pageNode)」它不按預期工作。

下面還添加了優先級隊列的比較器實現。

private Comparator<PageRankNode> comparator= new PageNodeComparator(); 
    private PriorityQueue<PageRankNode> topPages= new PriorityQueue<PageRankNode>(100,comparator); 

public void reduce(NullWritable key,Iterable<PageRankNode> pageNodes,Context context) throws IOException,InterruptedException{ 
    for(PageRankNode pageNode:pageNodes){ 
     //topPages.add(pageNode); 
     topPages.add(new PageRankNode(pageNode.pageName,pageNode.pageRank)); 
     if(topPages.size()>100){ 
      topPages.poll(); 
     } 
    } 
    PageRankNode pageNode; 
    while(!topPages.isEmpty()){ 
     pageNode=topPages.poll(); 
     context.write(NullWritable.get(),new Text(pageNode.pageName+":"+pageNode.pageRank)); 
    } 

} 
public class PageNodeComparator implements Comparator<PageRankNode>{ 

    public int compare(PageRankNode x,PageRankNode y){ 
     if(x.pageRank < y.pageRank){ 
      return -1; 
     } 
     if(x.pageRank > y.pageRank){ 
      return 1; 
     } 
     return 0; 
    } 
} 

回答

1

我不認爲你提供足夠的信息來正確診斷此。我發現reduce方法中有InterruptedException這個方法表明你可能在多線程上運行這個函數 - 如果是這樣的話,可能是潛在的原因。

我寫了一個小程序,其功能相同,輸出如預期。

import java.util.Arrays; 
import java.util.Comparator; 
import java.util.PriorityQueue; 

public class Main { 
    private static Comparator<PageRankNode> comparator = new PageNodeComparator(); 
    private static PriorityQueue<PageRankNode> topPages = new PriorityQueue<PageRankNode>(100, comparator); 

    public static void main(String[] args) { 
    reduce(Arrays.asList(
     new PageRankNode("A", 1000), 
     new PageRankNode("B", 1500), 
     new PageRankNode("C", 500), 
     new PageRankNode("D", 700), 
     new PageRankNode("E", 7000), 
     new PageRankNode("F", 60) 
    )); 
    } 

    public static void reduce(Iterable<PageRankNode> pageNodes) { 
    for(PageRankNode pageNode : pageNodes) { 
     //topPages.add(pageNode); 
     topPages.add(new PageRankNode(pageNode.pageName, pageNode.pageRank)); 
     if(topPages.size() > 100) { 
     topPages.poll(); 
     } 
    } 
    PageRankNode pageNode; 
    while(!topPages.isEmpty()) { 
     pageNode = topPages.poll(); 
     System.out.println(pageNode.pageName); 
    } 
    } 

    public static class PageRankNode { 
    private String pageName; 
    private int pageRank; 

    public PageRankNode(String pageName, int pageRank) { 
     this.pageName = pageName; 
     this.pageRank = pageRank; 
    } 
    } 

    public static class PageNodeComparator implements Comparator<PageRankNode> { 

    @Override 
    public int compare(PageRankNode x, PageRankNode y) { 
     if(x.pageRank < y.pageRank) { 
     return -1; 
     } 
     if(x.pageRank > y.pageRank) { 
     return 1; 
     } 
     return 0; 
    } 
    } 
} 

輸出是:

F 
C 
D 
A 
B 
E