我有一個班級,其中有一個ConcurrentHashMap
,每30秒更新一個線程,然後通過調用getNextSocket()
方法從同一個ConcurrentHashMap
讀取多個閱讀器線程。從單個線程填充ConcurrentHashMap,然後從多個線程讀取而沒有任何競爭條件?
以下是我的單例類,它在初始化時調用connectToSockets()
方法來填充我的ConcurrentHashMap
,然後啓動一個後臺線程,通過調用updateSockets()
方法每隔30秒更新一次相同的地圖。
然後從多個線程我打電話getNextSocket()
方法獲得下一個可用的活套接字使用相同的地圖來獲取信息。我也有SocketInfo
類是不可變的,它包含所有套接字的狀態,不管它們是否存在。
public class SocketHolder {
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<DatacenterEnum, List<SocketInfo>> liveSocketsByDc = new ConcurrentHashMap<>();
// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketHolder INSTANCE = new SocketHolder();
}
public static SocketHolder getInstance() {
return Holder.INSTANCE;
}
private SocketHolder() {
connectToSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}
private void connectToSockets() {
Map<DatacenterEnum, ImmutableList<String>> socketsByDc = TestUtils.SERVERS;
for (Map.Entry<DatacenterEnum, ImmutableList<String>> entry : socketsByDc.entrySet()) {
List<SocketInfo> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
liveSocketsByDc.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketInfo> connect(DatacenterEnum dc, List<String> addresses, int socketType) {
List<SocketInfo> socketList = new ArrayList<>();
// ... some code here
return socketList;
}
// called from multiple reader threads to get next live available socket
public Optional<SocketInfo> getNextSocket() {
Optional<SocketInfo> liveSocket = getLiveSocket(liveSocketsByDc.get(DatacenterEnum.CORP));
return liveSocket;
}
private Optional<SocketInfo> getLiveSocket(final List<SocketInfo> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
Collections.shuffle(listOfEndPoints);
for (SocketInfo obj : listOfEndPoints) {
if (obj.isLive()) {
return Optional.of(obj);
}
}
}
return Optional.absent();
}
// update CHM map every 30 seconds
private void updateSockets() {
Map<DatacenterEnum, ImmutableList<String>> socketsByDc = TestUtils.SERVERS;
for (Entry<DatacenterEnum, ImmutableList<String>> entry : socketsByDc.entrySet()) {
List<SocketInfo> liveSockets = liveSocketsByDc.get(entry.getKey());
List<SocketInfo> liveUpdatedSockets = new ArrayList<>();
for (SocketInfo liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
boolean sent = ....;
boolean isLive = sent ? true : false;
// is this right here? or will it cause any race condition?
SocketInfo state = new SocketInfo(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(state);
}
// update map with new liveUpdatedSockets
liveSocketsByDc.put(entry.getKey(), liveUpdatedSockets);
}
}
}
問:
是我上面的代碼線程安全的,有在updateSockets()
和getNextSocket()
方法的競爭狀態?
在我updateSockets()
的方法,我提取liveSocketsByDc
的ConcurrentHashMap List<SocketInfo>
這是之前已經在connectToSockets()
方法初始化或30秒的下一個時間間隔內updateSockets()
方法填充,然後我遍歷同一個列表liveSockets
並根據創建一個新的SocketInfo
對象不管isLive
是真是假。然後我用這個新的SocketInfo
對象更新liveSocketsByDc
ConcurrentHashMap。這看起來正確嗎?由於來自多個讀者線程,我打算撥打getNextSocket()
方法,它會調用getLiveSocket
方法,該方法使用相同的地圖獲取下一個可用的活動套接字。
我在迭代liveSockets
列表,然後創建一個新的SocketInfo
對象,只需更改isLive
字段,其他內容將保持不變。這是正確的嗎?
如果存在線程安全問題,解決此問題的最佳方法是什麼?
不,它不是線程安全的。我發現的第一個違規行爲(沒有更進一步)是每一次讀取都會將一個共享的ArrayList從地圖中取出並對其進行洗牌。 –
這個問題可能適合我們的姊妹網站[代碼評論](http://codereview.stackexchange.com/)比它在這裏。 –
@JBNizet我明白了,我們如何解決這個問題? – user1234