2017-10-28 45 views
1

我有500個鏈接要下載,並且想要通過例如10個項目對它們進行批處理。luigi批處理模塊用於直批處理任務

這個僞代碼是怎麼樣的?

class BatchJobTask(luigi.Task) 
    items = luigi.Parameter() 
    def run(self): 
     listURLs = [] 
     with ('urls_chunk', 'r') as urls 
      for line in urls: 
       listURLs.append('http://ggg'+line+'.org') 
      10_urls = listURLs[0:items] #10 items here 
      for i in 10_urls: 
       req = request.get(url) 
       req.contents 
    def output(self): 
     return self.LocalTarger("downloaded_filelist.txt") 

class BatchWorker(luigi.Task) 
    def run(self) 
     # Here I should run BatchJobTask from 0 to 10, next 11 - 21 new etc... 

會是怎樣?

+0

你的網址列表在哪裏? – MattMcKnight

+0

我已經更新了第一篇文章 – GarfieldCat

+0

我的意思是這個URL列表存儲在哪裏?在一個隊列中,一個數據庫,一個文件?你需要做的是弄清楚那件東西有多少,然後從那裏建立你的大塊。我將在下面舉一個例子,但由於您未指定問題的相關部分,因此它不太可能與您的問題相關。 – MattMcKnight

回答

1

這是一種做你喜歡的東西的方法,但是將字符串列表存儲爲一個文件中的單獨行。

import luigi 
import requests 

BATCH_SIZE = 10 


class BatchProcessor(luigi.Task): 
    items = luigi.ListParameter() 
    max = luigi.IntParameter() 

    def requires(self): 
     return None 

    def output(self): 
     return luigi.LocalTarget('processed'+str(max)+'.txt') 

    def run(self): 
     for item in self.items: 
      req = requests.get('http://www.'+item+'.org') 
      # do something useful here 
      req.contents 
     open("processed"+str(max)+".txt",'w').close() 


class BatchCreator(luigi.Task): 
    file_with_urls = luigi.Parameter() 

    def requires(self): 
     required_tasks = [] 
     f = open(self.file_with_urls) 
     batch_index = 0 
     total_index = 0 
     lines = [] 
     while True: 
      line = f.readline() 
      if not line: break 
      total_index += 1 
      if batch_index < BATCH_SIZE: 
       lines.append(line) 
       batch_index += 1 
      else: 
       required_tasks.append(BatchProcessor(batch=lines)) 
       lines = [line] 
       batch_index = 1 
     return required_tasks 

    def output(self): 
     return luigi.LocalTarget(str(self.file_with_urls) + 'processed') 

    def run(self): 
     open(str(self.file_with_urls) + 'processed', 'w').close() 
1

我做到了。

class GetListtask(luigi.Task) 
    def run(self): 
     ... 
    def output(self): 
    return luigi.LocalTarget(self.outputfile) 

class GetJustOneFile(luigi.Task): 
    fid = luigi.IntParameter() 
    def requires(self): 
     pass 

    def run(self): 
     url = 'http://my-server.com/test' + str(self.fid) + '.txt' 
     download_file = requests.get(url, stream=True) 
     with self.output().open('w') as downloaded_file: 
      downloaded_file.write(str(download_file.content)) 

    def output(self): 
     return luigi.LocalTarget("test{}.txt".format(self.fid)) 


class GetAllFiles(luigi.WrapperTask): 
    def requires(self): 
     listoffiles = [] # 0..999 
     for i in range(899): 
      listoffiles.append(i) 
     return [GetJustOneFile(fid=fileid) for fileid in listoffiles] 

這段代碼可怕嗎?

+0

嗯,它不會做配料,但它應該工作。 – MattMcKnight

+0

如何在GetAllFiles而不是預定義列表中從GetListTask輸入文件? – GarfieldCat

+0

這就是我在BatchCreator任務的'require'方法中展示的內容,假設您有一個文件,其中每行文件都是變化的URN組件。 – MattMcKnight