2014-10-22 44 views
0

如果這僅僅是我理解錯誤的結果,我表示歉意。我一直在四處搜尋並閱讀文檔,並且一直未能想出適用於我的解決方案。在樹形結構上遞歸調用任務

我有一個樹結構,其中每個節點可以有任意數量的子節點。對於每個節點,一個新的芹菜任務將被實例化以創建它,然後它將查看所​​有的孩子並實例化新任務來創建這些任務。這樣做的原因是爲了更好地利用芹菜的多線程特性。只有遞歸創建整個樹的單個任務似乎利用了單個線程。

雖然我已經能夠以這種方式設置我的代碼,但我的問題是,我在創建任務中有一些依賴項,直到整個結構完成創建才能執行。代碼看起來是這樣的:

@app.task 
def initial_task(tree_data): 
    jobs = [] 
    for node in tree_data: 
     jobs.append(recursive_task.s(node)) 
    job = group(jobs) 
    result = job.apply_async() 

    # Block execution until group is finished 
    while not result.ready(): 
     time.sleep(0.5) 

    ... do dependent stuff ... 

@app.task 
def recursive_task(node, parent=None): 
    # Create node object 
    node_obj = Node(node.name, parent=parent) 

    jobs = [] 
    for child in node.children: 
     jobs.append(recursive_task.s(child, node_obj)) 
    job = group(jobs) 
    result = job.apply_async() 

    return node_obj 

我的問題是,所有的孩子們的子任務不會阻止第一組任務的完成,我不知道該怎麼給力,要成爲案子。對此事的任何幫助將非常感激。

因爲創建子項時我需要node_obj的ID,所以我不能簡單地遞歸樹並鏈接任務。

更新: 我已經改變了一些代碼,嘗試並導致結果發生變化。下面的代碼會導致所有的孩子(包括孫子,曾孫等),是頂級節點的直接子:

@app.task 
def initial_task(tree_data): 
    def _recursive_link_task(task_set, children): 
     for child in children: 
      task_set.link(create_node.s(child)) 

      if child.children: 
       _recursive_link_task(task_set, child.children) 


    for node in tree_data: 
     s = create_node.s(None, node) 
     if node.children: 
      _recursive_link_task(s, node.children) 
     s.apply_async() 

@app.task 
def create_node(parent, node): 
    node_obj = Node(node.name, parent=parent) 
    return (node_obj,) 

我本來以爲我可能有更多一點的財富與上面的代碼片段代碼 - 但因爲它只是傳遞給所有後續任務的初始節點對象,所以我仍然沒有進一步嘗試獲取此樹結構。

回答

1

使用chords執行依賴於一堆任務結果的任務。

因爲我無法準確理解您需要如何調用遞歸任務,所以我實現了合併排序的reference示例。

注意這將無法在芹菜3.2.0+作爲調用get內任務將導致異常。

from celery import Celery, chord 
app = Celery('tasks', backend='amqp', broker='amqp://') 
app.conf.CELERY_RESULT_BACKEND = 'amqp' 


def mergesort(list_obj): 
    '''normal mergesort 
    ''' 
    if len(list_obj) <= 1: 
     return list_obj 
    middle = len(list_obj)/2 
    left, right = list_obj[:middle], list_obj[middle:] 
    return list(merge(list(mergesort(left)), list(mergesort(right)))) 

def merge(left, right): 
    '''normal merge 
    ''' 
    while 1: 
     if left == []: 
      for j in right: 
       yield j 
      break 
     elif right == []: 
      for j in left: 
       yield j 
      break 
     elif left[0] < right[0]: 
      yield left.pop(0) 
     else: 
      yield right.pop(0) 

def merge2(left_r, right_r): 
    '''celery merge 
    ''' 
    left =left_r.get() 
    right = right_r.get() 
    while 1: 
     if left == []: 
      for j in right: 
       yield j 
      break 
     elif right == []: 
      for j in left: 
       yield j 
      break 
     elif left[0] < right[0]: 
      yield left.pop(0) 
     else: 
      yield right.pop(0) 
@app.task 
def merge_c(in_list): 
    '''celery merge 
    ''' 
    #unpack 
    print '*'*21 + str(in_list) 
    left, right = in_list 
    return list(merge2(left, right)) 

@app.task 
def same_object(l_obj): 
    '''helper function to convert list to `result` 
    ''' 
    return l_obj 

@app.task 
def mergesort_c(list_obj): 
    '''celery mergesort 
    ''' 
    if len(list_obj) <= 1: 
     # make sure that you return a `result` object for merge 
     return same_object.delay(list_obj) 
    middle = len(list_obj)/2 
    left, right = list_obj[:middle], list_obj[middle:] 
    # finish mergesort (left) and mergesort(right) and merge them 
    res = chord([mergesort_c.s(left), mergesort_c.s(right)])(merge_c.s()) 
    return res 

if __name__ == '__main__': 
    l = [2,1, 3] 
    #normal mergesort 
    print mergesort(l) #[1, 2, 3, 3, 5] 
    # with celery 
    res = mergesort_c(l) 
    print res.get() 
+0

感謝您的支持。如果我有機會,我會看看是否可以將它應用於我正在編寫的代碼。我試過鏈接任務 - 也許和絃會提供更好的結果。 – djbp 2014-10-28 10:35:30

+0

祝你好運。與此同時,我將努力尋找一個不會被新芹菜棄用的答案:) – srj 2014-10-28 14:27:17