2014-11-05 21 views
0

我希望在這裏有人可以幫助我,我是使用線程的新手,我需要做的就是在代碼中放置一個標誌,以通知所有線程完成並那麼它會調用一個方法來更新表來標記它已完成。我已經閱讀了很多關於執行器框架的知識,但我不知道如何實現它。如何在我的代碼中實現newSingleThreadExecutor或newFixedThreadPool

這裏是我的代碼:

public static void main(String[] args) throws SQLException { 
    LOG.info(args.length); 

    LOG.info("Args = " + args); 
    if (args != null && args.length > 0 && args[0] != null && args[0].equals("stop")) { 
     stop(args); 
    } else { 
     new Thread(new PayatReconService()).start(); 
     LOG.info("This is after the main method"); 
    } 
} 

static void stop(String[] args) { 
    LOG.info("STOPPING SERVER!"); 
    // Need to stop all TimerTasks here 
    running = false; 
    try { 
     // sleep for a second, allow threads to close 
     Thread.sleep(1000); 
    } catch (InterruptedException ex) { 
     LOG.info("exception " + ex.getMessage()); 
    } 

    System.exit(0); 
} 

@Override 
public void run() { 
    running = true; 
    PayatReconService reconService = new PayatReconService(); 
    try { 
     mailingQueue = new MailingQueue(prop); 
     LOG.info("Starting Recon Service"); 
     ProcessRecon pr = new ProcessRecon(); 
     //Starts startRecon method in processRecon class 
     pr.startRecon(); 

    } catch (Exception ex) { 
     LOG.error(ex, ex); 
    } 

    } 
} 

ProcessRecon.java:

public void startRecon() throws SQLException { 
    //log starting recon 
    loadRetailers(); 
} 




public void loadRetailers() throws SQLException { 
DB db = new DB(); 
try (Connection con = db.getConnection(); PreparedStatement loadSt = con.prepareStatement(DB.LOAD_RETAILERS)){ 
     PayatReconService.LOG.info("Load Retailers"); 


     rs = loadSt.executeQuery(); 
     while (rs.next()) { 
      Retailer r = new Retailer(); 
      //Retrieve retailer information from company table 
      r.setFtpFolder(rs.getString("ftp_folder")); 
      r.setFtpPassword(rs.getString("ftp_password")); 
      r.setFtpUsername(rs.getString("ftp_username")); 
      r.setIsAutoRecon(rs.getBoolean("autorecon")); 
      r.setIsSFTP(rs.getBoolean("sftp")); 
      r.setReconFolder(rs.getString("reconfolder")); 
      r.setRemoteFTPFolder(rs.getString("remoteftphost")); 
      r.setRemoteFTPPassword(rs.getString("remoteftppassword")); 
      r.setRemoteFTPUsername(rs.getString("remoteftpusername")); 
      r.setReportPrefix(rs.getString("reportprefix")); 
      r.setRetailID(rs.getString("recid")); 
      r.setName(rs.getString("name")); 


      new Thread(r).start(); 

     } 
    } catch (Exception ex) { 
     System.out.println(ex.getMessage()); 
     PayatReconService.LOG.info(ex.getMessage()); 
    } 
} 

}

因此,在這個類是從數據庫中檢索的所有信息,並將其鏈接到一個線程在零售商類中,一旦檢索到所有信息,它就激活運行方法。

Retailer.java:

@Override 
    public void run() { 
     try { 

     Thread thread = Thread.currentThread();   
     thread.setName(getName()); // Set thread name 
    System.out.println("RunnableJob is being run by " + thread.getName() + " (" + thread.getId() + ")"); 
      PayatReconService.LOG.info("RunnableJob is being run by " + thread.getName() + " (" + thread.getId() + ")"); 

     DB db = new DB(); 
     try (Connection con = db.getConnection(); Statement st = con.createStatement()){ 

      while (PayatReconService.running) { 
       try { 
        Thread.sleep(1000); 
       } catch (InterruptedException ex) { 
        Logger.getLogger(Retailer.class.getName()).log(Level.SEVERE, null, ex); 
       } 
       PayatReconService.LOG.info("Check if allowed to run"); 
       if (canRun()) { 
        PayatReconService.LOG.info("Run approved"); 
        searchReconFile(); 
        // the flag needs to be here to indicate that all the threads are done running 
        System.out.println("Mark table as complete"); 
        markasComplete(); 

        PayatReconService.LOG.info("Waiting to rerun"); 
        //wait 5min before trying again 
        Thread.sleep(300000); 
        PayatReconService.LOG.info("Go");      
       } else {       
        Thread.sleep(300000); 
       } 
      } 
     } catch (Exception e) { 
      //sql connect error 
      System.out.println(e.getMessage()); 
      PayatReconService.LOG.info(e.getMessage()); 
      setErrors("01"); 
     } 

    } catch (SQLException ex) { 
     Logger.getLogger(Retailer.class.getName()).log(Level.SEVERE, null, ex); 
    } 

    } 

    public void markasComplete() throws SQLException { 
    PayatReconService.LOG.info("Update [email protected] as complete."); 
    DB db = new DB(); 
    try (Connection con = db.getConnection(); Statement st = con.createStatement()){ 

     String sql; 
     sql = " update [email protected] set scheduled = 0 " 
       + " where timertype in ('recon_new')"; 
     st.executeUpdate(sql); 
    } catch (SQLException e) { 
     PayatReconService.LOG.error(e.getMessage()); 
    } 

} 

所以之前markasComplete()可以運行,所有的線程應該先完成,然後更新表,如果有人能幫我,將是巨大的或點我在正確的方向。

感謝

回答

0

有一對夫婦的方式 - 可能是最好的方法是使用一個鎖存器,例如代碼啓動10個任務和一個單獨的一個,其運行時,他們都完成了:

public void test1() throws InterruptedException { 
    final ExecutorService service = Executors.newFixedThreadPool(5); 
    final CountDownLatch latch = new CountDownLatch(10); 

    for(int i = 0; i < 10; i++) { 
     final int threadId = i; 
     Callable<?> c = new Callable<Object>() { 
      @Override 
      public Object call() throws Exception { 
       long waitUntil = (long) (System.currentTimeMillis() + (Math.random() * 5000)); 
       while(System.currentTimeMillis() < waitUntil); 
       latch.countDown(); 
       System.out.println("Finished task " + threadId); 
       return null; 
      } 
     }; 
     service.submit(c); 
    } 

    // Start a thread which will run when the others are finished. 
    new Thread(new Runnable() { 
     @Override 
     public void run() { 
      try { latch.await(); } catch (InterruptedException e) { } 
      System.out.println("All finished"); 
     } 
    }).start(); 

    // Carry on main thread.... 
    Thread.sleep(60000); 
} 

當然,你可以只關閉執行服務,並等待它完成,在這種情況下,你並不需要一個鎖,用替換最後一個線程:

new Thread(new Runnable() { 
     @Override 
     public void run() { 
      service.shutdown(); 
      try { service.awaitTermination(10, TimeUnit.DAYS); } catch (InterruptedException e) { } 
      System.out.println("All finished"); 
     } 
    }).start();