2016-12-01 35 views
-1

我嘗試處理使用Java中的一個ForkJoinPool的圖像。我使用流在圖像上執行一些自定義操作。我試圖用ForkJoinPoolgetRGBsetRGB方法。如何在getRGB方法上實現並行性?ForkJoinPool BufferedImage的處理方式

@Override 
    public int[] getRGB(int xStart, int yStart, int w, int h, int[] rgbArray,int offset, int scansize) { 

     int[][] sol = new int[h][w]; 

     int threshold = w; 

     class RecursiveSetter extends RecursiveAction { 
      int from; 
      int to; 
      FJBufferedImage image; 

      RecursiveSetter(int from, int to, FJBufferedImage image) { 
       this.from = from; 
       this.to = to; 
       this.image = image; 
      } 

      @Override 
      protected void compute() { 
       System.out.println("From : " + from + " To : " + to); 
       if (from >= to) return; 

       if (to - from == 1) { 
        computeDirectly(from); 
        return; 
       } else { 
        int mid = from + (to - from)/2; 
        System.out.println("From : " + from + " To : " + to + 
          "Mid :" + mid); 
        invokeAll(
          new RecursiveSetter(from, mid, image), 
          new RecursiveSetter(mid + 1, to, image)); 
        return; 
       } 
      } 

      void computeDirectly(int row) { 
       sol[from] = image.getRealRGB(from, 0, w, 1, null, offset, 
         scansize); 

      } 
     } 

     ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); 
     pool.invoke(new RecursiveSetter(0, h-1, this)); 
     return Arrays.stream(sol) 
       .flatMapToInt(Arrays::stream) 
       .toArray(); 
    } 

getRealRGB只是代理到的BufferedImage的方法。我知道這可能是不切實際的,但我只想知道如何在這種情況下使用ForkJoinPool。是的,上面的代碼是拋出ArrayIndexOutOfBound異常。請給出關於如何分割工作量的建議(行與列與小格,現在我正在分割),以及如何確定閾值。

回答

3

首先對您嘗試一些言論:

int[][] sol = new int[h][w]; 

這裏您要創建一個二維數組,在Java是一維數組與已經填充了int[]類型的子數組元素類型int[] 。由於您要覆蓋sol[from] = /* something returning an int[] array */的元素,因此分配這些子陣列已過時。因此,在這種情況下,你應該使用

int[][] sol = new int[h][]; 

代替。但是,認識到外部陣列的一維的性質也允許認識到一個簡單的流媒體解決方案將做的工作,即

int[][] sol = IntStream.range(yStart, yStart+h) 
    .parallel() 
    .mapToObj(y -> getRealRGB(xStart, y, w, 1, null, 0, scansize)) 
    .toArray(int[][]::new); 

這已經做的可用內核分配工作量的工作。它在幕後使用Fork/Join框架,就像您試圖做的那樣,但這是一個實現細節。您可以將其與下一個流操作相融合,例如

return IntStream.range(yStart, yStart+h) 
    .parallel() 
    .flatMap(y -> Arrays.stream(getRealRGB(xStart, y, w, 1, null, 0, scansize))) 
    .toArray(); 

不過,如果我理解正確的方法簽名,你真正想要做的

public int[] getRGB(
     int xStart, int yStart, int w, int h, int[] rgbArray, int offset, int scansize) { 

    final int[] result = rgbArray!=null? rgbArray: new int[offset+h*scansize]; 
    IntStream.range(yStart, yStart+h).parallel() 
     .forEach(y -> getRealRGB(xStart, y, w, 1, result, offset+y*scansize, scansize)); 
    return result; 
} 

履行合同。這也最大限度地減少了複製操作的次數。由於每個查詢寫入數組的不同區域,直接寫入目標數組是線程安全的。

這僅保持行的分裂範圍的策略。行分拆是可能的,但更復雜,而很少有償還。這隻會有助於調用者請求行數很少但每行很多值的情況。但即使如此,由於存儲器的局部性問題,並不清楚複雜的子行分裂是否會得到回報。


關於你原來的問題,如果直接實行ForkJoinTask,你可以使用getSurplusQueuedTaskCount(),以決定是否再次分裂或直接計算。

門檻的選擇開銷之間的權衡,由於有同步和核心利用率任務對象的數量。如果工作負載可以完美平衡分配,並且其他不相關的線程或進程使用CPU時間,那麼每個內核只有一個項目是完美的。但實際上,這些任務永遠不會完全同時運行,因此需要先由完成的那些內核執行一些備用分割任務。典型的閾值在1或3之間(記住這是每個核心的排隊任務的數量),對於你的任務類型,具有非常均勻的工作量,可以使用較小的數量,例如,一旦有另一個隊列項目就停止分割。