2016-11-28 32 views
1

我有一些pickle文件,每個日期在2005年到2010年之間。每個文件都包含一個單詞詞典,它們各自的頻率爲該日期。我還有一個「主文件」,包含整個時期的所有獨特詞彙。總共有大約500萬字。具有多個輸入的luigi任務的體系結構

我需要獲取所有數據併爲每個單詞生成一個CSV文件,每個日期將有一個CSV文件。例如,例如文件some_word.txt

2005-01-01,0.0003 
2005-01-02,0.00034 
2005-01-03,0.008 

我有麻煩組織與路易吉框架這一進程。我目前的頂級任務需要一個字,查找每個日期的關聯頻率並將結果存儲在CSV文件中。我想我可以循環訪問我的主文件中的每個單詞並用該單詞運行任務,但我估計這需要幾個月,如果不是更長的話。這是我的頂級AggregateTokenFreqs任務的簡化版本。

class AggregateTokenFreqs(luigi.Task): 
    word = luigi.Parameter() 

    def requires(self): 
     pass # not sure what to require here, master file? 

    def output(self): 
     return luigi.LocalTarget('data/{}.csv'.format(self.word)) 

    def run(self): 
     results = [] 
     for date_ in some_list_of_dates: 
      with open('pickles/{}.p'.format(date_), 'rb') as f: 
       freqs = pickle.load(f) 
       results.append((date_, freqs.get(self.word)) 

     # Write results list to output CSV file 
+1

您需要進行的處理是什麼?例如,您是否計劃在新的一天的數據到達時重新運行日常流程?如果你只需要運行一次,運行luigi可能沒有意義。無論如何,你最好使用多處理技術。 – MattMcKnight

回答

0

@MattMcKnight說你可能會更好使用多處理。但是,如果您想要使用Luigi,您可以執行以下操作:

  • Luigi具有您配置的工人的概念。這是本地進程並行運行不同任務的數量。
  • 你可以模擬任務,而不是通過所有醬菜「循環」,傳遞一個單一的醬菜到任務(作爲參數)。您必須將結果寫入具有唯一名稱的目錄中的TSV。
  • 有一個循環,每個pickle(日期)創建一個任務。配置工人數量(即5)。這樣你就可以同時處理5個文件。
  • 您將需要一個額外的任務,將所有單個CSV文件「加入」爲一個。

希望這會有所幫助。

相關問題