2012-03-09 50 views
0

在我的應用程序中,我實現了由5個不同「處理單元」組成的非常粗糙的工作流程。目前代碼的結構如下:Python:分支工作流程/管道的現有解決方案?

def run(self, result_first_step=None, result_second_step=None): 

    config = read_workflow_config("config.ini") 

    if config.first_step: 
     result_first_step = run_process_1() 

    if config.second_step and result_first_step is not None: 
     result_second_step = run_process_2(result_first_step) 
    else: 
     raise Exception("Missing required data") 

    if config.third_step: 
     result_third_step = run_process_3(result_first_step, result_second_step) 
    else: 
     result_third_step = None 

    collect_results(result_first_step, result_second_step, result_third_step) 

等等。代碼的工作原理很難維護,而且非常脆弱(處理比這個簡單的例子複雜得多)。因此,我一直在考慮採用另一種策略,即制定適當的工作流程:

  • 短路:我可以不給任何數據給啓動過程或兩種不同類型的數據。在後一種情況下,工作流的短路,並跳過一些處理
  • 普通的對象:東西一樣配置提供給所有單元
  • 條件:取決於配置,一些位可以被跳過

是否有一個Python庫可用於執行這些類型的工作流程,還是我應該推出自己的?我一直在嘗試pyutilib.workflow,但它不能正確地支持一個通用配置對象,而不能將它傳遞給所有工作者(乏味)。

注意:這是針對庫/命令行應用程序的,因此基於Web的工作流程解決方案並不合適。

+0

你有沒有試着用搜索引擎這個問題?你發現了什麼問題? – Marcin 2012-03-09 17:13:05

+0

你寫的方式看起來像'run_process_2',除非你已經有'run_process_1'。真的嗎? – katrielalex 2012-03-09 17:13:17

+0

的確,我會調整它以更好地展示我的想法。編輯:更改示例如何可以短路。 – Einar 2012-03-09 17:16:49

回答

0

您可以將運行方法變成一個生成器;

def run(self) 
    result_first_step = run_process_1() 
    yield result_first_step 
    result_second_step = run_process_2(result_first_step) 
    yield result_second_step 
    result_third_step = run_process_3(result_first_step, result_second_step) 
    collect_results(result_first_step, result_second_step, result_third_step) 
0

有相當的範圍內半個頁面的方法在Python, 管道向...
這裏的主要思想: 頂部,放置在一個字典中所有的步驟定義;
然後管道(例如, 「C A T」)做的步驟C,A,T。

class Pipelinesimple: 
    """p = Pipelinesimple(funcdict); p.run("C A T") = C(X) | A | T 

    funcdict = dict(A = Afunc, B = Bfunc ... Z = Zfunc) 
    pipeline = Pipelinesimple(funcdict) 
    cat = pipeline.run("C A T", X) # C(X) | A | T, i.e. T(A(C(X))) 
    dog = pipeline.run("D O G", X, **kw) # D(X, **kw) | O(**kw) | G(**kw) 
    """ 

def __init__(self, funcdict): 
    self.funcdict = funcdict # funcs or functors of X 

def run(self, steps, X, **commonargs): 
    """ steps "C A T" or ["C", "A", "T"] 
     all funcs(X, **commonargs) 
    """ 

    if isinstance(steps, basestring): 
     steps = steps.split() # "C A T" -> ["C", "A", "T"] 
    for step in steps: 
     func = self.funcdict(step) 
     # if X is None: ... 
     X = func(X, **commonargs) 
    return X 

接着,有給予不同的參數 到的不同步驟的幾種方法。

的一種方法是解析多行字符串如

""" C ca=5 cb=6 ... 
    A aa=1 ... 
    T ... 
""" 

另一種方法是採取的功能/功能名稱/ PARAM類型的字典的列表,像

pipeline.run(["C", dict(ca=5, cb=6), lambda ..., "T", dict(ta=3) ]) 

第三種是分裂PARAMS 「A__aa B__ba ...」的方式 sklearn.pipeline.Pipeline。 呢。 (這是面向機器學習,但你可以複製管道部件。)

這些都有相當明顯的優點和缺點。

一個大型人才社區可以拿出一打原型 很快解決問題[管道]。
但是減少到二三個需要永遠。

無論你採用何種方式, 提供記錄所有參數運行的方式。

參見:
FilterPype
nipype

+0

有趣的做法,我會分支我的代碼,並給這個旋轉。 – Einar 2012-04-27 08:39:11

+0

@Einar,你會選擇哪種方法? – denis 2012-04-27 11:14:15