2013-12-18 173 views
1

我正在嘗試使用尾遞歸函數對SynchronizedQueue進行一些處理。這個函數似乎可以正常工作,但我對併發性的看法越多,我相信在使用不同線程訪問此隊列時,我可能會遇到一些競爭條件。這是我認爲我可以使用一些幫助功能:Racy尾遞歸函數

val unsavedMessages = new SynchronizedQueue[CachedMessage]() 

val MAX_BATCH = 256 
val rowCount = new AtomicInteger() 

    private def validateCacheSize() = if (unsavedMessages.length > MAX_BATCH) { 
    implicit val batch = createBatch 
    val counter = rowCount.getAndIncrement 
    @tailrec 
    def processQueue(queue: SynchronizedQueue[CachedMessage]): Unit = if (queue.nonEmpty) { 
     val cm = queue.dequeue 
     addToBatch(cm.request, cm.timestamp, cm.brokerId, counter) 
     processQueue(queue) 
    } 
    processQueue(unsavedMessages) 
    executeBatch 
    resetQueue 
    } 

    def resetQueue = unsavedMessages.clear 

多個線程調用這個函數:

def add(request: WebserviceRuleMatch, timestamp: Long, brokerId: String) = { 
    validateCacheSize 
    //log.info("enquing request "+ unsavedMessages.length) 
    unsavedMessages.enqueue(CachedMessage(request, timestamp, brokerId)) 
    } 

有沒有人對如何改進本的指針所以恐怕不太可能競賽條件?

+0

你能告訴我們爲什麼你認爲這段代碼可能有競爭條件嗎? –

+1

add函數從未來被調用,所以我覺得好像queue.nonempty和queue.dequeue之間的隊列可能被清空。另外我認爲消息可以通過processQueue和resetQueue之間的線程添加。 – chiappone

回答

1

有可能是一個機會,隊列被queue.nonempty和queue.dequeue之間空

  • 避免調用必須在代碼中同步多個隊列操作。使用SynchronizedQueue的力量來執行原子線程安全操作。例如。避免調用queue.nonempty共(替代尾遞歸):

    for (cm <- unsavedMessages.dequeueAll(_ => true)) 
        addToBatch(cm.request, cm.timestamp, cm.brokerId, counter) 
    executeBatch 
    //resetQueue -- Don't do this! Not thread-safe 
    

我認爲消息可以通過processQueue和resetQueue

之間的線程被添加
  • 總是會有一個指出你的代碼已經對隊列進行了「快照」並將其清空。我之前的觀點確保'快照'和清空是單個原子操作。如果新條目在該原子的「快照&空白」操作之後的任意點入隊 - 沒有問題。你的'快照&空'必須發生在某個地方,新項目排隊是生活中的事實。決定允許新項目在「快照&空」後的任何點進入隊列。他們將在下一個週期處理。即超出上述要求的任何額外需求。

羅賓格林:(順便說一下,這種方法似乎有一個非常誤導的名字!)

  • 他說! :)
1

add函數從未來調用gets,所以我覺得好像有可能在queue.nonempty和queue.dequeue之間清空隊列。

是的,它可以。您可以使用雙重檢查鎖定,使單線程validateCacheSize。 (順便說一句,該方法似乎有一個非常令人誤解的名字!)

此外,我認爲消息可以通過processQueue和resetQueue之間的線程添加。

是的,他們可以。但爲什麼你需要撥打unsavedMessages.clearqueue.dequeue已將其從隊列中刪除。所以應該存在於隊列中的唯一unsavedMessages是仍然需要處理的那些。