我有兩個過程如下所示。我的每一個過程都有run
和shutdown
方法如何確保每個線程在不同的過程對象上工作以避免線程安全問題?
Process processA = new ProcessA("processA", getProcessAProperties());
Process processB = new ProcessB("processB", getProcessBProperties());
- 我想爲每個進程不同的線程池配置,以便ProcessA在自己的線程池運行和進程B在自己的線程池中運行相互獨立的。
- 而且我無法在自己的線程池的每個線程之間共享Process對象。
下面是我如何運行我自己的線程池ProcessA
的簡單示例。有三個線程,每個線程都有自己的Process對象來處理。現在我想以更通用的方式擴展它,以便它可以用於我的兩個進程,如上所示。
public static void main(String[] args) {
int numberOfThreads = 3;
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
final List<Process> processes = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
// each thread works on different Process object
Process processA = new ProcessA("processA", getProcessAProperties());
processes.add(processA);
executor.submit(processA);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
for (Process process : processes) {
process.shutdown();
}
executor.shutdown();
try {
executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace;
}
}
});
}
所以我創建了一個進程處理程序來解決一個通用的方法上述問題,但有一個線程安全問題在這裏:
public final class ProcessHandler {
private final ExecutorService executorServiceProcess;
private final Process process;
private final Thread shutdownHook = new Thread() {
@Override
public void run() {
process.shutdown();
executorServiceProcess.shutdown();
}
};
// in this constructor my code is reusing the same
// process instance for each thread in the pool
// which is a problem for my application, how to fix this?
public ProcessHandler(Process process, int poolSize) {
this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
this.process = process;
Runtime.getRuntime().addShutdownHook(shutdownHook);
for (int i = 0; i < poolSize; i++) {
executorServiceProcess.submit(process);
}
}
public void shutdown() {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
shutdownHook.start();
try {
shutdownHook.join();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
這是現在我的主要方法的樣子:
public static void main(String[] args) {
Process processA = new ProcessA("processA", getProcessAProperties());
Process processB = new ProcessB("processB", getProcessBProperties());
// processA will run with three threads in its own thread pool
ProcessHandler processHandlerA = new ProcessHandler (processA, 3);
// processB will run with two threads in its own thread pool
ProcessHandler processHandlerB = new ProcessHandler (processB, 2);
// now I can call shutdown on them
processHandlerA.shutdown();
processHandlerB.shutdown();
}
正如你可以看到我上面的ProcessHandler
構造,我重複使用相同的流程實例中,是不是我想要做的池中的每個線程。我希望每個線程都能在Process對象的不同實例上工作,就像我在ProcessA的第一個主要方法中所使用的一樣,每個線程都在處理不同的Process對象。
解決此設計問題的最佳方法是什麼?我也打開重新設計我的ProcessHandler以正確的方式解決這個問題。
我不明白。爲什麼你多次提交相同的'Process'?簡單的答案是「不要」。 – EJP
@EJP每個進程都是一個kafka消費者,而kafka消費者不是線程安全的,所以我不能在線程之間共享它們。這是我正在按照[文章]中所示(https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/ )。搜索「把它放在一起」。他們有一個例子,其中每個線程在不同的'ConsumerLoop'對象上工作。我已經用完全相同的例子來說明我想要做什麼。 – john
順便說一句,它看起來相同的過程,但當它與下面的kafka交談時,它將從不同分區獲得不同的數據,所以如果我有兩個線程,每個線程都有自己的進程對象,並且每個線程將從kafka獲得不同的數據。這是如何工作的。 – john