2014-07-24 89 views
0

我有兩個HashMap s表示,我想在一個類中進行同步synchronized塊。 我保持兩個地圖的原因是,我派遣任務到不同的服務器,和我保持在一個HashMap<String, Task>原始任務對象,並在同一時間,我管理的另一個HashMap<String, HashMap<InetAddress, TaskResponse>響應狀態。 HashMaps的密鑰是String,這對每個Task對象都是唯一的。與Java while循環和Thread.sleep()方法

所以我的代碼是這樣的:

HashMap<String, Map<InetAddress, TaskResponse>> taskResponseMap = new HashMap<>(); 
HashMap<String, Task> taskMap = new HashMap<>(); 

public void endpointHasRespondedCallback(TaskResponse taskResponse, InetAddress from) { 
    // Callback threads gets blocked here! 
    synchronized(taskResponseMap) { 
     synchronized (taskMap) { 
      Map<InetAddress, TaskResponse> taskResponses = taskResponseMap.get(taskResponse.taskUuid); 
      if (taskResponses == null || !taskResponses.containsKey(from)) { 
       // The response does not exists probably due to timeout 
       return; 
      } 
      taskResponses.put(from, taskResponse); 
     } 
    } 
} 

public void sendTaskToAllEndpoints(Task task) { 
    long taskStartedAt = System.currentTimeMillis(); 
    HashMap<InetAddress, TaskResponse> taskResponses = new HashMap<>(); 
    taskResponseMap.put(task.taskUuid, taskResponses); 
    taskMap.put(task.taskUuid, task); 

    for (InetAddress dest : getDestinationNodes()) { 
     sendTaskTo(dest, task); 
     messageResponses.put(dest, TaskResponse.emptyTaskResponse()); 
    } 

    // Should wait for response to comeback till the timeout is over 
    while (System.currentTimeMillis() < taskStartedAt + timeoutInMillis) { 
     Thread.sleep(1000); 

     synchronized(taskResponseMap) { 
      synchronized (taskMap) { 
       if(isTaskOver(task.taskUuid)) { 
        Map<InetAddress, TaskResponse> responses = taskResponseMap.remove(task.taskUuid); 
        taskMap.remove(task.taskUuid); 

        task.taskIsDone(responses); 
        return; 
       } 
      } 
     } 
    } 

    // If the task is still sitting there, then it must have timed out! 
    synchronized(taskResponseMap) { 
     synchronized (taskMap) { 
      taskResponseMap.remove(task.taskUuid); 
      taskMap.remove(task.taskUuid); 
     } 
    } 
} 

// Do not synchronize purposefully since it is only being called in synchronized methods 
public boolean isTaskOver(String taskUuid) { 
    Task task = taskMap.get(taskUuid); 
    if (task == null || !taskResponseMap.containsKey(task.taskUuid)) { 
     return true; 
    } else { 
     for (TaskResponse value : taskResponseMap.get(task.taskUuid).values()) { 
      if (value.status != TaskResponseStatus.SUCCESSFUL) { 
       return false; 
      } 
     } 
    } 
    return true; 
} 

因此從概念上解釋我的代碼做什麼,sendTaskToAllEndpoints()方法發送Task對象遠程端點,並等待,直到while循環內超時。 只要遠程端點響應,它就會執行endpointHasRespondedCallback()方法,以便它標記爲在TaskResponseMap中完成。 早在sendTaskToAllEndPoints()方法,我檢查的任務是通過使用isTaskDone() helper方法做,如果沒有,我只是繼續和調用Thread.sleep(1000)等待一秒鐘,直到我下一次檢查。

我面臨的問題是,即使我看到endpointHasRespondedCallback()方法正在執行的所有我調度到(我用日誌語句驗證)的遠程節點,它等待​​塊外,只有在那裏無論何時在sendTaskToAllEndpoints()方法中發生超時,因此即使所有節點都已正確響應,我的任務仍會超時。

這是我沒有想到的,因爲即使我在while循環中獲取兩個對象上的鎖,我在睡覺前解鎖它,並且我假設sendTaskToAllEndpoints()線程進入睡眠狀態時,等待鎖的其他線程應獲取endpointHasRespondedCallback()方法中的鎖,並將Task標記爲按預期完成。

我大概可以實現我用更好的方式來解決這個問題的方案,但我想知道什麼是對這種行爲的合乎邏輯的解釋。

+0

你的同步代碼看起來很好,儘管你不需要'taskMap'上的同步,因爲只有一個線程會改變它。(此外,如果你搞砸了一些東西,那麼擁有依賴獲取多個鎖的同步邏輯是一種簡單的方法,最終會導致死鎖;你的目標應該是鎖定對象的數量更少而不是更多,同時仍然保持線程安全。)你是否肯定沒有其他代碼(你沒有發佈)正在獲取任何一個對象的鎖? – Tim

+0

isTaskOver是公開的,但不是線程安全的!你應該改變爲私人。順便說一句。避免使用循環+ Thread.sleep進行主動等待。更好地使用條件等待(wait + notifyAll)。 – isnot2bad

+0

我爲使其工作的唯一更改是通過刪除while循環內的同步塊。這應該意味着阻止我的另一個線程運行的只是while循環內的synchronized塊。我知道在C中,編譯器可能[改變正在執行的東西的順序](http://stackoverflow.com/questions/4437527/why-do-we-use-volatile-keyword-in-c),以優化該程序影響了多線程程序的功能。在Java中也有類似的東西嗎? – YShin

回答

0

好的,不要在意發生了什麼,我在task.taskIsDone(responses)裏面調用了sendTaskToAllEndpoints()的方法,在while循環中有效地保存了鎖,直到新的被調用的任務也結束。

我通過創建一個AtomicInteger對象來增加每次我輸入同步塊時遞增的對象,並在我離開時遞減它(並將它們封裝在try-finally塊中)。第一項任務完成後,我觀察到AtomicInteger對象沒有降到0,並且它們在1和2之間上下移動。

看起來第一個任務成功結束,因爲沒有阻止回調執行,但一旦第一個任務已完成,並task.taskIsDone()叫,這種方法本身產生的Task另一個實例,並呼籲sendTaskToAllEndpoints(),因此,阻斷所有回調調用的第二個任務,並依此類推,直到他們與超時結束。

謝謝大家的建議。