2015-06-06 52 views
1

我在我的代碼中使用了Callable,它將被多線程調用,如下所示。截至目前,只要有任何RestClientException被拋出,然後我將hostname添加到blockList。如何在多線程應用程序連續失敗後將主機名添加到阻止列表?

public class Task implements Callable<DataResponse> { 

    private DataKey key; 
    private RestTemplate restTemplate; 

    public Task(DataKey key, RestTemplate restTemplate) { 
     this.key = key; 
     this.restTemplate = restTemplate; 
    } 

    @Override 
    public DataResponse call() { 
     ResponseEntity<String> response = null; 

     // construct what are the hostnames I can call basis on user id 
     List<String> hostnames = some_code_here; 

     for (String hostname : hostnames) { 
      // If host name is null or host name is in block list, skip sending request to this host 
      if (DataUtils.isEmpty(hostname) || DataMapping.isBlocked(hostname)) { 
       continue; 
      } 
      try { 
       String url = createURL(hostname); 
       response = restTemplate.exchange(url, HttpMethod.GET, key.getEntity(), String.class); 

       // some code here to return the response if successful 
      } catch (HttpClientErrorException ex) { 
       // log exception 
       return new DataResponse(errorMessage, error, DataStatusEnum.ERROR); 
      } catch (HttpServerErrorException ex) { 
       // log exception 
       return new DataResponse(errorMessage, error, DataStatusEnum.ERROR); 
      } catch (RestClientException ex) { 
       // I don't want to add it to block list instantly. 
       // If same hostname as failed five times consecutively, then only add it 
       DataMapping.blockHost(hostname); 
      } 
     } 

     return new DataResponse(DataErrorEnum.SERVER_UNAVAILABLE, DataStatusEnum.ERROR);   
    } 
} 

下面是我在DataMapping類:

private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts = 
     new AtomicReference<ConcurrentHashMap<String, String>>(new ConcurrentHashMap<String, String>()); 

public static boolean isBlocked(String hostName) { 
    return blockedHosts.get().containsKey(hostName); 
} 

public static void blockHost(String hostName) { 
    blockedHosts.get().put(hostName, hostName); 
} 

問題陳述: -

現在,你可以在call方法,我擋住了hostname只要看它會拋出RestClientException這可能是不對的。我需要查看一個特定的hostname是否連續五次拋出RestClientException,然後只通過調用這一行DataMapping.blockHost(hostname);將此hostname添加到blockList,否則不要將它添加到blockList。

什麼是最有效和最好的方式來做到這一點?最大,我將總共有70-100臺獨特的機器。

在這種情況下,我的調用方法將從多個線程中調用,因此我需要確保每個hostname的計數保持正確,以防他們拋出RestClientException

編輯:

我也有以下方法DataMapping類以及:

我有它運行每2分鐘替換整套的,因爲我的服務提供實時數據是否後臺線程任何主機名都被封鎖或不封鎖。我想我需要atomic reference當我替換整個集合。

我在代碼中本地添加了塊功能,因爲我可能知道哪些機器在2分鐘後被阻塞,因此如果可能的話最好事先知道它。

// this is being updated from my background thread which runs every 2 minutes 
public static void replaceBlockedHosts(List<String> hostNames) { 
    ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>(); 
    for (String hostName : hostNames) { 
     newBlockedHosts.put(hostName, hostName); 
    } 
    blockedHosts.set(newBlockedHosts); 
} 

回答

1

我將每臺主機與在每個RestClientException遞增的AtomicInteger關聯。在成功執行「五個連續次」約束的成功調用時,此整數將被設置爲零。代碼看起來像這樣。

private final ConcurrentHashMap<String, AtomicInteger> failedCallCount = new ConcurrentHashMap<>(); 

void call() { 
     try { 
      String url = createURL(host); 
      // make rest call 
      resetFailedCallCount(host); 
      // ... 
     } catch (RestClientException ex) { 
      registerFailedCall(host); 
      if (shouldBeBlocked(host)) { 
       DataMapping.blockHost(host); 
      } 
     } 
} 


private boolean shouldBeBlocked(String hostName) { 
    AtomicInteger count = failedCallCount.getOrDefault(hostName, new AtomicInteger()); 
    return count.get() >= 5; 
} 

private void registerFailedCall(String hostName) { 
    AtomicInteger newValue = new AtomicInteger(); 
    AtomicInteger val = failedCallCount.putIfAbsent(hostName, newValue); 
    if (val == null) { 
     val = newValue; 
    } 
    if (val.get() < 5) { 
     val.incrementAndGet(); 
    } 
} 

private void resetFailedCallCount(String hostName) { 
    AtomicInteger count = failedCallCount.get(hostName); 
    if (count != null) { 
     count.set(0); 
    } 
} 

這是無鎖的(至少在我們自己的代碼中)並且非常有效。但它很容易受到某些競爭條件的影響。最值得注意的是計數可能會大於5.但是,這不應該成爲一個問題,因爲主機無論如何都被阻止,並且計數不會用於其他任何事情。

+0

感謝您的建議。我從你的建議中得到了一些想法。我在'blockedHosts'上使用了'AtomicReference',這樣我就可以立即通知我的所有線程,如果有任何更改放入該地圖。我仍然需要這個權利?第二個問題是,我們在什麼地方取消設置整數值 - 我的意思是讓我們說'machineA'連續4次拋出'RestClientException',但是第5次,它開始服務請求,那麼計數應該被設置爲0,以便該主機名?我會在那裏做什麼? – john

+0

@david只要不重新分配該字段,''ConcurrentHashMap'上不需要'AtomicReference'?無論如何,它內部的更改都將立即對所有線程可見。我會在一個成功的調用之後重置計數,也就是在'response = restTemplate.exchange(url,HttpMethod.GET,key.getEntity(),String.class)'這行之後重置計數,並且每次調用都會這樣做(比檢查更便宜應該重置或不重置 - 就這樣做!)。 –

+0

嘿,你還在嗎?我在這個聊天室裏(http://chat.stackoverflow.com/rooms/79863/room-for-david-and-k-erlandsson)。還有一個基本問題。 – john

0

DataMapping類中維護一個類似於 - public static ConcurrentHashMap<String, Integer> toBeBlockedHostName = new ConcurrentHashMap<String, Integer>();的靜態寄存器。然後用它你這樣的FOR循環:

for (String hostname : hostnames) { 

     // .. some code here 
     //After ensuring everything is success and no RestClientException, i.e. can be last line of your TRY block... 
     DataMapping.toBeBlockedHostName.remove("stackoverflow6361"); 
     catch (RestClientException ex) { 
      if(DataMapping.toBeBlockedHostName.get("stackoverflow6361") == null){ 
       DataMapping.toBeBlockedHostName.put("stackoverflow6361", new Integer(1)); 
      } else{ 
       if(DataMapping.toBeBlockedHostName.get("stackoverflow6361") == 5){ //Don't hard code 5, have it from some property file after defining as retryThreshold... 
        System.out.println("Blocking threshold reached, block the hostname..."); 
        DataMapping.blockHost(hostname); 
       } else{ 
        DataMapping.toBeBlockedHostName.put("stackoverflow6361", (toBeBlockedHostName.get("stackoverflow6361") + 1)); 
       } 
      } 
     } 

請注意::對於ConcurrentHashMap,即使所有的操作都是線程安全的,檢索操作並不意味着鎖定。

請注意,連續5次重試失敗後,您將阻止主機名,但是如果您再次解除阻止,則應清除寄存器。

P.S .:有適當的getter和setter用於HashMap。

相關問題