1

我有一個班級,其中有一個ConcurrentHashMap,每30秒更新一個線程,然後通過調用getNextSocket()方法從同一個ConcurrentHashMap讀取多個閱讀器線程。從單個線程填充ConcurrentHashMap,然後從多個線程讀取而沒有任何競爭條件?

以下是我的單例類,它在初始化時調用connectToSockets()方法來填充我的ConcurrentHashMap,然後啓動一個後臺線程,通過調用updateSockets()方法每隔30秒更新一次相同的地圖。

然後從多個線程我打電話getNextSocket()方法獲得下一個可用的活套接字使用相同的地圖來獲取信息。我也有SocketInfo類是不可變的,它包含所有套接字的狀態,不管它們是否存在。

public class SocketHolder { 
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); 
    private final Map<DatacenterEnum, List<SocketInfo>> liveSocketsByDc = new ConcurrentHashMap<>(); 

    // Lazy Loaded Singleton Pattern 
    private static class Holder { 
    private static final SocketHolder INSTANCE = new SocketHolder(); 
    } 

    public static SocketHolder getInstance() { 
    return Holder.INSTANCE; 
    } 

    private SocketHolder() { 
    connectToSockets(); 
    scheduler.scheduleAtFixedRate(new Runnable() { 
     public void run() { 
     updateSockets(); 
     } 
    }, 30, 30, TimeUnit.SECONDS); 
    } 

    private void connectToSockets() { 
    Map<DatacenterEnum, ImmutableList<String>> socketsByDc = TestUtils.SERVERS; 
    for (Map.Entry<DatacenterEnum, ImmutableList<String>> entry : socketsByDc.entrySet()) { 
     List<SocketInfo> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH); 
     liveSocketsByDc.put(entry.getKey(), addedColoSockets); 
    } 
    } 

    private List<SocketInfo> connect(DatacenterEnum dc, List<String> addresses, int socketType) { 
    List<SocketInfo> socketList = new ArrayList<>(); 
    // ... some code here 
    return socketList; 
    } 

    // called from multiple reader threads to get next live available socket 
    public Optional<SocketInfo> getNextSocket() { 
    Optional<SocketInfo> liveSocket = getLiveSocket(liveSocketsByDc.get(DatacenterEnum.CORP)); 
    return liveSocket; 
    } 

    private Optional<SocketInfo> getLiveSocket(final List<SocketInfo> listOfEndPoints) { 
    if (!CollectionUtils.isEmpty(listOfEndPoints)) { 
     Collections.shuffle(listOfEndPoints); 
     for (SocketInfo obj : listOfEndPoints) { 
     if (obj.isLive()) { 
      return Optional.of(obj); 
     } 
     } 
    } 
    return Optional.absent(); 
    } 

    // update CHM map every 30 seconds 
    private void updateSockets() { 
    Map<DatacenterEnum, ImmutableList<String>> socketsByDc = TestUtils.SERVERS; 

    for (Entry<DatacenterEnum, ImmutableList<String>> entry : socketsByDc.entrySet()) { 
     List<SocketInfo> liveSockets = liveSocketsByDc.get(entry.getKey()); 
     List<SocketInfo> liveUpdatedSockets = new ArrayList<>(); 
     for (SocketInfo liveSocket : liveSockets) { 
     Socket socket = liveSocket.getSocket(); 
     String endpoint = liveSocket.getEndpoint(); 

     boolean sent = ....; 

     boolean isLive = sent ? true : false; 

     // is this right here? or will it cause any race condition? 
     SocketInfo state = new SocketInfo(socket, liveSocket.getContext(), endpoint, isLive); 
     liveUpdatedSockets.add(state); 
     } 
     // update map with new liveUpdatedSockets 
     liveSocketsByDc.put(entry.getKey(), liveUpdatedSockets); 
    } 
    } 
} 

問:

是我上面的代碼線程安全的,有在updateSockets()getNextSocket()方法的競爭狀態?

在我updateSockets()的方法,我提取liveSocketsByDc的ConcurrentHashMap List<SocketInfo>這是之前已經在connectToSockets()方法初始化或30秒的下一個時間間隔內updateSockets()方法填充,然後我遍歷同一個列表liveSockets並根據創建一個新的SocketInfo對象不管isLive是真是假。然後我用這個新的SocketInfo對象更新liveSocketsByDc ConcurrentHashMap。這看起來正確嗎?由於來自多個讀者線程,我打算撥打getNextSocket()方法,它會調用getLiveSocket方法,該方法使用相同的地圖獲取下一個可用的活動套接字。

我在迭代liveSockets列表,然後創建一個新的SocketInfo對象,只需更改isLive字段,其​​他內容將保持不變。這是正確的嗎?

如果存在線程安全問題,解決此問題的最佳方法是什麼?

+0

不,它不是線程安全的。我發現的第一個違規行爲(沒有更進一步)是每一次讀取都會將一個共享的ArrayList從地圖中取出並對其進行洗牌。 –

+0

這個問題可能適合我們的姊妹網站[代碼評論](http://codereview.stackexchange.com/)比它在這裏。 –

+0

@JBNizet我明白了,我們如何解決這個問題? – user1234

回答

1

這裏:

List<SocketInfo> liveSockets = liveSocketsByDc.get(entry.getKey()); 

你不同的線程中可能寫入/讀取的相同列表對象並行。

所以:不是線程安全的。它沒有幫助有一個「外部」線程安全的數據結構;當那個線程安全的東西包含數據...這不是線程安全的;但後來「並行」!

+0

在我的代碼中修復這個線程安全問題的最好方法是什麼? – user1234