2013-10-27 77 views
0

我嘗試在返回成品線程的Java打印結果上製作多線程程序。 的事情是,當我運行這段代碼它只是卡住上第二個值是隊列:多線程程序中完成線程的實時輸出(CompletionService)

 System.out.println("[!] Creaing pool"); 
     int max_threads  = 50; 
     ExecutorService threadPool = Executors.newFixedThreadPool(max_threads); 
     CompletionService<String> taskCompletionService = 
     new ExecutorCompletionService<String>(threadPool); 
     String url; 

     while(our_file.hasNext()){ 

      url = our_file.next(); 
      if (url.length()>0){ 

       futures.add(
        taskCompletionService.submit(
        new GoGo(url) 
        ) 
        ); 
        } 



      int total_tasks = futures.size(); 

      while(total_tasks>0){ 
      for (int i=0; i<futures.size(); i++){ 

       try{  

        Future result = taskCompletionService.poll(); 
        if(result!=null && result.isDone()){ 
         System.out.println(result.get()); 
         total_tasks--; 

        } 
       } 
       catch (InterruptedException e) { 
        // Something went wrong with a task submitted 
        System.out.println("Error Interrupted exception"); 
        e.printStackTrace(); 
       } catch (ExecutionException e) { 
        // Something went wrong with the result 
        e.printStackTrace(); 
        System.out.println("Error get() threw exception"); 
       } 

       } 


      } 

       } 


     threadPool.shutdown(); 
     try { 
     threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
     } 
     catch (InterruptedException e) { 
     } 



... 



    class GoGo implements Callable{ 

     private String url; 

     public GoGo(String received_url){ 
      this.url = received_url; 
     } 

     public String call(){ 

      String URL = this.url; 
      return url; 

     } 
    } 

輸出是這樣的:

[!] Creaing pool 
http://www.www1.com/ 
http://www.www2.ch/ 

,並在這一點上節目只是stucks。 我試圖移動循環迭代期貨數組出主循環提交線程,它工作正常,但萬一如果我會通過非常大的文件我需要實時輸出。 請幫我弄清楚瓶頸在哪裏,我無法找到使用CompletionService的非阻塞poll()方法的任何合適的代碼片段。 感謝您的任何答覆或參考。

回答

1

問題是你正試圖在一個線程中同時做兩件事(提交工作,並閱讀工作結果)。

這沒有意義 - 對於同時執行的任務,您需要多個線程。

因此,創建另一個線程來讀取結果。或另一個線程提交任務。不管你用哪種方式做;無論哪種方式,你最終有2個線程,而不是一個。

0

感謝羅賓格林的建議,把未來的收割機班分開線程解決了問題!所以,我只是啓動無限循環線程,用poll()彈出參數,檢查彈出的未來對象是否指示線程isDone()和寫入輸出。關閉fixedThreadPool後,輸出寫入器類將停止。這裏的代碼(GoGo類除外):

public class headScanner { 


     public static List<Future<String>> gloabal_futures = new ArrayList<Future<String>>(); 


     public static void main(String args[]){ 

      Scanner our_file     = null; 
      ArrayList<String> our_urls  = new ArrayList<String>(); 
      List<Future<String>> futures  = new ArrayList<Future<String>>(); 
      ArrayList<String> urls_buffer = new ArrayList<String>(); 


      try { 
      our_file = new Scanner (new File ("list.txt")); 
      } 
      catch(IOException e){ 
       System.out.println("[-] Cant open the file!"); 
       System.exit(0); 
      } 

      System.out.println("[!] Creaing pool"); 
      int max_threads  = 50; 
      ExecutorService threadPool = Executors.newFixedThreadPool(max_threads); 
      CompletionService<String> taskCompletionService = 
      new ExecutorCompletionService<String>(threadPool); 
      String url; 



      Thread result_thread = new Thread(new ResultHarvester(futures.size(), taskCompletionService)); 
      result_thread.start(); 

      while(our_file.hasNext()){ 

       url = our_file.next(); 
       if (url.length()>0){ 

        futures.add(
         taskCompletionService.submit(
         new GoGo(url) 
         ) 
         ); 
         } 

       } 



      threadPool.shutdown(); 
      try { 
      threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
      } 
      catch (InterruptedException e) { 
      } 

      result_thread.stop(); 

     } 
    } 


class ResultHarvester implements Runnable { 

    private int size; 
    private CompletionService<String> all_service; 

    public ResultHarvester (int size, CompletionService<String> service){ 
     this.size = size; 
     this.all_service = service; 
    } 

    public void run(){ 
      int future_size = 1; 
      CompletionService<String> this_service = this.all_service; 
       while(true){ 
        Future result = this_service.poll(); 
        try { 
          if(result!=null && result.isDone()){ 
           String output = result.get().toString(); 
           if(output.length()>1){ 
            System.out.println(output); 
           } 

          } 
         } 

        catch (InterruptedException e) { 
         // Something went wrong with a task submitted 
         System.out.println("Error Interrupted exception"); 
         e.printStackTrace(); 
        } catch (ExecutionException e) { 
         // Something went wrong with the result 
         e.printStackTrace(); 
         System.out.println("Error get() threw exception"); 
        } 


        } 
       } 

     }