2014-03-12 271 views
1

我有一個java代碼(很舊的遺留問題),它創建了很多線程。 當進程運行時,一些線程在沒有任何跟蹤的情況下被殺死。看看代碼,我覺得異常處理是正確的。但我不確定爲什麼一些線程會被殺死。請問這個論壇的任何專家是否幫助我瞭解這個類是否有關於處理線程的缺失?Java線程死亡

代碼如下:

public class WorkerGroup { 

    // This value shows the number of all none-idleling workers. 
    private volatile int m_numbersOfActiveWorkingWorkers = 0; 
    // This value is set by when a single worker wants to starve multi workers 
    private volatile int m_starveRequests = 0; 
    // Thread group 
    // Java Collection of all workers. 
    protected static ThreadGroup s_workGroup = null; 
    // Queue manager 
    // Work group name 
    private final String m_workGroupName; 
    // Shut Down process which is hooked onto termination process of the 
    // application. 
    private static FtpWorkerShutdownHook s_shutDownHook = null; 
    private final Log m_log = Log.create(WorkerGroup.class); 
    /** 
    * CTOR WorkerGroup and collector of workers 
    * @param workGroupName will name the workGroup 
    * @param manager Queue manager 
    */ 
    public WorkerGroup(final String workGroupName) { 
    super(); 
    m_workGroupName = workGroupName; 
    if (s_shutDownHook == null) { 
     s_shutDownHook = new FtpWorkerShutdownHook(); 
     Runtime.getRuntime().addShutdownHook(s_shutDownHook); 
    } 
    } 
    /** 
    * Start procedure to start all workers. This function can only be called 
    * once. 
    * @param numberOfWorkers The number of available workers for this group 
    */ 
    public void startWorkers(int numberOfWorkers) { 
    // can not use negative count 
    if (numberOfWorkers < 0) { 
     return; 
    } 
    // workgroup is assigned alread. Run once only. 
    if (s_workGroup != null) { 
     return; 
    } 
    // creation of the working group. All workers and the work group are 
    // set to be deamon threads. This will keep the main process alive until 
    // all workers are terminated. 
    s_workGroup = new ThreadGroup("Workers of " + m_workGroupName); 
    s_workGroup.setDaemon(true); 

    // maximize number of workers 
    if (numberOfWorkers > 16) { 
     numberOfWorkers = 16; 
    } 
    // create and start all workers. 
    for (int i = 0; i < numberOfWorkers; i++) { 
     startWorker("FtpWorker" + String.valueOf(i + 1)); 
    } 
    } 

// public void startWorker(final String workerName, final ITaskActions task) { 
// final FtpWorker worker = new FtpWorker(workerName, this); 
// worker.setDaemon(true); 
// worker.start(); 
// } 
    public void startWorker(final String workerName) { 
    final FtpWorker worker = new FtpWorker(workerName, this); 
    worker.setDaemon(true); 
    worker.start(); 
    } 
    /** 
    * Explicit shutdown procedure. The normal case is to kill the process 
    */ 
    public void shutDown() { 
    final Thread thread = new FtpWorkerShutdownHook(); 
    thread.start(); 
    try { 
     thread.join(); 
     m_log.event("ftp work group shutdown thread terminated"); 
     // LogAgent 
    } 
    catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    } 

    /** 
    * Returns the number of running (non terminated) threads assinged the the 
    * thread group 
    * @return Number of active threads 
    */ 
    protected int activeCount() { 
    if (s_workGroup == null) { 
     return 0; 
    } 
    else { 
     synchronized (s_workGroup) { 
     final int count = s_workGroup.activeCount(); 
     // LogAgent.event("work group", "active count" + count + " active " 
     // + m_numbersOfActiveWorkingWorkers); 
     return count; 
     } 
    } 
    } 

    /** 
    * This function returns if the worker group is active This function may be 
    * overriden. 
    * 
    * @return True if there is workers available and/or 
    */ 
    public synchronized boolean runnable() { 
    return activeCount() > 0; 
    } 

    // ************************************************************************** 
    // Mutex operation 
    // ************************************************************************** 

    private final Mutex m_mutex = new Mutex(); 

    public synchronized void incrementRunningWorkers() { 
    if (m_numbersOfActiveWorkingWorkers == 0) { 
     try { 
     m_mutex.acquire(); 
     } 
     catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 
    ++m_numbersOfActiveWorkingWorkers; 
    } 

    public synchronized void decrementRunningWorkers() { 
    --m_numbersOfActiveWorkingWorkers; 
    if (m_numbersOfActiveWorkingWorkers == 0) { 
     m_mutex.release(); 
    } 
    } 

    protected void incrementStarveRequest() { 
    synchronized (m_mutex) { 
     m_starveRequests++; 
    } 
    } 

    protected void decrementStarveRequest() { 
    synchronized (m_mutex) { 
     m_starveRequests--; 
    } 
    } 

    public void acquire() { 
    try { 
     synchronized (m_mutex) { 
     m_mutex.acquire(); 
     } 
    } 
    catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    } 

    public void release() { 
    synchronized (m_mutex) { 
     m_mutex.release(); 
    } 
    } 

    // -------------------------------------------------------------------------- 
    // Basic Test getter methods 
    // -------------------------------------------------------------------------- 

    protected final int getWorkingWorkersCount() { 
    return m_numbersOfActiveWorkingWorkers; 
    } 

    protected final boolean workersIsStarving() { 
    return m_starveRequests != 0; 
    } 
} 

Ftpworker類是:

public class FtpWorker extends Thread{ 


    // This instance will use log facility for debug and for CIF log. 
    private final Log m_log = Log.create(FtpWorker.class); 

    // pointer to onwer of this worker 
    protected FtpWorkerGroup m_workerGroup = null; 

    // setting m_running to false cause worker to terminate 
    protected boolean m_running = true; 
    private static boolean isRetryList = false; 

    // default sleeping time in mSec 
    protected int m_workerSleep = 2000; 

    protected EAServer m_eaServer = EAServer.createInstance(); 

    private final NeHandler m_neManager = NeHandler.getHandler(); 

    protected List<String> fileNotf = new Vector<String>(); 

    protected List<String> retrylist = Collections.synchronizedList(new ArrayList<String>()); 

    public FtpWorker(final String name, final FtpWorkerGroup workerGroup) { 
    super(FtpWorkerGroup.s_workGroup,name); 

    } 

    public void run() { 
    idle(); 
    shutDownEvent(); 
    } 

    private String getTask() { 
    String notification=null; 
    String nename; 
    NeInfo ne=null; 
    int delim; 
    if(retrylist.size()>0){ 
     m_log.trace("============ RETRY LIST===== " + Thread.currentThread().getName()); 
     m_log.trace("RETRY LIST ::" + retrylist) ; 

    if((retrylist.isEmpty())){ 
     m_log.trace("RETRY LIST IS EMPTY "); 

     return null; 
    } 
     notification= retrylist.remove(0); 
     m_log.trace("Thread :: " + Thread.currentThread().getName() + " scanning list for Notification[RETRY] :: " + notification); 
     delim = notification.indexOf("$"); 
     nename= notification.substring(0, delim); 
     try { 
     ne = m_neManager.getNetworkElement(nename); 
     }catch (NeNotFoundException e1) { 
     m_log.critical("Unable to get properties for Network Elemenet " 
      + nename + " Reason: " + e1.getMessage()+ "The Network Element is of Unsupported Version");  
     } 
    if((ne.getValidState()== NeInfo.NE_IS_STOPPED) || (ne.getValidState()== NeInfo.NE_IS_STOPPING)){ 
     if((retrylist.isEmpty())){ 
     m_log.trace("RETRY LIST IS EMPTY "); 
     FtpWorker.isRetryList = false; 
     return null; 
     } 
     else{ 
     int size = retrylist.size(); 
     for(Iterator it = retrylist.iterator();it.hasNext();){ 
      String test = (String)it.next(); 
      // m_log.trace(" RETRY LIST files to be deleted, test is "+test); 
      if(test.contains(ne.getName())){ 
      //retrylist.remove(test); 
      it.remove(); 
      m_log.trace("file"+ test+ "is deleted from list"); 
      m_log.trace(" NEW RETRY LIST FILE NOTIFICATION LIST ::" + retrylist) ; 
      } 
     } 
     if((retrylist.isEmpty())){ 
      m_log.trace("RETRY LIST IS EMPTY "); 
      FtpWorker.isRetryList = false; 
     } 
     return null; 
     } 
     } 
     if(!ne.getisInFtp()){ 
     m_log.trace("FTP Not Set[RETRY] " +notification); 
     ne.setisInFtp(); 
     m_log.trace("FETCH FILE USING NOTIFICATION[RETRY] " +notification); 
     FtpWorker.isRetryList = false; 
     m_log.trace("============Returning from RETRY LIST===== " + Thread.currentThread().getName()); 
     return notification; 
     } 
     else{ 
     m_log.trace("FTP Set. ADDING NOTIFICATION BACK TO RETRY QUEUE:::: " +notification); 
     FtpWorker.isRetryList = true; 
     retrylist.add(notification); 
     m_log.trace("============Returning NULL RETRY LIST===== " + Thread.currentThread().getName()); 
     return null; 
     } 
    } 
    if(EAServer.fileNotification!=null && (EAServer.fileNotification.size())>0 && !(isRetryList)){ 
     m_log.trace("====ORIGINAL FILE NOTIFICATION LIST== " + Thread.currentThread().getName()); 
     try{ 

     m_log.trace("ORIGINAL FILE NOTIFICATION LIST ::" + EAServer.fileNotification + " for Thread " + Thread.currentThread().getName()) ; 

     if((EAServer.fileNotification.isEmpty())){ 
     m_log.trace("ORIGINAL LIST IS EMPTY "); 
     return null; 
     } 


     notification = (String)EAServer.fileNotification.remove(0); 

     m_log.trace("Thread :: " + Thread.currentThread().getName() + " scanning list for Notification[ORIGINAL] :: " + notification); 
     delim = notification.indexOf("$"); 
     nename= notification.substring(0, delim); 
     m_log.trace("NOTIFICATION FOUND ::: " +notification); 
     try { 
     ne = m_neManager.getNetworkElement(nename); 
     }catch (NeNotFoundException e1) { 
     m_log.critical("Unable to get properties for Network Elemenet " 
      + nename + " Reason: " + e1.getMessage()+ "The Network Element is of Unsupported Version");  
     } 
     }catch(Exception e){ 
     m_log.trace("Exception caught while reading fileNotification " + e.getMessage() + " for Thread " + Thread.currentThread().getName()); 
     return null; 
     } 
     if((ne.getValidState()== NeInfo.NE_IS_STOPPED) || (ne.getValidState()== NeInfo.NE_IS_STOPPING)){ 
     try { 
      ne.setisNotInFtp(); 
      ne.deleteFileWithoutCollecting(ne.getFilePath()); 
      ne.setValidState(NeInfo.NE_IS_STOPPED); 
     } 
     catch (CDMException e) { 
      m_log.trace("Could not delete file for " + ne.getName() 
       + " because " + e.getMessage()); 
     } 
     if((EAServer.fileNotification.isEmpty())){ 
      m_log.trace("ORIGINAL LIST IS EMPTY NOW"); 
      return null; 
     } 
     else{ 
     try { 
     m_log.trace(" DELETING for NE Since Node is Stopped "+ ne.getName()); 
     int size = EAServer.fileNotification.size(); 
     m_log.trace("Size of ORIGINAL NOTIFICATION LIST " + size); 
     for(Iterator it = EAServer.fileNotification.iterator();it.hasNext();){ 
      String test = (String)it.next(); 
      if(test.contains(ne.getName())){ 
      it.remove(); 
      m_log.trace("file"+ test+ "is deleted from list"); 
      m_log.trace(" NEW ORIGINAL FILE NOTIFICATION LIST ::" + EAServer.fileNotification) ; 
      } 
     } 
     return null; 
     } 
     catch(Exception e){ 
      m_log.trace(" CCCCCC IN exception" + e.getMessage()); 
      return null; 
     } 
     } 


     } 
     if(!ne.getisInFtp()){ 
     /* if((ne.getValidState() == NeInfo.NE_IS_STOPPED) || (ne.getValidState() == NeInfo.NE_IS_STOPPING)){ 
      return null; 
     }*/ 
     m_log.trace("FTP Not Set[ORIGINAL] " +notification); 
     ne.setisInFtp(); 
    /* //Remove 
     try{ 
      m_log.trace("Thread is going to sleep"); 
     Thread.sleep(120000); 
     } 
     catch(Exception e){ 
     //Do nothing 
     } 
     //Remve end */ 
     m_log.trace("FETCH FILE USING NOTIFICATION[ORIGINAL] " +notification); 
     m_log.trace("====Returning from ORIGINAL LIST===== " + Thread.currentThread().getName()); 
     return notification; 
     } 
     else{ 
     m_log.trace("FTP Set.ADDING NOTIFICATION BACK TO RETRY QUEUE:::: ::: "+notification); 
//  EAServer.fileNotification.add(notification); 
     FtpWorker.isRetryList = true; 
     retrylist.add(notification); 
     m_log.trace("===+Returning NULL from ORIGINAL LIST===== " + Thread.currentThread().getName()); 
     return null; 
     } 
    } 

    return null; 
    } 

    private void idle() { 
    while (m_running) { 
    // m_log.trace("ftp wroker" + Thread.currentThread().getName() + " is active"); 
     final String task = getTask(); 
     if (task != null) { 
     // if a task is found increment working threads and run work as long 
     // there exists tasks. 
     work(task); 
     } 
    /* else{ 
     m_log.trace("task is null which means no file notifications received yet"); 
     }*/ 
     try { 
     //m_log.trace("No TASK.SLEEPING" + Thread.currentThread().getName()); 
     sleep(m_workerSleep); 
     } 
     catch (InterruptedException e) { 
     interruptEvent(e); 
     } 
    } 
    } 

    private void work(final String t) { 
    String task = t; 
    do { 
     m_eaServer.execute(task); 
     m_log.trace("TASK EXECUTED :::: " + task + " by Thread " + Thread.currentThread().getName()); 
     yield(); 
     task = getTask(); 

    } 
    while ((task != null) && m_running); 
    } 

    public void shutDown() { 
    m_running = false; 
    } 

    /** 
    * Generates and logs an event. Should be called when the the worker 
    * shutdowns. 
    */ 
    protected void shutDownEvent() { 
    m_log.event("worker " + super.getName() + " has stopped.", 
     LogEventType.KERNEL_WORKER_EVENT); 
    } 

    /** 
    * Generates and logs an event from the given Exception. Should be called when 
    * the worker has been interupted. 
    * 
    * @param e the exception. 
    */ 
    protected void interruptEvent(final Exception e) { 
    m_log.event("worker " + super.getName() + " has interrupted, cause by " 
     + e.getMessage() + ".", LogEventType.KERNEL_WORKER_EVENT); 
    } 


} 
+0

什麼是'FtpWorker'?你怎麼知道他們被殺了?有可能ftp(?)不再「工作」,並且該線程靜靜地終止? – PeterMmm

+0

你的FTPWorker類是什麼樣的? – DaveH

回答

0

如果線程退出,而無需登錄關閉事件,那麼我會認爲事情是拋出一個運行時異常。對於初學者,我會嘗試以下看看是什麼被拋出:

public void run() { 
    try { 
    idle(); 
    shutDownEvent(); 
    } catch (Throwable e) { 
    m_log.critical(e.getMessage(); 
    } 
}