2017-10-29 97 views
9

我有一個類,我在updateLiveSockets()方法內每隔30秒從單個後臺線程填充地圖liveSocketsByDatacenter,然後我有一個方法getNextSocket()將被調用多個閱讀器線程來獲取可用的活動套接字,它使用相同的地圖來獲取此信息。同時閱讀一個地圖,而單個後臺線程定期修改它

public class SocketManager { 
    private static final Random random = new Random(); 
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); 
    private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter = 
     new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>())); 
    private final ZContext ctx = new ZContext(); 

    // Lazy Loaded Singleton Pattern 
    private static class Holder { 
    private static final SocketManager instance = new SocketManager(); 
    } 

    public static SocketManager getInstance() { 
    return Holder.instance; 
    } 

    private SocketManager() { 
    connectToZMQSockets(); 
    scheduler.scheduleAtFixedRate(new Runnable() { 
     public void run() { 
     updateLiveSockets(); 
     } 
    }, 30, 30, TimeUnit.SECONDS); 
    } 

    // during startup, making a connection and populate once 
    private void connectToZMQSockets() { 
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS; 
    // The map in which I put all the live sockets 
    Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>(); 
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) { 
     List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH); 
     updatedLiveSocketsByDatacenter.put(entry.getKey(), 
      Collections.unmodifiableList(addedColoSockets)); 
    } 
    // Update the map content 
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter)); 
    } 

    private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) { 
    List<SocketHolder> socketList = new ArrayList<>(); 
    for (String address : addresses) { 
     try { 
     Socket client = ctx.createSocket(socketType); 
     // Set random identity to make tracing easier 
     String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt()); 
     client.setIdentity(identity.getBytes(ZMQ.CHARSET)); 
     client.setTCPKeepAlive(1); 
     client.setSendTimeOut(7); 
     client.setLinger(0); 
     client.connect(address); 

     SocketHolder zmq = new SocketHolder(client, ctx, address, true); 
     socketList.add(zmq); 
     } catch (Exception ex) { 
     // log error 
     } 
    } 
    return socketList; 
    } 

    // this method will be called by multiple threads to get the next live socket 
    // is there any concurrency or thread safety issue or race condition here? 
    public Optional<SocketHolder> getNextSocket() { 
    // For the sake of consistency make sure to use the same map instance 
    // in the whole implementation of my method by getting my entries 
    // from the local variable instead of the member variable 
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
     this.liveSocketsByDatacenter.get(); 
    Optional<SocketHolder> liveSocket = Optional.absent(); 
    List<Datacenters> dcs = Datacenters.getOrderedDatacenters(); 
    for (Datacenters dc : dcs) { 
     liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); 
     if (liveSocket.isPresent()) { 
     break; 
     } 
    } 
    return liveSocket; 
    } 

    // is there any concurrency or thread safety issue or race condition here? 
    private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) { 
    if (!CollectionUtils.isEmpty(endpoints)) { 
     // The list of live sockets 
     List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size()); 
     for (SocketHolder obj : endpoints) { 
     if (obj.isLive()) { 
      liveOnly.add(obj); 
     } 
     } 
     if (!liveOnly.isEmpty()) { 
     // The list is not empty so we shuffle it an return the first element 
     Collections.shuffle(liveOnly); 
     return Optional.of(liveOnly.get(0)); 
     } 
    } 
    return Optional.absent(); 
    } 

    // Added the modifier synchronized to prevent concurrent modification 
    // it is needed because to build the new map we first need to get the 
    // old one so both must be done atomically to prevent concistency issues 
    private synchronized void updateLiveSockets() { 
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS; 

    // Initialize my new map with the current map content 
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
     new HashMap<>(this.liveSocketsByDatacenter.get()); 

    for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) { 
     List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey()); 
     List<SocketHolder> liveUpdatedSockets = new ArrayList<>(); 
     for (SocketHolder liveSocket : liveSockets) { // LINE A 
     Socket socket = liveSocket.getSocket(); 
     String endpoint = liveSocket.getEndpoint(); 
     Map<byte[], byte[]> holder = populateMap(); 
     Message message = new Message(holder, Partition.COMMAND); 

     boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket); 
     boolean isLive = (status) ? true : false; 
     // is there any problem the way I am using `SocketHolder` class? 
     SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); 
     liveUpdatedSockets.add(zmq); 
     } 
     liveSocketsByDatacenter.put(entry.getKey(), 
      Collections.unmodifiableList(liveUpdatedSockets)); 
    } 
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter)); 
    } 
} 

正如你可以在我的班級看到:

  • 從一個運行每30秒一個後臺線程,我填充liveSocketsByDatacenter地圖與updateLiveSockets()方法所有帶電的插座。
  • 然後從多個線程,我打電話getNextSocket()方法給我一個活插座可用,它使用liveSocketsByDatacenter地圖來獲取所需的信息。

我有我的代碼工作正常,沒有任何問題,並希望看看是否有任何更好或更有效的方式來寫這個。我也想得到關於線程安全問題或任何競爭條件的意見(如果有的話),但到目前爲止我還沒有看到,但我可能是錯的。

我大多擔心updateLiveSockets()方法和getLiveSocketX()方法。我在LINE A迭代liveSockets這是ListSocketHolder,然後製作一個新的SocketHolder對象並添加到另一個新列表中。這裏好嗎?

注意:SocketHolder是一個不可變的類。你可以忽略我擁有的東西ZeroMQ

+3

它看起來我就像'liveSocketsByDatacenter'是** **不可改變。這是一條使這一切變得更加簡單的途徑。 –

+2

加上你的邏輯在很多地方都非常清晰。 「可選」的濫用使我的眼睛水。我會完全擺脫它 - 你不會在任何地方正確使用它。提示:調用'Optional.isPresent'總是一個壞主意。 –

+0

我有幾種方法可以出錯。首先是每隔30秒由後臺線程調用的'updateLiveSockets',其次是由多個讀取器線程同時調用'getNextSocket'方法,這個方法在內部調用'getLiveSocket'方法,所以這三種方法在線程安全問題上都是正確的。你認爲他們都做對了嗎?我更害怕'updateLiveSockets'方法。 – john

回答

8

您使用以下同步技術。

  1. 帶有活套接字數據的地圖位於原子參考之後,這允許安全地切換地圖。
  2. updateLiveSockets()方法是同步的(隱式地在此上),這將阻止同時由兩個線程切換地圖。
  3. 如果在getNextSocket()方法中發生切換,您在使用它時避免混淆,從而製作本地地圖。

它是否像現在一樣是線程安全的?

線程安全總是取決於共享可變數據是否有適當的同步。在這種情況下,共享可變數據是數據中心到其SocketHolders列表的映射。

地圖位於AtomicReference以及製作本地副本以供使用的事實在地圖上具有足夠的同步性。由於AtomicReference的性質,您的方法採用地圖版本並使用該版本,切換版本是線程安全的。這也可以通過僅爲地圖volatile創建成員字段來實現,因爲您所做的只是更新引用(您不對其執行任何檢查 - 然後動作操作)。

由於scheduleAtFixedRate()保證了通過Runnable將不能同時與自身運行,在updateLiveSockets()的​​是不需要的,但是,它也不會產生什麼負面影響。

所以,這個類是線程安全的,因爲它是。

但是,並不完全清楚SocketHolder是否可以被多個線程同時使用。實際上,這個類只是試圖通過選擇一個隨機的活動來儘量減少併發使用SocketHolders(不需要隨機選擇一個隨機索引來拖動整個數組)。它沒有什麼實際阻止併發使用。

它可以提高效率嗎?

我相信可以。在查看updateLiveSockets()方法時,它似乎構建了完全相同的地圖,但SocketHolder可能具有與isLive標誌不同的值。這使我得出結論,我不想切換整個地圖,而只想切換地圖中的每個列表。並且爲了以線程安全的方式更改地圖中的條目,我可以使用ConcurrentHashMap

如果我使用一個ConcurrentHashMap,並且不切換地圖,而是在地圖上的值,我可以擺脫AtomicReference

要改變映射,我可以建立新列表並將其直接放入地圖。這樣更高效,因爲我可以更快地發佈數據,並且創建更少的對象,而我的同步只是建立在現成的組件上,這有利於可讀性。

這裏是我的版本(省略了不太相關的一些地方,爲了簡潔)

public class SocketManager { 
    private static final Random random = new Random(); 
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); 
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap 
    private final ZContext ctx = new ZContext(); 

    // ... 

    private SocketManager() { 
     connectToZMQSockets(); 
     scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS); 
    } 

    // during startup, making a connection and populate once 
    private void connectToZMQSockets() { 
     Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS; 
     for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) { 
     List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH); 
     liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map 
     } 
    } 

    // ...  

    // this method will be called by multiple threads to get the next live socket 
    // is there any concurrency or thread safety issue or race condition here? 
    public Optional<SocketHolder> getNextSocket() { 
     for (Datacenters dc : Datacenters.getOrderedDatacenters()) { 
     Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder> 
     if (liveSocket.isPresent()) { 
      return liveSocket; 
     } 
     } 
     return Optional.absent(); 
    } 

    // is there any concurrency or thread safety issue or race condition here? 
    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) { 
     if (!CollectionUtils.isEmpty(listOfEndPoints)) { 
     // The list of live sockets 
     List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size()); 
     for (SocketHolder obj : listOfEndPoints) { 
      if (obj.isLive()) { 
      liveOnly.add(obj); 
      } 
     } 
     if (!liveOnly.isEmpty()) { 
      // The list is not empty so we shuffle it an return the first element 
      return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one 
     } 
     } 
     return Optional.absent(); 
    } 

    // no need to make this synchronized 
    private void updateLiveSockets() { 
     Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS; 

     for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) { 
     List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey()); 
     List<SocketHolder> liveUpdatedSockets = new ArrayList<>(); 
     for (SocketHolder liveSocket : liveSockets) { // LINE A 
      Socket socket = liveSocket.getSocket(); 
      String endpoint = liveSocket.getEndpoint(); 
      Map<byte[], byte[]> holder = populateMap(); 
      Message message = new Message(holder, Partition.COMMAND); 

      boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket); 
      boolean isLive = (status) ? true : false; 

      SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); 
      liveUpdatedSockets.add(zmq); 
     } 
     liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner. 
     } 
    } 

} 
+0

感謝您的詳細解釋。我一方面很困惑,一旦狀態在'updateLiveSockets()'方法中改變,它是否能夠保證所有讀者線程都能從'getNextSocket()'方法看到套接字的狀態? – john

+0

是的,一旦'updateLiveSockets()'將新列表放入地圖中,讀者線程中的所有後續'get()'將會看到新列表 – bowmore

+0

好吧,讓我試試看看它是如何工作的。我得到了另一個[問題](https://stackoverflow.com/questions/47107331/how-to-make-sure-that-i-am-not-sharing-same-socket-between-two-threads-at-a - 相同)的基礎上只有這一個,我試圖確保我不會在兩個線程之間共享相同的套接字。我已經得到了解決方案,但試圖看到在這裏工作還是還有更好的方法?也許我們可以使用ThreadLocal?但我還不確定。 – john

4

如果SocketHolderDatacenters,是不可變的,你的程序看起來很好。不過,這裏有一些小反饋。

1的AtomicReference的用法

AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter

此成員變量不需要被包裹在一個的AtomicReference。你沒有對它進行任何原子CAS操作。你可以簡單地聲明一個volative Map<Datacenters, List<SocketHolder>>,閱讀時只需創建一個本地引用。這足以保證對新映射的引用進行原子交換。

2. Synchronized方法

private synchronized void updateLiveSockets()

該方法是從一個單獨的線程執行調用,所以沒有必要爲它進行同步。

3。一些簡化

  • 從當前的這個類的用法,好像你可以過濾掉插座其不在updateLiveSockets活着,避免了每次來篩選客戶端調用getNextSocket

  • 您可以更換 Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS 作者:Set<Datacenters> datacenters = Utils.SERVERS.keySet()並使用鑰匙工作。

    4. Java的8

如果可能的話,切換到Java 8流連同Java8的可選將消除大量的樣板代碼,使你的代碼更易於閱讀。

+0

我正在通過你的答案,我想知道我怎麼能實現你在'3.1'中提到的。如果我篩選出沒有活動的套接字,那麼我將如何在我的'updateLiveSockets'方法中重新遍歷所有套接字?正如你在'updateLiveSockets'方法中看到的那樣,我正在遍歷所有的套接字並且對它們進行ping處理,以瞭解它們是否存在或不存在? – john

相關問題