2014-12-05 50 views
3

我對GPars演員的理解可能會關閉,所以如果我錯了,請糾正我。我有一個Groovy應用程序來輪詢Web服務的工作。當找到一個或多個作業時,它會將每個作業發送到我創建的DynamicDispatchActor,並處理作業。作業是完全獨立的,不需要將任何東西返回到主線程。當一次有多個工作進來時,我希望他們能夠並行處理,但無論我嘗試什麼配置,都需要首先進行處理。我該如何平行GPars Actors?

爲了給出一個代碼示例:

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5)) 

def actor = poolGroup.messageHandler { 
    when {Integer msg -> 
     println("I'm number ${msg} on thread ${Thread.currentThread().name}") 
     Thread.sleep(1000) 
    } 
} 

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 

integers.each { 
    actor << it 
} 

此打印出:

 
I'm number 1 on thread Actor Thread 31 
I'm number 2 on thread Actor Thread 31 
I'm number 3 on thread Actor Thread 31 
I'm number 4 on thread Actor Thread 31 
I'm number 5 on thread Actor Thread 31 
I'm number 6 on thread Actor Thread 31 
I'm number 7 on thread Actor Thread 31 
I'm number 8 on thread Actor Thread 31 
I'm number 9 on thread Actor Thread 31 
I'm number 10 on thread Actor Thread 31 

隨着每個打印出之間的輕微的暫停。另請注意,每個打印輸出都來自同一個Actor /線程。

我想在這裏看到的是前5個數字會立即打印出來,因爲線程池被設置爲5,然後接下來的5個數字會被釋放。我完全離開這裏嗎?

回答

3

,使其運行如您所願有一些變化,使:

import groovyx.gpars.group.DefaultPGroup 
import groovyx.gpars.scheduler.DefaultPool 

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5)) 

def closure = { 
    when {Integer msg -> 
     println("I'm number ${msg} on thread ${Thread.currentThread().name}") 
     Thread.sleep(1000) 
     stop() 
    } 
} 

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 

def actors = integers.collect { poolGroup.messageHandler(closure) << it } 
actors*.join() 

全部要點文件:https://gist.github.com/wololock/7f1348e04f68710e42d2

然後輸出將是:

I'm number 5 on thread Actor Thread 5 
I'm number 4 on thread Actor Thread 4 
I'm number 1 on thread Actor Thread 1 
I'm number 3 on thread Actor Thread 3 
I'm number 2 on thread Actor Thread 2 
I'm number 6 on thread Actor Thread 3 
I'm number 9 on thread Actor Thread 4 
I'm number 7 on thread Actor Thread 2 
I'm number 8 on thread Actor Thread 5 
I'm number 10 on thread Actor Thread 1 

現在,讓我們一看什麼改變了。首先在你之前的例子中,你只在一個演員身上工作過。您正確定義了poolGroup,但是然後您創建了一個actor,並將計算轉移到此單一實例。要並行運行這些計算,您必須依賴poolGroup,並且只向某個消息處理程序發送輸入 - 池組將處理參與者創建及其生命週期管理。這是我們在做什麼:

def actors = integers.collect { poolGroup.messageHandler(closure) << it } 

它會創建一個以給定輸入開始的actor的集合。池組將注意不超過指定的池大小。然後你必須join每個演員,這可以通過使用groovy的神奇來完成:actors*.join()。感謝應用程序將等待終止,直到所有演員停止計算。這就是爲什麼我們必須添加stop()方法到when關閉消息處理程序的正文 - 沒有它,它不會終止,因爲池組不知道演員是否工作 - 他們可能會等待。爲另一個消息。

替代解決方案

我們也可以考慮使用GPars並行迭代替代解決方案:

import groovyx.gpars.GParsPool 

// This example is dummy, but let's assume that this processor is 
// stateless and shared between threads component. 
class Processor { 
    void process(int number) { 
     println "${Thread.currentThread().name} starting with number ${number}" 
     Thread.sleep(1000) 
    } 
} 

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 

Processor processor = new Processor() 

GParsPool.withPool 5, { 
    integers.eachParallel { processor.process(it) } 
} 

在這個例子中,你有一個無狀態組件Processor和並聯使用與無國籍Processor一個實例計算多個輸入值。

我試圖找出你在評論中提到的情況,但我不確定單個演員是否可以一次處理多個消息。一個演員的無狀態只意味着它在處理消息的過程中不會改變它的內部狀態,也不能在演員範圍內存儲任何其他信息。如果有人能糾正我,如果我的推理不正確,這將是非常好的:)

我希望這會幫助你。最好!

+0

所以我想我明白你在做什麼,但是如果我們必須每次都創造一個無國籍的演員,那麼這似乎無法擊敗使用無國籍演員的目的? 你會建議也許找到一種替代方法來解決這個問題? – 2014-12-08 14:32:07

+0

嘿帕特里克,感謝您的評論!我已經用不同方法的例子更新了答案,可能更適合您的需求。 – 2014-12-08 15:46:09