2015-05-11 57 views
1

好吧,全新的gpars,請原諒我,如果這有一個明顯的答案。完成所有工作後終止現有游泳池

這是我的場景。我們目前有一段代碼被包裝在一個Thread.start {}塊中。它這樣做可以將消息發送到後臺的消息隊列,而不會阻止用戶請求。我們最近碰到的一個問題是大量的工作,用戶可能會執行另一個操作,導致這個塊再次執行。由於它是線程化的,第二批消息可能在第一批導致損壞的數據之前發送。

我想改變這個過程作爲一個隊列流與gpars一起工作。我見過創建池,如

def pool = GParsPool.createPool() 

def pool = new ForkJoinPool() 

,然後用池

GParsPool.withExistingPool(pool) { 
    ... 
} 

這似乎將佔到案件的例子,如果用戶再次執行操作,我可以重新使用創建的池,並且操作不會按順序執行,前提是我的池大小爲1。

我的問題是,這是用gpars做到這一點的最好方法嗎?此外,我怎麼知道游泳池何時完成其所有工作?所有工作完成後它會終止嗎?如果是這樣,是否有一種方法可以用來檢查池是否已完成/終止以知道我需要一個新的?

任何幫助,將不勝感激。

回答

0

以下是我們遇到的問題的當前解決方案。應當指出的是,我們遵循了這一路線,由於我們的要求

  • 工作是由一些背景
  • 一個給定的範圍內工作正在有序進行分組
  • 一個給定的環境中工作是同步
  • 其他工作對於上下文應該在前面的工作後執行
  • 工作不應該阻止用戶請求
  • 上下文在彼此之間是異步的
  • 一旦上下文工作結束後,上下文應該清理後本身

鑑於上述情況,我們已經實現了以下內容:

class AsyncService { 
    def queueContexts 


    def AsyncService() { 
     queueContexts = new QueueContexts() 
    } 

    def queue(contextString, closure) { 
     queueContexts.retrieveContextWithWork(contextString, true).send(closure) 
    } 

    class QueueContexts { 
     def contextMap = [:] 

     def synchronized retrieveContextWithWork(contextString, incrementWork) { 
      def context = contextMap[contextString] 

      if (context) { 
       if (!context.hasWork(incrementWork)) { 
        contextMap.remove(contextString) 
        context.terminate() 
       } 
      } else { 
       def queueContexts = this 
       contextMap[contextString] = new QueueContext({-> 
        queueContexts.retrieveContextWithWork(contextString, false) 
       }) 
      } 

      contextMap[contextString] 
     } 

     class QueueContext { 
      def workCount 
      def actor 

      def QueueContext(finishClosure) { 
       workCount = 1 
       actor = Actors.actor { 
        loop { 
         react { closure -> 
          try { 
           closure() 
          } catch (Throwable th) { 
           log.error("Uncaught exception in async queue context", th) 
          } 

          finishClosure() 
         } 
        } 
       } 
      } 

      def send(closure) { 
       actor.send(closure) 
      } 

      def terminate(){ 
       actor.terminate() 
      } 

      def hasWork(incrementWork) { 
       workCount += (incrementWork ? 1 : -1) 
       workCount > 0 
      } 
     } 
    } 
} 
0

不,顯式創建的池不會自行終止。你必須明確地調用shutdown()。

但是,使用withPool(){}命令可以保證在代碼塊完成後池被銷燬。

+0

我很困惑。它似乎都是withPool和withExistingPool塊的執行。池完成後,我執行了一些關閉之後做了一些spock測試和邏輯。這不是我想要的。我不希望游泳池阻止執行。 – Taplar

+0

那麼也許你可能需要一個不同的抽象。一般來說,數據流任務和承諾給你更多的靈活性。 –

+0

我相信我們有一個使用演員的工作解決方案。一旦我們確信我們對此感到滿意,我會發布我們的最終解決方案,:) – Taplar