2013-07-19 107 views
1

我建立了一個基本的網頁解析器,它使用hadoop來把URL傳遞給多個線程。這很好地工作,直到我到達輸入文件的末尾,Hadoop會在線程仍在運行時聲明自己已完成。這會導致org.apache.hadoop.fs.FSError錯誤:java.io.IOException:Stream Closed。無論如何要保持這個流線足夠長的時間來完成線程嗎? (我可以用合理的準確度預測線程在單個url上花費的最大時間量)。如何防止hadoop流關閉?

繼承人我是如何執行的線程

public static class Map extends MapReduceBase implements 
      Mapper<LongWritable, Text, Text, Text> { 
     private Text word = new Text(); 
     private URLPile pile = new URLPile(); 
     private MSLiteThread[] Threads = new MSLiteThread[16]; 
     private boolean once = true; 

     @Override 
     public void map(LongWritable key, Text value, 
       OutputCollector<Text, Text> output, Reporter reporter) { 

      String url = value.toString(); 
      StringTokenizer urls = new StringTokenizer(url); 
      Config.LoggerProvider = LoggerProvider.DISABLED; 
      System.out.println("In Mapper"); 
      if (once) { 
       for (MSLiteThread thread : Threads) { 
        System.out.println("created thread"); 
        thread = new MSLiteThread(pile); 
        thread.start(); 
       } 
       once = false; 
      } 

      while (urls.hasMoreTokens()) { 
       try { 
        word.set(urls.nextToken()); 
        String currenturl = word.toString(); 
        pile.addUrl(currenturl, output); 

       } catch (Exception e) { 
        e.printStackTrace(); 
        continue; 
       } 

      } 

     } 

螺紋自己得到這樣

public void run(){ 
      try { 
      sleep(3000); 
       while(!done()){ 
        try { 
        System.out.println("in thread"); 
         MSLiteURL tempURL = pile.getNextURL(); 
         String currenturl = tempURL.getURL(); 
         urlParser.parse(currenturl); 
         urlText.set(""); 
         titleText.set(currenturl+urlParser.export()); 
         System.out.println(urlText.toString()+titleText.toString()); 
         tempURL.getOutput().collect(urlText, titleText); 
         pile.doneParsing(); 
        sleep(30); 
        } catch (Exception e) { 
          pile.doneParsing(); 
        e.printStackTrace(); 
         continue; 
        } 
       } 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      System.out.println("Thread done"); 

     } 

的網址,並在urlpile相關的方法是

public synchronized void addUrl(String url,OutputCollector<Text, Text> output) throws InterruptedException { 
     while(queue.size()>16){ 
      System.out.println("queue full"); 
      wait(); 
     } 
     finishedParcing--; 
     queue.add(new MSLiteURL(output,url)); 
     notifyAll(); 
    } 

    private Queue<MSLiteURL> queue = new LinkedList<MSLiteURL>(); 
    private int sent = 0; 
    private int finishedParcing = 0; 
    public synchronized MSLiteURL getNextURL() throws InterruptedException { 

     notifyAll(); 
     sent++; 
     //System.out.println(queue.peek()); 
     return queue.remove(); 

    } 

回答

1

正如我可以推斷從下面的評論中,你可以在每個map()函數中做到這一點,使事情變得簡單。 我看到你做了以下事情,以預先創建一些空閒線程。 可以所以下面的代碼移到

if (once) { 
    for (MSLiteThread thread : Threads) { 
    System.out.println("created thread"); 
    thread = new MSLiteThread(pile); 
    thread.start(); 
    } 
once = false; 
} 

到,

public static class Map extends MapReduceBase implements 
      Mapper<LongWritable, Text, Text, Text> { 
    @Override 
    public void configure(JobConf job) { 
     for (MSLiteThread thread : Threads) { 
     System.out.println("created thread"); 
     thread = new MSLiteThread(pile); 
     thread.start(); 
     } 
    } 

    @Override 
    public void map(LongWritable key, Text value, 
     OutputCollector<Text, Text> output, Reporter reporter) { 
    } 

} 

,這可能得到一次初始化,對於這個問題,不需要「一次」條件檢查了。

此外,您不需要像上面那樣製作空閒線程。 我不知道你會得到多少性能增益創建16個空閒線程。

不管怎麼說,這裏是一個解決方案(可能不是十全十美)

您可以使用類似的CountDownLatch Read more here處理您的網址或N個批次越來越封鎖,直到他們完成。這是因爲,如果將每個傳入的URL記錄釋放到一個線程中,下一個URL將被立即取回,並且當您以相同的方式處理最後一個url時,即使您還有線程,map()函數也會返回在隊列中進行處理。你將不可避免地得到你提到的例外。

這裏舉一個例子,說明如何使用倒數計時器阻塞。

public static class Map extends MapReduceBase implements 
       Mapper<LongWritable, Text, Text, Text> { 

      @Override 
      public void map(LongWritable key, Text value, 
       OutputCollector<Text, Text> output, Reporter reporter) { 

       String url = value.toString(); 
       StringTokenizer urls = new StringTokenizer(url); 
       Config.LoggerProvider = LoggerProvider.DISABLED; 

      //setting countdownlatch to urls.countTokens() to block off that many threads. 
      final CountDownLatch latch = new CountDownLatch(urls.countTokens()); 
      while (urls.hasMoreTokens()) { 
       try { 
        word.set(urls.nextToken()); 
        String currenturl = word.toString(); 
        //create thread and fire for current URL here 
        thread = new URLProcessingThread(currentURL, latch); 
        thread.start(); 
       } catch (Exception e) { 
        e.printStackTrace(); 
        continue; 
       } 

      } 

      latch.await();//wait for 16 threads to complete execution 
      //sleep here for sometime if you wish 

     } 

    } 

最後,URLProcessingThread只要一個URL處理減少鎖存計數器,

public class URLProcessingThread implments Runnable { 
    CountDownLatch latch; 
    URL url; 
    public URLProcessingThread(URL url, CountDownLatch latch){ 
     this.latch = latch; 
     this.url = url; 
    } 
    void run() { 
     //process url here 
     //after everything finishes decrement the latch 
     latch.countDown();//reduce count of CountDownLatch by 1 

    } 
} 

與您的代碼看出大概問題:pile.addUrl(currenturl, output);,當你添加一個新的URL,在此期間所有16個線程都會得到更新(我不太確定),因爲同一個一堆對象被傳遞給16個線程。有可能你的網址被重新處理,或者你可能會得到一些其他的副作用(我對此不太確定)。

其他建議:

此外,您可能需要使用

mapred.task.timeout

(默認值= 600000ms)= 10分鐘

Description: The number of milliseconds before a task will be terminated if it neither reads an input, writes an output, nor updates its status string.

您可以添加/覆蓋此增加地圖任務超時property map in mapred-site.xml

+0

那dec如果它真的超時,這個任務就會失敗,這並不是我想要的,但它似乎是正確的。 – Chenab

+1

啊!我可能錯過了問題中的一些細節。你是說你有從單個地圖任務運行的線程,並且當地圖完成處理它的輸入時,Hadoop退出了嗎? –

+0

或多或少。線程一段時間來處理每個輸入,這就是爲什麼我有更多的一個。然而,一旦hadoop聲明地圖任務完成,線程就沒有放置輸出的地方。 – Chenab