我正在嘗試使用技術來確保使用Java Executor框架執行的併發任務完成副作用的可見性。 作爲一個簡單的場景,考慮一個矩陣乘法的假設問題。假設乘法的矩陣可能相當大(例如,幾千行和幾列),並且爲了加速這些矩陣的乘法,我實現了併發算法,其中結果矩陣中每個單元的計算是視爲一項獨立(即可並行)的任務。爲了簡化一下,讓我們忽略對於小輸入矩陣,這種並行化可能不是一個好主意。保證Java執行框架中併發任務副作用的可見性
所以considere我的程序的第一個版本如下:
public class MatrixMultiplier {
private final int[][] m;
private final int[][] n;
private volatile int[][] result; //the (lazily computed) result of the matrix multiplication
private final int numberOfMRows; //number of rows in M
private final int numberOfNColumns; //number of columns in N
private final int commonMatrixDimension; //number of columns in M and rows in N
public MatrixMultiplier(int[][] m, int[][] n) {
if(m[0].length != n.length)
throw new IllegalArgumentException("Uncompatible arguments: " + Arrays.toString(m) + " and " + Arrays.toString(n));
this.m = m;
this.n = n;
this.numberOfMRows = m.length;
this.numberOfNColumns = n[0].length;
this.commonMatrixDimension = n.length;
}
public synchronized int[][] multiply() {
if (result == null) {
result = new int[numberOfMRows][numberOfNColumns];
ExecutorService executor = createExecutor();
Collection<Callable<Void>> tasks = new ArrayList<>();
for (int i = 0; i < numberOfMRows; i++) {
final int finalI = i;
for (int j = 0; j < numberOfNColumns; j++) {
final int finalJ = j;
tasks.add(new Callable<Void>() {
@Override
public Void call() throws Exception {
calculateCell(finalI, finalJ);
return null;
}
});
}
}
try {
executor.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
executor.shutdownNow();
}
}
return result;
}
private ExecutorService createExecutor() {
final int availableProcessors = Runtime.getRuntime().availableProcessors();
final int processorsBound = availableProcessors + 1;
final int maxConcurrency = numberOfMRows * numberOfNColumns;
final int threadPoolSize = maxConcurrency < processorsBound ? maxConcurrency : processorsBound;
return Executors.newFixedThreadPool(threadPoolSize);
}
private void calculateCell(int mRow, int nColumn) {
int sum = 0;
for (int k = 0; k < commonMatrixDimension; k++) {
sum += m[mRow][k] * n[k][nColumn];
}
result[mRow][nColumn] = sum;
}
}
據我瞭解有一個與此實現了一個問題:在執行任務的一些修改result
矩陣可能不是必然可見該線程調用multiply()
。
假設以前是正確的,考慮替代實現的multiply()
依靠明確的鎖(新鎖相關的代碼被註釋與//<LRC>
):
public synchronized int[][] multiply() {
if (result == null) {
result = new int[numberOfMRows][numberOfNColumns];
final Lock[][] locks = new Lock[numberOfMRows][numberOfNColumns]; //<LRC>
for (int i = 0; i < numberOfMRows; i++) { //<LRC>
for (int j = 0; j < numberOfNColumns; j++) { //<LRC>
locks[i][j] = new ReentrantLock(); //<LRC>
} //<LRC>
} //<LRC>
ExecutorService executor = createExecutor();
Collection<Callable<Void>> tasks = new ArrayList<>();
for (int i = 0; i < numberOfMRows; i++) {
final int finalI = i;
for (int j = 0; j < numberOfNColumns; j++) {
final int finalJ = j;
tasks.add(new Callable<Void>() {
@Override
public Void call() throws Exception {
try { //<LRC>
locks[finalI][finalJ].lock(); //<LRC>
calculateCell(finalI, finalJ);
} finally { //<LRC>
locks[finalI][finalJ].unlock(); //<LRC>
} //<LRC>
return null;
}
});
}
}
try {
executor.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
executor.shutdownNow();
}
for (int i = 0; i < numberOfMRows; i++) { //<LRC>
for (int j = 0; j < numberOfNColumns; j++) { //<LRC>
locks[i][j].lock(); //<LRC>
locks[i][j].unlock(); //<LRC>
} //<LRC>
} //<LRC>
}
return result;
}
上面明確鎖定的使用有獨特的目標,確保發佈對調用線程的更改,因爲不存在爭用的可能性。
我的主要問題是如果這是在我的方案中發佈副作用問題的有效解決方案。
作爲第二個問題:是否有更高效/優雅的方法來解決這個問題?請注意,我並不是在尋找其他算法實現(例如Strassen算法)來平行矩陣乘法,因爲我只是一個簡單的案例研究。我非常感興趣的替代品,以確保像這裏介紹的算法的變化的可見性。
UPDATE
我覺得下面的替代實施提高了對以前的實現。它使用一個單一的內部鎖定的,而不會影響太多併發:
public class MatrixMultiplier {
...
private final Object internalLock = new Object();
public synchronized int[][] multiply() {
if (result == null) {
result = new int[numberOfMRows][numberOfNColumns];
ExecutorService executor = createExecutor();
Collection<Callable<Void>> tasks = new ArrayList<>();
for (int i = 0; i < numberOfMRows; i++) {
final int finalI = i;
for (int j = 0; j < numberOfNColumns; j++) {
final int finalJ = j;
tasks.add(new Callable<Void>() {
@Override
public Void call() throws Exception {
calculateCell(finalI, finalJ);
synchronized (internalLock){}
return null;
}
});
}
}
try {
executor.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
executor.shutdownNow();
}
}
synchronized (internalLock){}
return result;
}
...
}
這種替代僅僅是更有效,但它和以前的實現,這使得使用許多鎖看起來是正確的給我。我的觀察結果是否正確?在我的場景中是否有更高效/更優雅的方式來處理同步問題?
您可以將這些任務分解爲Fork Join框架嗎?它會處理所有這些類型的可見性問題。 – 2015-02-10 18:55:04
所有的可見性問題都可以通過使用Fork/Join框架自動解決?我不知道,謝謝你的提示。這可能回答我的第二個問題,但不是第一個問題。 – Sergio 2015-02-10 19:00:29