2013-10-28 36 views
1

我在想如何同步下面的多線程,以便我仍然可以並行運行線程而不會導致保存多個對象。如何同步多線程映射更新

public void setupRender() { 
    ExecutorService executorService = Executors.newFixedThreadPool(10); 

    final Map<Integer, String> map = Collections.synchronizedMap(new HashMap<Integer, String>()); 

    for (int i = 0; i < 10; i++) { 
     executorService.execute(new Runnable() { 
      @Override 
      public void run() { 
       parse(map); 
      } 
     }); 
     System.out.println("map size " + map.size() + " loop " + i); 
    } 
} 

public void parse(Map<Integer, String> map) { 
    for (int j = 0; j < 100; j++) { 
     if (map.containsKey(j)) { 
      System.out.println("Update"); 
      //session.update(map.getKey(j)); 
     } else { 
      System.out.println("add to map " + j); 
      String obj = "test"; 
      map.put(j, obj); 
      System.out.println("save"); 
      //session.save(obj); 
     } 

     if (j % 50 == 0) { 
      System.out.println("commit"); 
      //tx.commit(); 
     } 
    } 
} 

結果,請注意同一個密鑰的多個對象添加到地圖中,但更糟糕的是保存到導致重複數據庫enteries數據庫。

map size 0 loop 0 
map size 0 loop 1 
add to map 0 
commit 
add to map 1 
add to map 0 
commit 
add to map 2 
add to map 3 
add to map 4 
add to map 2 
add to map 5 
add to map 6 
map size 1 loop 2 
add to map 7 
add to map 8 
add to map 9 
commit 
map size 10 loop 3 
commit 
add to map 5 
+0

你可以只設置在DB驅動程序的事務隔離級別? – MadConan

+0

當db僅僅將它看作是一個全新的對象時,我不明白這是如何工作的。 –

回答

1

您可以使用ConcurrentHashMapputIfAbsent來保證一致性和性能。在這種情況下,會話更新可以同時運行。

putIfAbsent: 
    - returns the previous value associated with the specified key, 
     or null if there was no mapping for the key 

public void parse(ConcurrentHashMap<Integer, String> map) { 
    for (int j = 0; j < 100; j++) {  
     String obj = "test"; 

     Object returnedValue = map.putIfAbsent(j, obj); 

     boolean wasInMap = returnedValue != null; 

     if (wasInMap) { 
      System.out.println("Update"); 
      //session.update(map.getKey(j)); 
     } else { 
      System.out.println("add to map " + j); 

      map.put(j, obj); 
      System.out.println("save"); 
      //session.save(obj); 
     } 
    } 
} 

你也可以把有這將創造條件的新實例的回調:

class Callback{ 
    abstract void saveOrUpdate(boolean wasInMap);  
} 

public void parse(Map<Integer, String> map) { 
    for (int j = 0; j < 100; j++) {  
     String obj = "test"; 

     Callback callback = (wasInMap) -> { //Java 8 syntax to be short 
      if (wasInMap) { 
       System.out.println("Update"); 
       //session.update(map.getKey(j)); 
      } else { 
       System.out.println("add to map " + j); 

       map.put(j, obj); 
       System.out.println("save"); 
       //session.save(obj); 
      } 
     } 

     Object returnedValue = map.putIfAbsent(j, callback); 

     callback.saveOrUpdate(returnedValue != null); 
    } 
} 
+0

我覺得你的代碼中缺少一些東西,我不確定map.putIfAbsent在哪裏被填充。這是來自我的原始地圖嗎?另外,這與Tarik的答案相比如何?我在我的例子中唯一遺漏的是我有幾個這樣的地圖,我存儲了數據庫查找值。我只是這樣做,以防止數百萬往返數據庫。 –

+0

*更新了代碼*。我已經從if中移除了新實例創建。與Tarik的回答相比,我需要這樣做。在這種情況下,會話更新期間不會鎖定,因此它應該更具性能。 –

+0

我正在測試您的代碼,因此它似乎正在工作。我想在做出答案之前做一些額外的測試。 –

1

使用syncronized關鍵字來確保檢查j是否在地圖中,如果沒有,則添加它,是自動完成的。然而,它破壞了多線程的目的,因爲當一個線程正在循環時,大多數線程將被阻塞。

public void setupRender() { 
    ExecutorService executorService = Executors.newFixedThreadPool(10); 

final Map<Integer, String> map = Collections.synchronizedMap(new HashMap<Integer, String>()); 

for (int i = 0; i < 10; i++) { 
    executorService.execute(new Runnable() { 
     @Override 
     public void run() { 
      parse(map); 
     } 
    }); 
    System.out.println("map size " + map.size() + " loop " + i); 
} 
} 

public void parse(Map<Integer, String> map) { 
    synchronized(map) { 
    for (int j = 0; j < 100; j++) { 
     if (map.containsKey(j)) { 
      System.out.println("Update"); 
      //session.update(map.getKey(j)); 
     } else { 
      System.out.println("add to map " + j); 
      String obj = "test"; 
      map.put(j, obj); 
      System.out.println("save"); 
      //session.save(obj); 
     } 
     } 

     if (j % 50 == 0) { 
      System.out.println("commit"); 
      //tx.commit(); 
     } 
    } 
} 
+0

線程是否仍然平行運行?另外,同步時有多少性能損失?最後在這個例子中,我只顯示一張地圖,在我的真實世界代碼中有幾張這樣的地圖,在這種情況下你將如何管理它。 –

+0

是的,它們並行運行,但大多數線程將在synchronized語句中被阻塞,而只有一個線程將循環。在這種特殊情況下,它使多線程在性能方面完全無用。如果有多個地圖,你可能確實會獲得一些東西,但是你需要向我們展示一些代碼。 – Tarik

+0

是的,如果我不能並行運行線程,它就變得毫無用處。不幸的是,我沒有與我的實際代碼,但我會盡我所能解釋我在做什麼。每晚我登錄到數百個ftp服務器並抓取csv文件進行解析。地圖中的數據是查找數據,示例年,品牌,型號數據。我處理了數百萬行,所以不是每次查詢都要查詢數據庫,我只是用數據初始化地圖,然後根據地圖進行搜索,如果它不存在,我只需將它添加並每250行提交給數據庫。我試圖一次處理多個csv文件。 –