2014-07-05 16 views
0

With code:如何在所有節點上同時執行G​​ridCache.forEach()和分區?

try (Grid grid = GridGain.start(AnswerYagoFactTests.class.getResource("yago.gridgain.xml"))) { 
    GridCache<Integer, YagoRule> cache = grid.cache("yagoRules"); 
    grid.compute().broadcast(new Runnable() { 
     @Override 
     public void run() { 
      try { 
       log.info("Cache formerly has size={} offheap={} overflow={}", 
         cache.size(), cache.offHeapEntriesCount(), cache.overflowSize()); 
      } catch (GridException e) { 
       log.error("Cannot get overflow size", e); 
      } 
     } 
    }).get(); 

    log.info("1 is {}", cache.get(1)); 
    grid.compute().apply(new GridClosure<String, String>() { 
     @Override 
     public String apply(String e) { 
      log.info("Found {} YAGO rules", cache.size()); 
      cache.forEach(new GridInClosure<GridCacheEntry<Integer,YagoRule>>() { 
       @Override 
       public void apply(GridCacheEntry<Integer, YagoRule> e) { 
        log.info("Processing rule #{} {}", e.getKey(), e.getValue()); 
       } 
      }); 
      return null; 
     } 
    }, msg).get(); 
} 

在3節點配置。 GridGain選擇一個節點(似乎是隨機的),然後僅處理該節點中的每個「處理規則」。

我想要做的是使forEach並行,所以理想情況下爲3個節點和30個條目,每個節點應該處理10個條目。緩存是partitioned,所以每個節點都有自己的條目。

回答

1

在您的代碼中,而不是grid.compute().apply(...)嘗試使用grid.compute().broadcast(...),封閉將廣播到網格中的所有節點。

此外,如果你需要遍歷僅通過初級組(不包括備份),你可以做如下:

grid.compute().broadcast(new GridClosure<String, Integer>() { 
    @Override public String apply(String e) { 
     for (GridCacheEntry<Integer, YagoRule> e : cache.primaryEntrySet() 
      log.info("Processing rule #{} {}", e.getKey(), e.getValue()); 
    } 
}, msg).get(); 
+0

如預期,因爲會有重複這是行不通的。即node1包含A,B,C,node2包含B,C,D。我想要處理這4個條目(不管是誰)。 –

+0

@HendyIrawan聽起來像你需要使用「primaryEntrySet()」方法。我在我的答案中更新了這個例子。 – Dmitriy

+0

謝謝! :)這會創造奇蹟 –

相關問題