0

我使用線程池執行器產生5個線程並行執行5個不同的命令。每個線程完成後,我將以threadid的條目作爲鍵更新併發散列映射,並將其作爲值終止。但是我的線程池並沒有更新成功完成命令執行的hashmap。線程池執行器不更新併發哈希映射

主類:

package com.cisco.executor; 

import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.Iterator; 
import java.util.List; 
import java.util.Map; 

public class MainExecutor { 

    static String element; 
    static ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<Integer, String>(); 
    static Integer array[] = { 1, 2, 3, 4, 5 }; 
// static Integer array[] = { 1 }; 
    static List<Integer> threadid = Arrays.asList(array); 
    static String SQOOP_XXCCS_DS_SAHDR_CORE = ReadProperties.getInstance().getProperty("SQOOP_XXCCS_DS_SAHDR_CORE"); 
    static String SQOOP_XXCCS_DS_CVDPRDLINE_DETAIL = ReadProperties.getInstance() 
      .getProperty("SQOOP_XXCCS_DS_CVDPRDLINE_DETAIL"); 
    static String SQOOP_XXCCS_DS_INSTANCE_DETAIL = ReadProperties.getInstance() 
      .getProperty("SQOOP_XXCCS_DS_INSTANCE_DETAIL"); 
    static String SQOOP_XXCCS_SCDC_PRODUCT_PROFILE = ReadProperties.getInstance() 
      .getProperty("SQOOP_XXCCS_SCDC_PRODUCT_PROFILE"); 
    static String SQOOP_MTL_SYSTEM_ITEMS_B = ReadProperties.getInstance().getProperty("SQOOP_MTL_SYSTEM_ITEMS_B"); 

    public static void main(String[] args) { 

     ThreadPoolExecutor executors = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); 
//  ThreadPoolExecutor executors = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 

     System.out.println("at executors step"); 
     List<String> getlist = getList(); 
     Iterator<Integer> itr2 = threadid.iterator(); 

     for (Iterator<String> itr = getlist.iterator(); itr.hasNext() && itr2.hasNext();) { 
      String element = (String) itr.next(); 
      int thread_id = itr2.next(); 
      String[] command = { "ssh", "hddev-c01-edge-02", "\"" + element + "\"" }; 
      System.out.println("the command is as below "); 
      System.out.println(Arrays.toString(command)); 
      System.out.println("inside the iterator"); 
      ParallelExecutor pe = new ParallelExecutor(command, thread_id, map); 
      executors.execute(pe); 
     } 
     // executors.shutdown(); 
     for(Map.Entry<Integer, String> entry: map.entrySet()) 
     { 
      Integer key = entry.getKey(); 
      String value = entry.getValue();    
      System.out.println("The key is " + key + " The value is " + value); 
      System.out.println("Thread " + key + " is terminated"); 
     } 

    } 

    public static List<String> getList() { 
     List<String> commandlist = new ArrayList<String>(); 
     System.out.println("inside getList"); 
     commandlist.add(SQOOP_XXCCS_DS_SAHDR_CORE); 
     commandlist.add(SQOOP_XXCCS_DS_CVDPRDLINE_DETAIL); 
     commandlist.add(SQOOP_XXCCS_DS_INSTANCE_DETAIL); 
     commandlist.add(SQOOP_XXCCS_SCDC_PRODUCT_PROFILE); 
     commandlist.add(SQOOP_MTL_SYSTEM_ITEMS_B); 
     return commandlist; 
    } 

} 

運行的類:

package com.cisco.executor; 

import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.util.concurrent.ConcurrentHashMap; 

import org.apache.log4j.Logger; 

public class ParallelExecutor implements Runnable { 

    private static Logger LOGGER = Logger.getLogger(ParallelExecutor.class); 

    String[] command; 
    int threadid; 
    ConcurrentHashMap<Integer, String> map; 

    public ParallelExecutor(String[] command, int threadid, ConcurrentHashMap<Integer, String> map) { 
     this.command = command; 
     this.threadid = threadid; 
     this.map = map; 
    } 

    @Override 
    public void run() { 
     ProcessBuilder processbuilder = new ProcessBuilder(command); 
     LOGGER.info(command); 
     try { 
      Process process = processbuilder.inheritIO().start(); 
      System.out.println("inside process builder "); 
      process.waitFor(); 
      BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); 
      String readline; 
      while ((readline = reader.readLine()) != null) { 
       LOGGER.info(readline); 
      } 
      // getting the thread state and adding it to a collection 
      Thread.State state = Thread.currentThread().getState(); 
      if (state == Thread.State.TERMINATED) { 
       map.put(threadid, "TERMINATED"); 
      } 
     } catch (Exception e) { 
      LOGGER.error(e.getMessage()); 
     } 
    } 

} 

是我的錯誤執行。有人可以幫我實施。

+0

您預計何時會有線程說它已終止?如果你死了,你能告訴我你已經死了嗎? –

+0

是的,你是絕對正確的。我錯過了這個邏輯。那麼我應該抓住狀態並做一個入口? – dataEnthusiast

+0

我想要做的就是捕獲線程ID和執行線程後的狀態,並將其放入地圖中以輪詢地圖。 – dataEnthusiast

回答

0

ThreadPoolExecutor不會終止,直到它被要求這樣做。 所以,首先你必須調用

// executors.shutdown(); 

,你不停的評論。 第二,你需要等待線程正常終止。對於添加一個循環,之前(Map.Entry的條目:map.entrySet())

while (!es.isTerminated()) { 
     } 

但是,因爲一個線程可能會運行許多可運行,如果我讓你正確地要更新一次的CHM一個Runnable完成它的執行。

要做到這一點,你必須使用CustomThread類。擴展線程 並覆蓋只有1個方法,afterExecute()從你需要把代碼更新CHM與Runnable的ID和終止狀態。但請記住,這意味着完成傳遞的Runnables run()方法,而不是底層的Thread的終止。

1

而不是試圖捕獲線程中的線程結果(尤其是在拋出異常/錯誤時容易出錯)我建議您保留Future對象並檢查它們。

ExecutorService exec = Executors.newFixedThreadPool(5); 

    System.out.println("at executors step"); 
    Map<String, Future<?>> results = new HashMap<>(); 
    for (String element : getList()) { 
     String[] command = { "ssh", "hddev-c01-edge-02", "\"" + element + "\"" }; 
     results.put(element, exec.submit(new ParallelExecutor(command, thread_id, map))); 
    } 
    for(Map.Entry<String, Future<?>> entry: map.entrySet()) { 
     try { 
      entry.getValue().get(); 
      System.out.println(entry.getKey()+ " is complete"); 
     } catch (ExecutionException e) { 
      System.out.println(entry.getKey()+ " failed with"); 
      e.getCause().printStackTrace(System.out); 
     } 
    } 
+0

我會嘗試實現這一點。感謝您的片段! – dataEnthusiast

+1

@dataEnthusiast:如Peter Lawrey所建議的,保留Future Object並檢查結果。我也喜歡同樣的解決方案。 –

+0

當然,我會做。 – dataEnthusiast