我正在嘗試使用尾遞歸函數對SynchronizedQueue進行一些處理。這個函數似乎可以正常工作,但我對併發性的看法越多,我相信在使用不同線程訪問此隊列時,我可能會遇到一些競爭條件。這是我認爲我可以使用一些幫助功能:Racy尾遞歸函數
val unsavedMessages = new SynchronizedQueue[CachedMessage]()
val MAX_BATCH = 256
val rowCount = new AtomicInteger()
private def validateCacheSize() = if (unsavedMessages.length > MAX_BATCH) {
implicit val batch = createBatch
val counter = rowCount.getAndIncrement
@tailrec
def processQueue(queue: SynchronizedQueue[CachedMessage]): Unit = if (queue.nonEmpty) {
val cm = queue.dequeue
addToBatch(cm.request, cm.timestamp, cm.brokerId, counter)
processQueue(queue)
}
processQueue(unsavedMessages)
executeBatch
resetQueue
}
def resetQueue = unsavedMessages.clear
多個線程調用這個函數:
def add(request: WebserviceRuleMatch, timestamp: Long, brokerId: String) = {
validateCacheSize
//log.info("enquing request "+ unsavedMessages.length)
unsavedMessages.enqueue(CachedMessage(request, timestamp, brokerId))
}
有沒有人對如何改進本的指針所以恐怕不太可能競賽條件?
你能告訴我們爲什麼你認爲這段代碼可能有競爭條件嗎? –
add函數從未來被調用,所以我覺得好像queue.nonempty和queue.dequeue之間的隊列可能被清空。另外我認爲消息可以通過processQueue和resetQueue之間的線程添加。 – chiappone