2016-12-18 42 views
0

我有兩個過程如下所示。我的每一個過程都有runshutdown方法如何確保每個線程在不同的過程對象上工作以避免線程安全問題?

Process processA = new ProcessA("processA", getProcessAProperties()); 
Process processB = new ProcessB("processB", getProcessBProperties()); 
  • 我想爲每個進程不同的線程池配置,以便ProcessA在自己的線程池運行和進程B在自己的線程池中運行相互獨立的。
  • 而且我無法在自己的線程池的每個線程之間共享Process對象。

下面是我如何運行我自己的線程池ProcessA的簡單示例。有三個線程,每個線程都有自己的Process對象來處理。現在我想以更通用的方式擴展它,以便它可以用於我的兩個進程,如上所示。

public static void main(String[] args) { 
    int numberOfThreads = 3; 
    ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); 

    final List<Process> processes = new ArrayList<>(); 
    for (int i = 0; i < numberOfThreads; i++) { 
    // each thread works on different Process object 
    Process processA = new ProcessA("processA", getProcessAProperties()); 
    processes.add(processA); 
    executor.submit(processA); 
    } 

    Runtime.getRuntime().addShutdownHook(new Thread() { 
    @Override 
    public void run() { 
     for (Process process : processes) { 
     process.shutdown(); 
     } 
     executor.shutdown(); 
     try { 
     executor.awaitTermination(5000, TimeUnit.MILLISECONDS); 
     } catch (InterruptedException e) { 
     e.printStackTrace; 
     } 
    } 
    }); 
} 

所以我創建了一個進程處理程序來解決一個通用的方法上述問題,但有一個線程安全問題在這裏:

public final class ProcessHandler { 
    private final ExecutorService executorServiceProcess; 
    private final Process process; 
    private final Thread shutdownHook = new Thread() { 
    @Override 
    public void run() { 
     process.shutdown(); 
     executorServiceProcess.shutdown(); 
    } 
    }; 

    // in this constructor my code is reusing the same 
    // process instance for each thread in the pool 
    // which is a problem for my application, how to fix this? 
    public ProcessHandler(Process process, int poolSize) { 
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize); 
    this.process = process; 
    Runtime.getRuntime().addShutdownHook(shutdownHook);  
    for (int i = 0; i < poolSize; i++) { 
     executorServiceProcess.submit(process); 
    } 
    } 

    public void shutdown() { 
    Runtime.getRuntime().removeShutdownHook(shutdownHook); 
    shutdownHook.start(); 
    try { 
     shutdownHook.join(); 
    } catch (InterruptedException ex) { 
     Thread.currentThread().interrupt(); 
    } 
    } 
} 

這是現在我的主要方法的樣子:

public static void main(String[] args) { 

    Process processA = new ProcessA("processA", getProcessAProperties()); 
    Process processB = new ProcessB("processB", getProcessBProperties()); 

    // processA will run with three threads in its own thread pool 
    ProcessHandler processHandlerA = new ProcessHandler (processA, 3); 
    // processB will run with two threads in its own thread pool 
    ProcessHandler processHandlerB = new ProcessHandler (processB, 2); 

    // now I can call shutdown on them 
    processHandlerA.shutdown(); 
    processHandlerB.shutdown(); 
} 

正如你可以看到我上面的ProcessHandler構造,我重複使用相同的流程實例中,是不是我想要做的池中的每個線程。我希望每個線程都能在Process對象的不同實例上工作,就像我在ProcessA的第一個主要方法中所使用的一樣,每個線程都在處理不同的Process對象。

解決此設計問題的最佳方法是什麼?我也打開重新設計我的ProcessHandler以正確的方式解決這個問題。

+0

我不明白。爲什麼你多次提交相同的'Process'?簡單的答案是「不要」。 – EJP

+0

@EJP每個進程都是一個kafka消費者,而kafka消費者不是線程安全的,所以我不能在線程之間共享它們。這是我正在按照[文章]中所示(https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/ )。搜索「把它放在一起」。他們有一個例子,其中每個線程在不同的'ConsumerLoop'對象上工作。我已經用完全相同的例子來說明我想要做什麼。 – john

+0

順便說一句,它看起來相同的過程,但當它與下面的kafka交談時,它將從不同分區獲得不同的數據,所以如果我有兩個線程,每個線程都有自己的進程對象,並且每個線程將從kafka獲得不同的數據。這是如何工作的。 – john

回答

1

也許嘗試這樣的事:

// Replace Process process by a list of Process 
List<Process> processes = new ArrayList<Process>(); 

private final Thread shutdownHook = new Thread() { 
    @Override 
    public void run() { 
     for (Process process : processes) 
     process.shutdown(); 
     executorServiceProcess.shutdown(); 
    } 
}; 

public ProcessHandler(Process process, int poolSize) { 
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize); 
    Runtime.getRuntime().addShutdownHook(shutdownHook);  
    for (int i = 0; i < poolSize; i++) { 
     // Get a deep copy of the process 
     Process p = process.clone(); 
     processes.add(p); 
     executorServiceProcess.submit(p); 
    } 
} 

的另一種方式,沒有克隆方法,是抽象的過程,並根據這些添加兩個構造函數, 別忘了根據這些來適應你的代碼元素

public abstract class AProcess extends Process { 

    private String name; 
    private Properties properties; 
    public AProcess(String name, Properties properties) 
    { 
     this.setName(name); 
     this.setProperties(properties); 
    } 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 

    public Properties getProperties() { 
     return properties; 
    } 

    public void setProperties(Properties properties) { 
     this.properties = properties; 
    } 
} 

現在,您可以實現您的ProcessA和進程B:

public class ProcessA extends AProcess { 

    public ProcessA(String name, Properties properties) 
    { 
     super(name, properties); 
    } 
} 

現在創建您的ProcessHandler:

public final class ProcessHandler { 
    private final ExecutorService executorServiceProcess; 
    private final List<AProcess> processes = new ArrayList<AProcess>(); 
    private final Thread shutdownHook = new Thread() { 
    @Override 
    public void run() { 
     for (AProcess process : processes) 
      process.shutdown(); 
     executorServiceProcess.shutdown(); 
    } 
    }; 

    public ProcessHandler(ProcessA process, int poolSize) { 
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize); 
    Runtime.getRuntime().addShutdownHook(shutdownHook);  
    for (int i = 0; i < poolSize; i++) { 
     ProcessA p = new ProcessA(process.getName(), process.getProperties()); 
     processes.add(p); 
     executorServiceProcess.submit(p); 
    } 
    } 

    public ProcessHandler(ProcessB process, int poolSize) { 
    this.executorServiceProcess = Executors.newFixedThreadPool(poolSize); 
    Runtime.getRuntime().addShutdownHook(shutdownHook);  
    for (int i = 0; i < poolSize; i++) { 
     ProcessB p = new ProcessB(process.getName(), process.getProperties()); 
     processes.add(p); 
     executorServiceProcess.submit(p); 
    } 
    } 

    public void shutdown() { 
    Runtime.getRuntime().removeShutdownHook(shutdownHook); 
    shutdownHook.start(); 
    try { 
     shutdownHook.join(); 
    } catch (InterruptedException ex) { 
     Thread.currentThread().interrupt(); 
    } 
    } 
} 

而且使用這樣的ProcessHandler processHandlerA = new ProcessHandler(processA, 3);

+0

除克隆對象外,還有其他方法嗎?我可以相應地重新設計我的ProcessHandler類。 – john

+0

我已經添加了一種方法來實現它,而不需要克隆,但是通過使用構造函數創建對象的新實例,它有點不舒服,而且您需要適應您的存在代碼 – Xephi

相關問題