2015-02-10 35 views
0

我正在嘗試使用技術來確保使用Java Executor框架執行的併發任務完成副作用的可見性。 作爲一個簡單的場景,考慮一個矩陣乘法的假設問題。假設乘法的矩陣可能相當大(例如,幾千行和幾列),並且爲了加速這些矩陣的乘法,我實現了併發算法,其中結果矩陣中每個單元的計算是視爲一項獨立(即可並行)的任務。爲了簡化一下,讓我們忽略對於小輸入矩陣,這種並行化可能不是一個好主意。保證Java執行框架中併發任務副作用的可見性

所以considere我的程序的第一個版本如下:

public class MatrixMultiplier { 

    private final int[][] m; 
    private final int[][] n; 
    private volatile int[][] result; //the (lazily computed) result of the matrix multiplication 

    private final int numberOfMRows; //number of rows in M 
    private final int numberOfNColumns; //number of columns in N 
    private final int commonMatrixDimension; //number of columns in M and rows in N 

    public MatrixMultiplier(int[][] m, int[][] n) { 
     if(m[0].length != n.length) 
      throw new IllegalArgumentException("Uncompatible arguments: " + Arrays.toString(m) + " and " + Arrays.toString(n)); 
     this.m = m; 
     this.n = n; 
     this.numberOfMRows = m.length; 
     this.numberOfNColumns = n[0].length; 
     this.commonMatrixDimension = n.length; 
    } 

    public synchronized int[][] multiply() { 
     if (result == null) { 
      result = new int[numberOfMRows][numberOfNColumns]; 

      ExecutorService executor = createExecutor(); 

      Collection<Callable<Void>> tasks = new ArrayList<>(); 
      for (int i = 0; i < numberOfMRows; i++) { 
       final int finalI = i; 
       for (int j = 0; j < numberOfNColumns; j++) { 
        final int finalJ = j; 
        tasks.add(new Callable<Void>() { 
         @Override 
         public Void call() throws Exception { 
          calculateCell(finalI, finalJ); 
          return null; 
         } 
        }); 
       } 
      } 

      try { 
       executor.invokeAll(tasks); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } finally { 
       executor.shutdownNow(); 
      } 

     } 

     return result; 
    } 

    private ExecutorService createExecutor() { 
     final int availableProcessors = Runtime.getRuntime().availableProcessors(); 
     final int processorsBound = availableProcessors + 1; 
     final int maxConcurrency = numberOfMRows * numberOfNColumns; 
     final int threadPoolSize = maxConcurrency < processorsBound ? maxConcurrency : processorsBound; 
     return Executors.newFixedThreadPool(threadPoolSize); 
    } 

    private void calculateCell(int mRow, int nColumn) { 
     int sum = 0; 
     for (int k = 0; k < commonMatrixDimension; k++) { 
      sum += m[mRow][k] * n[k][nColumn]; 
     } 
     result[mRow][nColumn] = sum; 
    } 

} 

據我瞭解有一個與此實現了一個問題:在執行任務的一些修改result矩陣可能不是必然可見該線程調用multiply()

假設以前是正確的,考慮替代實現的multiply()依靠明確的鎖(新鎖相關的代碼被註釋與//<LRC>):

public synchronized int[][] multiply() { 
     if (result == null) { 
      result = new int[numberOfMRows][numberOfNColumns]; 

      final Lock[][] locks = new Lock[numberOfMRows][numberOfNColumns]; //<LRC> 
      for (int i = 0; i < numberOfMRows; i++) { //<LRC> 
       for (int j = 0; j < numberOfNColumns; j++) { //<LRC> 
        locks[i][j] = new ReentrantLock(); //<LRC> 
       } //<LRC> 
      } //<LRC> 

      ExecutorService executor = createExecutor(); 

      Collection<Callable<Void>> tasks = new ArrayList<>(); 
      for (int i = 0; i < numberOfMRows; i++) { 
       final int finalI = i; 
       for (int j = 0; j < numberOfNColumns; j++) { 
        final int finalJ = j; 
        tasks.add(new Callable<Void>() { 
         @Override 
         public Void call() throws Exception { 
          try { //<LRC> 
           locks[finalI][finalJ].lock(); //<LRC> 
           calculateCell(finalI, finalJ); 
          } finally { //<LRC> 
           locks[finalI][finalJ].unlock(); //<LRC> 
          } //<LRC> 
          return null; 
         } 
        }); 
       } 
      } 

      try { 
       executor.invokeAll(tasks); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } finally { 
       executor.shutdownNow(); 
      } 

      for (int i = 0; i < numberOfMRows; i++) { //<LRC> 
       for (int j = 0; j < numberOfNColumns; j++) { //<LRC> 
        locks[i][j].lock(); //<LRC> 
        locks[i][j].unlock(); //<LRC> 
       } //<LRC> 
      } //<LRC> 
     } 

     return result; 
    } 

上面明確鎖定的使用有獨特的目標,確保發佈對調用線程的更改,因爲不存在爭用的可能性。

我的主要問題是如果這是在我的方案中發佈副作用問題的有效解決方案。

作爲第二個問題:是否有更高效/優雅的方法來解決這個問題?請注意,我並不是在尋找其他算法實現(例如Strassen算法)來平行矩陣乘法,因爲我只是一個簡單的案例研究。我非常感興趣的替代品,以確保像這裏介紹的算法的變化的可見性。

UPDATE

我覺得下面的替代實施提高了對以前的實現。它使用一個單一的內部鎖定的,而不會影響太多併發:

public class MatrixMultiplier { 
    ... 
    private final Object internalLock = new Object(); 

    public synchronized int[][] multiply() { 
     if (result == null) { 
      result = new int[numberOfMRows][numberOfNColumns]; 

      ExecutorService executor = createExecutor(); 

      Collection<Callable<Void>> tasks = new ArrayList<>(); 
      for (int i = 0; i < numberOfMRows; i++) { 
       final int finalI = i; 
       for (int j = 0; j < numberOfNColumns; j++) { 
        final int finalJ = j; 
        tasks.add(new Callable<Void>() { 
         @Override 
         public Void call() throws Exception { 
          calculateCell(finalI, finalJ); 
          synchronized (internalLock){} 
          return null; 
         } 
        }); 
       } 
      } 

      try { 
       executor.invokeAll(tasks); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } finally { 
       executor.shutdownNow(); 
      } 

     } 

     synchronized (internalLock){} 

     return result; 
    } 
    ... 
} 

這種替代僅僅是更有效,但它和以前的實現,這使得使用許多鎖看起來是正確的給我。我的觀察結果是否正確?在我的場景中是否有更高效/更優雅的方式來處理同步問題?

+0

您可以將這些任務分解爲Fork Join框架嗎?它會處理所有這些類型的可見性問題。 – 2015-02-10 18:55:04

+0

所有的可見性問題都可以通過使用Fork/Join框架自動解決?我不知道,謝謝你的提示。這可能回答我的第二個問題,但不是第一個問題。 – Sergio 2015-02-10 19:00:29

回答

0

聲明resultvolatile僅確保更改參考result(即result = ...;操作)對每個人都是可見的。

解決此問題的最明顯方法是消除副作用。在這種情況下,這很容易:只需讓calculateCell()Callable調用它返回值並讓主線程將值寫入數組。

你當然可以做明確的鎖定,就像你在第二個例子中做的那樣,但是當你只使用一個鎖時,使用nxm鎖似乎是一種矯枉過正。當然,在你的例子中,一個鎖會消除並行性,所以再一次解決方法是使calculateCell()返回該值並且僅鎖定在將結果寫入result數組中的持續時間。

或者事實上,您可以使用ForkJoin而忘記了整件事情,因爲它會爲您做到這一點。

+0

我認爲我們同意'result'的'volatile'聲明,這是必要的,但只是爲了確保任務能看到賦給'result'的數組。我也意識到我可以使用從Callable返回的結果並將這樣的結果複製回原始數組中,但是如果數組非常大,就像在我的場景中一樣:(關於鎖的數量,我只是更新了我的問題,只是使用了一個鎖而不影響很多併發性。 – Sergio 2015-02-10 20:06:55

+0

@Sergio爲什麼你認爲'Callable'返回結果效率低下?絕大多數時間都應該在並行運行的'calculateResult()'方法 – biziclop 2015-02-10 20:22:19

+0

對不起,你說得對,有些地方感到困惑,它是一個單一的標量值(不是數組),因此只要返回它就可以了它在結果數組中的位置需要計算)。感謝@biziclop的更正。無論如何,你認爲我的兩個替代實現在功能上是正確的嗎?(儘管你所提議的似乎是更好的選擇)。 – Sergio 2015-02-10 20:26:41