2014-06-18 68 views
2

我現在有大量文檔要處理,並且正在使用Python RQ來並行化任務。Python RQ:回調模式

我想在每個文檔上執行不同的操作時完成工作流程。例如:A - >B - >C表示在A完成後,將文檔傳遞到功能A,繼續執行B,並返回C

但是,Python RQ似乎並沒有很好地支持管道的東西。

這是一個簡單但有點骯髒的做到這一點。總之,管道中的每個函數都以嵌套的方式調用其下一個函數。

例如,對於管道ABC

在頂層,有些代碼是這樣寫的:

q.enqueue(A, the_doc)

其中q是Queue實例,並在功能A有這樣的代碼:

q.enqueue(B, the_doc)

而且在B中,有這樣的事情:

q.enqueue(C, the_doc)

有沒有其他方式比這更優雅?例如一些代碼ONE功能:

q.enqueue(A, the_doc) q.enqueue(B, the_doc, after = A) q.enqueue(C, the_doc, after= B)

depends_on參數是一個最接近我的要求,但是,運行類似:

A_job = q.enqueue(A, the_doc) q.enqueue(B, depends_on=A_job)

將無法​​正常工作。在執行A_job = q.enqueue(A, the_doc)後立即執行q.enqueue(B, depends_on=A_job)。在B入隊時,A的結果可能沒有準備好,因爲它需要時間來處理。

PS:

如果Python的RQ是不是在這個真的很不錯,我可以用什麼其他工具在Python來達到同樣的目的:

  1. 循環賽並行
  2. 流水線處理支持
+1

使用3個隊列怎麼樣?一個用於工作a,另一個用於工作b,最後一個用於工作c,唯一的是當工作a結束時,文檔在工作b隊列中排隊,等等...... – Marcel

+0

即使'q.enqueue (B,depends_on = A_job)'作業B將在A完成後處理。當它*處理*而不是*入隊*時,重要的不是什麼? –

+0

@Marcel如何使用3個隊列與使用單個隊列不同?它將如何解決這個問題? –

回答

0

depends_on參數是一個最接近我的要求,但是, 運行是這樣的:

A_job = q.enqueue(A,the_doc)q.enqueue(B,depends_on = A_job)

將無法​​正常工作。至於q。在執行A_job = q.enqueue(A,the_doc)後立即執行入隊(B,depends_on = A_job) 。在B被 排隊時,A的結果可能沒有準備好,因爲 過程需要時間。

對於這種情況,q.enqueue(B,depends_on = A_job)將在A_job完成後運行。如果結果沒有準備好,q.enqueue(B,depends_on = A_job)將一直等到它準備好。


它不支持開箱即用,但使用其他技術是可能的。在我的情況下,我使用緩存來跟蹤鏈中的前一個作業,所以當我們想要排隊一個新函數(在後面運行)時,我們可以在調用enqueue時正確設置它的'depends_on'參數( )

請注意使用附加參數排隊:'timeout,result_ttl,ttl'。因爲我在rq上長時間工作,所以使用了這些。你可以參考他們在python rq文檔中的使用。

我用其從python rq

# main.py 
    def process_job(): 
     ... 

     # Create a cache key for every chain of methods you want to call. 
     # NOTE: I used this for web development, in your case you may want 
     # to use a variable or a database, not caching 

     # Number of time to cache and keep the results in rq 
     TWO_HRS = 60 * 60 * 2 

     cache_key = 'update-data-key-%s' % obj.id 
     previous_job_id = cache.get(cache_key) 
     job = django_rq.enqueue(update_metadata, 
           campaign=campaign, 
           list=chosen_list, 
           depends_on=previous_job_id, 
           timeout=TWO_HRS, 
           result_ttl=TWO_HRS, 
           ttl=TWO_HRS) 

     # Set the value for the most recent finished job, so the next function 
     # in the chain can set the proper value for 'depends_on' 
     cache.set(token_key, job.id, TWO_HRS) 

    # utils.py 
    def update_metadata(campaign, list): 
     # Your code goes here to update the campaign object with the list object 
     pass 

'depends_on' 衍生django_rq.enqueue() - 從所述rq docs

depends_on - 指定另一個作業(或作業ID),必須完成 作業將被排隊等待

1

在B入隊時,A的結果可能沒有準備好,因爲它需要時間來處理。

我不確定這是否真的如果您最初發布的問題,但在任何情況下,現在不是這樣。實際上,depends_on功能完全適用於您所描述的工作流程。

確實,這兩個函數是連續立即執行的。

A_job = q.enqueue(A, the_doc) 
B_job = q.enqueue(B, depends_on=A_job) 

但工人將不會執行B直到A完成。直到A_job成功執行,B.status == 'deferred'。一旦A.status == 'finished',則B將開始運行。

這意味着BC可以訪問和他們的依賴關係是這樣的結果操作:

import time 
from rq import Queue, get_current_job 
from redis import StrictRedis 

conn = StrictRedis() 
q = Queue('high', connection=conn) 

def A(): 
    time.sleep(100) 
    return 'result A' 

def B(): 
    time.sleep(100) 
    current_job = get_current_job(conn) 
    a_job_id = current_job.dependencies[0].id 
    a_job_result = q.fetch_job(a_job_id).result 
    assert a_job_result == 'result A' 
    return a_job_result + ' result B' 


def C(): 
    time.sleep(100) 
    current_job = get_current_job(conn) 
    b_job_id = current_job.dependencies[0].id 
    b_job_result = q.fetch_job(b_job_id).result 
    assert b_job_result == 'result A result B' 
    return b_job_result + ' result C' 

工人最終將打印'result A result B result C'

另外,如果隊列中有許多作業,並且B在執行之前可能會等待一段時間,則可能要顯着增加result_ttl或使其與result_ttl=-1無限期地變化。否則,在爲result_ttl設置了很多秒後,A的結果將被清除,在這種情況下,B將不再能夠訪問它並返回所需的結果。

但是,設置result_ttl=-1具有重要的內存影響。這意味着您的作業結果將永遠不會自動清除,內存將按比例增長,直到您手動從Redis中刪除這些結果爲止。