2017-01-13 23 views
0

需要一些幫助。看看Gpars數據流/管道,但我不明白的東西gpars dataflowQueues處理或管道似乎只能在df.val請求上觸發

如果你看看下面的例子(我已經完成了這與運營商,piplines,chainWith和同樣的問題)。

在這個例子中,我已經使用過任務,但可能很容易就沒有,同樣的問題也體現出來。在這個例子中,我設置了兩個DataflowQueue,一個用於初始條件,另一個用於對謂詞進行評估的結果。然後我佈置一個管道,用於評估反對對謂詞的輸入的輸入(甚至測試),並存儲結果的輸出結果隊列

具有設置管道和張貼的某些條目到第一隊列我認爲該條目將在數據可用的情況下處理(這不適用於操作員版本),如您所見,在將條目寫入sessionQ之後,我將resultQ的大小測試爲零(如果我刪除了任務,它仍然爲真)。所以寫入數據不會'觸發'處理。

第一個任務將一些條目保存到一個隊列中。

import groovyx.gpars.dataflow.Dataflow 
import groovyx.gpars.dataflow.DataflowQueue 
import groovyx.gpars.dataflow.DataflowVariable 
import groovyx.gpars.dataflow.Promise 

/** 
* Created by will on 13/01/2017. 
*/ 

def iValues = [1,2,3,4,5] 

DataflowQueue sessionQ = new DataflowQueue() 
DataflowQueue resultQ = new DataflowQueue() 

Dataflow.task { 
    println "setup task: set initial conditions list for rule predicate " 
    iValues.each {sessionQ << it} 
} 

Closure evenPredicate = {it %2 == 0} 

//layout pipeline 
sessionQ | evenPredicate | resultQ 

assert resultQ.iterator().size() == 0 

Promise ans = Dataflow.task { 
    println "result task : get three values from result q " 
    def outlist = [] 
    3.times { 
     def res = resultQ.val 
     println "got result $res" 
     outlist << res 
    } 
    assert sessionQ.iterator().size() == 0 
    assert resultQ.iterator().size() == 2 
    outlist 
} 

println "ans list is $ans.val" 
assert resultQ.iterator().size() == 2 

其僅在第二任務/ chainWith等 - 在第二隊列,你調用一個.VAL(或得到()),發動機開始運行,所有的條目從第一隊列處理,結果綁定到resultQ。

您可以從斷言中看到這一點,因爲一旦第一個觸發器(.val)同步調用使引擎運行並處理啓動sessionQ中的所有綁定條目。

這是一個問題,直到你運行第一個.val調用 - 如果你做一個poll()或者resultQ.interator.size(),例如它是空的並且是unbound,size()= 0。所以你不能寫

for (dfRes in resultQ) {//do something with dfRes} 

它始終爲空,直到你消耗sessionQ中的第一項。我不明白爲什麼?在條目綁定到第一個數據流隊列後,我認爲這些項目會在它們變得可用(被綁定)時被使用 - 但它們不是。

現在這很棘手,因爲您無法獲取條目,檢查結果的大小,對resultQ執行poll(),因爲它將會失敗,直到讀取到sessionQ中的第一個DF。

我已經結束了不得不使用初始值數組的大小(告訴我保存到隊列中的條目)作爲唯一的方法來讀取相同的數字從resultQ清空它(在上面我只從結果Q中消耗了3條記錄,並且斷言顯示resultQ中仍有2條記錄(但是隻有在第一個.val調用完成之後,如果您註釋該行,則所有斷言都會失敗)

我試圖與Dataflow.operator,管道等,並得到同樣的問題。爲什麼不爲每個輸入綁定到SessionQ工作得到處理?

最後在流水線的情況下,那裏有一個.complete ( )方法,如果您在管道中處理閉包{},則保持打開狀態(!complete()),但是當您運行像.binaryChoice()這樣的方法時,它會將管道標記爲完整,並且無法添加任何進一步的操作。爲什麼這樣做?

我不明白這個狀態是什麼意思(當然不會有更多的處理會發生),如果你嘗試在這種方法之後嘗試另一個步驟,就會拋出一個異常。

無論哪種方式 - 我想這樣

Pipeline pipeLine = new Pipeline(Q) 
pipeLine.tap(log).binaryChoice(evenPathQ, oddPathQ) {println "$it %2 is ${it%2 ==0}"; (it%2 == 0) } 

管道線路然而,當您將值綁定至Q什麼也沒有發生 - 直到你吃像

odd.val 

輸出時,一下子把管道'運行'並處理存儲在Q中的所有DF項目。

沒有我試過kickstarts工作的調度 - 除了第一個.val消耗

可以解釋爲什麼這是,我必須在這裏忽略這一點,但是這個'什麼也不做',直到第一個條目被讀取,並不是我期望的,並且使任何大小的評估(.iterator.size(), poll()等)在DataflowWriteChannel目標上調用類型。

我會欣賞這方面的任何幫助 - 我已經爲此掙扎了兩天,並且無處可去。我也查看了所有Gpars測試,他們只是調用.val與輸入綁定的次數相同 - 所以不要顯示我描述的問題。

瓦茨拉夫·佩赫,或任何其他Gpars大師誰看的問題,我將不勝感激任何幫助的洞察力,讓我在這個駝峯

問候提前

回答

1

一個小的修改(添加延遲)就在你聲稱大小爲0之前,將顯示計算是由寫入的數據觸發的:

//layout pipeline 
sessionQ | evenPredicate | resultQ 
sleep 5000 
assert resultQ.iterator().size() == 0 
+0

謝謝瓦茨拉夫,我早上在火車上,也許是時候讓房地產'趕上'將清除我的問題。我嘗試了幾次攻擊,看起來像沒有按預期處理工作 - 回報各種'嘗試'模式的調查結果我嘗試過,看看是否有竅門 –

+0

我增加了睡眠差距500毫秒,然後開始看例子正確和在其他線程的工作有機會做背景的東西。儘管 –

+0

完整狀態限制管道構建器只允許構建有效的管道,但我仍然不確定管道上的「完整」狀態如何保護。當您不能再向其添加操作員時,管道將視爲已完成。例如,通過調用binaryChoice(),將兩個通道掛接到管道末端,並且所有其他管道操作都必須附加到這兩個通道中的任一個,而不是原始管道。 –