2015-05-08 47 views
2

Data inconsistency in a multi-threaded program

I have a data consistency problem in multi-threaded code.

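Use case: Messages (person records) arrive on a queue. I have multi-threaded code that takes data from the queue and writes it to another database. Along the way I need to compare the person records, and if there are duplicates I need to merge/update them before inserting into the other database.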

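Problem: If two objects describing the same person are in two different threads at the same time, both threads treat the person as not yet present in the second database and both try to insert it, so we end up with duplicate records.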

How can I solve this?

Once I know conceptually what to do, I can write the code in Java, or use Apache Storm and run parallel processes.

+1

You will need to synchronize the threads. – Dagriel

+0

Can the message queue contain duplicates? –

+0

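I had a similar problem, so I used a distributor. It basically grabs a task and, if no similar task is being processed at the moment, creates a new thread and hands it the task. In my case, though, the distributor can create up to 400 threads. – nafas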

Answers

1

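Possible solutions:

  1. Check for duplicates when you insert into the queue. Keep a hash table alongside the queue. On every insert, check whether the data is already in the hash table; if it is, discard the insert. Insertion is still O(1), but the memory cost goes up.

  2. Instead of inserting into a single queue, insert into multiple queues chosen by hash value, with one consumer thread per queue. This is also a common way to preserve ordering for time-series data.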
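A minimal sketch of option 2, assuming each message carries a key (for example a normalized name or ID) whose `hashCode` identifies the person. The class name `KeyPartitionedRouter` and its methods are made up for illustration; each single-threaded executor plays the role of one queue plus its consumer thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical router: sends each message to one of N single-threaded workers chosen by key hash. */
final class KeyPartitionedRouter<T> {
    private final List<ExecutorService> workers = new ArrayList<>();
    private final Consumer<T> handler;

    KeyPartitionedRouter(int partitions, Consumer<T> handler) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        this.handler = handler;
        for (int i = 0; i < partitions; i++) {
            // One thread per partition, each with its own internal FIFO queue.
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Index of the worker that owns this key; equal keys always map to the same index. */
    int partitionFor(Object key) {
        return Math.floorMod(key.hashCode(), workers.size());
    }

    /** Queues the message on the worker that owns its key. */
    void submit(Object key, T message) {
        workers.get(partitionFor(key)).execute(() -> handler.accept(message));
    }

    /** Stops accepting messages and waits for the queued ones to finish. */
    boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (ExecutorService w : workers) {
            long remaining = Math.max(0, deadline - System.nanoTime());
            if (!w.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                return false;
            }
        }
        return true;
    }
}
```

Because records for the same person are always handled by the same thread, the check-then-insert against the second database never races with itself. This only holds if duplicates really produce equal keys, so the key has to be derived from whatever fields define "same person".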

0

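A long time ago I wrote a simple locking mechanism that locks on an object's value rather than its instance. It is a bit slow, but it may work if both threads end up with the same key.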

/* 
* Copyright (c) 2012, Isaiah van der Elst ([email protected]) 
* All rights reserved. 
* 
* Redistribution and use in source and binary forms, with or without 
* modification, are permitted provided that the following conditions are met: 
* 
* - Redistributions of source code must retain the above copyright notice, 
* this list of conditions and the following disclaimer. 
* 
* - Redistributions in binary form must reproduce the above copyright notice, 
* this list of conditions and the following disclaimer in the documentation 
* and/or other materials provided with the distribution. 
* 
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
* POSSIBILITY OF SUCH DAMAGE. 
*/ 

package org.gearman.impl.util; 
import java.util.HashMap; 
import java.util.Map; 


/** 
* A simple lock based on the value of an object instead of the object's instance. 
* 
* The synchronizing problem in the server is that sometimes it's required to 
* synchronize on a key value for a hash table. However, the key being used will 
* never be the same instance from one thread to another, and synchronizing on the 
* hash table itself will be too slow. Currently synchronization is done by this 
* lock which locks based on Object value, not the Object's instance. 
* 
* Synchronization could have been done using primitive Objects, like Integers, 
* but I decided not to because this program is designed to be embedded. That 
* kind of synchronization may interfere with the wrapping program, possibly 
* causing a deadlock that is impossible to find. 
* 
* @author isaiah.v 
*/ 
public class EqualsLock { 

     /** The set of all keys and lock owners */ 
     private final Map<Object, Thread> keys = new HashMap<Object, Thread>(); 

     /** 
     * Acquires a lock for the given key. If this thread acquires a lock with 
     * key1, any subsequent threads trying to acquire the lock with key2 will 
     * block if key1.equals(key2). If key1.equals(key2) is not true, the 
     * subsequent thread will acquire the lock for key2 and continue execution. 
     * 
     * @param key 
     *    The key 
     */ 
     public final void lock(final Object key) { 
       boolean isInterrupted = false; 

       synchronized(keys){ 
         while(!acquireLock(key, Thread.currentThread())) { 
           try { 
             keys.wait(); 
           } catch (InterruptedException e) { 
             // Remember the interruption and keep waiting, so this method 
             // never returns without actually holding the lock 
             isInterrupted = true; 
           } 
         } 
       } 

       if(isInterrupted) { 
         // re-interrupt thread if an interruption occurred 
         Thread.currentThread().interrupt(); 
       } 
     } 

     /** 
     * Acquires the lock only if it is free at the time of invocation. 
     * 
     * Acquires the lock if it is available and returns immediately with the 
     * value true. If the lock is not available then this method will return 
     * immediately with the value false. 
     * 
     * @param key 
     *    The key to acquire the lock 
     * @return  
     *    true if the lock was acquired, false if the lock was not acquired 
     */ 
     public final boolean tryLock(final Object key) { 
       synchronized(keys) { 
         return acquireLock(key, Thread.currentThread()); 
       } 
     } 

     /** 
     * Releases the lock of the given key. The lock is only released if the 
     * calling thread owns the lock for the given key 
     * 
     * @param key The key 
     */ 
     public final void unlock(final Object key) { 
       synchronized(keys){ 
         if(keys.get(key)==Thread.currentThread()) { 
           keys.remove(key); 
           keys.notifyAll(); 
         } 
       } 
     } 

     /** 
     * Adds the (Object,Thread) pair if the key is not already in the key set. 
     * 
     * @param key The key to add 
     * @param t    The Thread to be associated with the key 
     * @return 
     *    true if the Thread t and the Object key is successfully added, or 
     *    Thread t is already associated with Object key. false if the Object 
     *    key has already been added but Thread t is not associated with it. 
     */ 
     private final boolean acquireLock (final Object key, final Thread t) { 
       final Thread value = keys.get(key); 

       if(value == t) 
         return true; 
       if(value != null) 
         return false; 

       keys.put(key, t); 
       return true; 
     } 
} 
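
For comparison, a similar per-value effect can be sketched with only `java.util.concurrent` by using lock striping. This is a different technique from the class above, not a rewrite of it: equal keys always share a stripe, while unequal keys only occasionally collide and then merely wait for each other. The name `StripedKeyLock` and the `withLock` helper are made up for illustration.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical lock striping helper: equal keys always share a lock. */
final class StripedKeyLock {
    private final Lock[] stripes;

    StripedKeyLock(int stripeCount) {
        if (stripeCount <= 0) {
            throw new IllegalArgumentException("stripeCount must be positive");
        }
        stripes = new Lock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    /** Runs the action while holding the stripe that owns the key, and always releases it. */
    <R> R withLock(Object key, Supplier<R> action) {
        Lock lock = stripes[Math.floorMod(key.hashCode(), stripes.length)];
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }
}
```

Whichever lock is used, the lookup in the second database and the insert/merge both have to happen inside the locked section, and the release belongs in a `finally` block so an exception cannot leave the key locked.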
+1

Note: I think I see a bug if the thread is interrupted while locking. –

0

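If the database you use supports transactions and transaction isolation, you can rely on that. You may need the Serializable isolation level to avoid phantom reads. Each check + update/insert operation should run in a single transaction.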

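Explanation: The problem you describe is a concurrency effect known as a phantom read. Imagine you first check, with a select query, whether the table already contains a person named "test". The query returns an empty result set, so you decide to insert the person. Meanwhile, after you issued the select but before you issue the insert, another thread attempts the same thing, i.e. it checks whether the database contains a person named "test", and that second thread inserts the person. If the first thread, after performing its own insert, issues the same select again and sees 2 rows instead of the expected 1 (the one it just inserted), that is a phantom read. You can read more about isolation and concurrency effects in the Wikipedia article Isolation (database systems).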
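
A rough JDBC sketch of the check + insert in one Serializable transaction. The table `person`, its column `name`, and the class `SerializableInsert` are assumptions for illustration, not anything from the question. Some databases abort one of two conflicting Serializable transactions with an error, so the caller should be ready to retry when a `SQLException` comes back.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical helper; the table and column names are assumed. */
final class SerializableInsert {

    /**
     * Checks for the person and inserts it if absent, in one SERIALIZABLE transaction.
     *
     * @return true if a row was inserted, false if the person already existed
     */
    static boolean insertIfAbsent(Connection conn, String name) throws SQLException {
        boolean oldAutoCommit = conn.getAutoCommit();
        int oldIsolation = conn.getTransactionIsolation();
        // Set the isolation level before the transaction starts.
        conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
        conn.setAutoCommit(false);
        try {
            boolean exists;
            try (PreparedStatement check =
                     conn.prepareStatement("SELECT 1 FROM person WHERE name = ?")) {
                check.setString(1, name);
                try (ResultSet rs = check.executeQuery()) {
                    exists = rs.next();
                }
            }
            if (!exists) {
                try (PreparedStatement insert =
                         conn.prepareStatement("INSERT INTO person (name) VALUES (?)")) {
                    insert.setString(1, name);
                    insert.executeUpdate();
                }
            }
            conn.commit();
            return !exists;
        } catch (SQLException e) {
            try {
                conn.rollback();
            } catch (SQLException rollbackFailure) {
                e.addSuppressed(rollbackFailure);
            }
            throw e;
        } finally {
            // Restore the connection's previous settings for the next user.
            conn.setAutoCommit(oldAutoCommit);
            conn.setTransactionIsolation(oldIsolation);
        }
    }
}
```

A unique constraint on whatever identifies a person is a useful backstop as well, since the database then rejects a second insert even if the application logic slips.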

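If your database does not support transactions or the Serializable isolation level, you need to synchronize yourself. If all threads run in a single JVM, you can use the synchronized keyword or a ReentrantReadWriteLock. If the threads run in different JVMs, you can use a distributed locking service (Terracotta or Hazelcast).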
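
For the single-JVM case, a minimal sketch with `ReentrantReadWriteLock`. The in-memory map stands in for the second database, and the class `GuardedPersonStore` with its name/email fields is invented for illustration. The check and the write sit under the same write lock, because `ReentrantReadWriteLock` cannot upgrade a held read lock to a write lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical in-memory stand-in for the second database, guarded by a read/write lock. */
final class GuardedPersonStore {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, String> emailByName = new HashMap<>();

    /** Lookups only need the shared read lock. */
    String find(String name) {
        lock.readLock().lock();
        try {
            return emailByName.get(name);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Inserts the person, or updates the stored record if the name already exists.
     * The check and the write happen under one exclusive lock, so no other thread
     * can get in between them.
     *
     * @return true if a new record was inserted, false if an existing one was updated
     */
    boolean insertOrMerge(String name, String email) {
        lock.writeLock().lock();
        try {
            boolean isNew = !emailByName.containsKey(name);
            emailByName.put(name, email);
            return isNew;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Number of stored records. */
    int size() {
        lock.readLock().lock();
        try {
            return emailByName.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

A single lock like this serializes every insert, including inserts for unrelated people, so it trades throughput for simplicity.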