2017-04-25 25 views
0

我有一個桶,其中有一些新的對象會隨着間隔的增加而隨着創建時間的不同而增加。例如:當一個新文件到達S3時,觸發luigi任務

's3://my-bucket/mass/%s/%s/%s/%s/%s_%s.csv' % (time.strftime('%Y'), time.strftime('%m'), time.strftime('%d'), time.strftime('%H'), name, the_time) 

事實上,這些都是Scrapy抓取的輸出。我想觸發一個匹配這些抓取的任務到我有的一​​個主.csv產品目錄文件(稱爲「product_catalog.csv」),該文件也會定期更新。

現在,我有幾個Python腳本,我用每次運行此過程時填寫的全局變量編寫。那些需要成爲導入的屬性。

所以這是需要做的:

1)新的CSV文件顯示了在「S3://我的桶/質量/ ......」一個獨特的鍵名基於時間抓取完成。路易吉看到並開始。
2)「clean.py」由luigi在新文件中運行,因此需要在運行時爲其提供「cleaning.py」參數(S3中顯示的文件)。除了傳遞到下一步之外,結果還保存在S3中。
3)「product_catalog.csv」的最新版本是從數據庫中抽取,並在「matching.py」

使用「cleaning.py」的結果,我知道這可能不會讓完整意義上的。我將根據需要提供編輯以使其更加清晰。

編輯

根據初步的答案,我已經完成了這個是一個拉操作,節省沿途步驟。但現在我很迷茫。應該指出的是,這是我第一次將Python項目捆綁在一起,所以有些東西,比如包括init .py,我正在學習,因爲我這樣做。像往常一樣,這是一個激動人心的成功之路,接下來的路障立即引起混淆。

這裏是我的問題:
1)如何從Scrapy導入蜘蛛對我來說還不清楚。我有大約十幾個人,目標是讓luigi管理所有人的爬行>清理>匹配過程。 Scrapy文件說明包括:

class MySpider(scrapy.Spider): 
    # Your spider definition 

這是什麼意思?在控制蜘蛛的腳本中重寫蜘蛛?這沒有意義,他們的例子是沒有用的。

2)我已經配置Scrapy管道輸出到S3,但luigi似乎也使用output()來做到這一點。我應該使用哪一個,如何讓他們一起玩?

3)Luigi說,CrawlTask​​()成功運行,但這是錯誤的,因爲它在幾秒鐘內完成,抓取通常需要幾分鐘。也沒有對應於成功的輸出文件。

4)我在哪裏提供S3的憑證?

這是我的代碼。我已經評論過那些不能代替我認爲更好的東西。但我的感覺是,有一個偉大的架構,我想做什麼,我只是不明白。

import luigi 
from luigi.s3 import S3Target, S3Client 
import my_matching 
from datetime import datetime 
import os 
import scrapy 
from twisted.internet import reactor 
from scrapy.crawler import CrawlerProcess 
from scrapy.utils.project import get_project_settings 
from my_crawlers.my_crawlers.spiders import my_spider 

class CrawlTask(luigi.Task): 
    crawltime = datetime.now() 
    spider = luigi.Parameter() 
    #vertical = luigi.Parameter() 

    def requires(self): 
     pass 

    def output(self): 
     return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_{}.csv".format(self.crawltime)) 
     #return S3Target("s3://my-bucket/mass/crawl_luigi_test_{}.csv".format(self.crawltime)) 

    def run(self): 
     os.system("scrapy crawl %s" % self.spider) 
     #process = CrawlerProcess(get_project_settings()) 
     #process.crawl("%s" % self.spider) 
     #process.start() 

class FetchPC(luigi.Task): 
    vertical = luigi.Parameter() 

    def output(self): 
     if self.vertical == "product1": 
      return "actual_data_staging/product1_catalog.csv" 
     elif self.vertical == "product2": 
      return "actual_data_staging/product2_catalog.csv" 

class MatchTask(luigi.Task): 
    crawltime = CrawlTask.crawltime 
    vertical = luigi.Parameter() 
    spider = luigi.Parameter() 

    def requires(self): 
     return CrawlTask(spider=self.spider) 
     return FetchPC(vertical=self.vertical) 

    def output(self): 
     return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_matched_{}.csv".format(self.crawltime)) 
     #return S3Target("s3://my-bucket/mass/crawl_luigi_test_matched_{}.csv".format(CrawlTask.crawltime)) 

    def run(self): 
     if self.vertical == 'product1': 
      switch_board(requires.CrawlTask(), requires.FetchPC()) 

MatchTask是指我寫的一個python腳本,它將刮取的產品與我的產品目錄進行比較。它看起來像這樣:

def create_search(value): 
... 
def clean_column(column): 
... 
def color_false_positive(): 
... 
def switch_board(scrape, product_catalog): 
# this function coordinates the whole script 

回答

0

你可以做的是創建一個更大的系統,封裝你的爬網和處理。這樣你就不必檢查s3是否有新的對象。我之前沒有使用luigi,但是也許你可以把你的scrapy工作變成一項任務,當它完成時,執行你的處理任務。無論如何,我不認爲'檢查's3是否是一個好主意,因爲1.你將不得不使用大量的API調用,2.你需要編寫一堆代碼來檢查是否有新的東西'或不,可能會變得毛茸茸的。

+0

對於這個較大的系統,你會推薦什麼?所有東西都是用Python寫的,如果有幫助的話,我會把東西封裝在Docker容器中。 – thaneofcawdor

+0

你不能使用Luigi任務嗎?將數據管道中的每一步都作爲一項任務實施,並根據需要啓動任務。我以前從未使用Luigi,我使用http://www.celeryproject.org/芹菜作爲這個東西,但它也是一個定義和實現任務的系統。至於使用docker,你應該仍然可以使用docker完成所有這些工作,但你可能需要使用docker網絡,並正確配置你的容器。容器被動態地分配IP,因此您可能需要某種方式來執行「服務發現」來查找IP。我相信碼頭工人已經建立了工具來做到這一點。 –

+0

我是否可以讓scrapy向luigi發送一個包含scrapy創建的s3鍵的Task.requires對象的信號?然後luigi可以從s3中檢索到這個密鑰。這看起來很愚蠢,但我如何獲得Python代碼片段(爬蟲和luigi)相互交談並傳遞信息呢? – thaneofcawdor

1

下面是它如何看起來非常粗略的輪廓。我認爲與你的大綱相比,luigi作爲一個拉動系統的主要區別在於,你首先指定了你想要的輸出,然後觸發該輸出依賴的其他任務。所以,與爬行時間結束時的命名不同,在一開始就知道一些事情後,更容易命名。有可能以其他方式來做,只是很多不必要的複雜。

class CrawlTask(luigi.Task): 
    crawltime = luigi.DateParameter() 

    def requires(self): 
     pass 

    def get_filename(self): 
     return "s3://my-bucket/crawl_{}.csv".format(self.crawltime) 

    def output(self): 
     return S3Target(self.get_filename()) 

    def run(self): 
     perform_crawl(s3_filename=self.get_filename()) 


class CleanTask(luigi.Task): 
    crawltime = luigi.DateParameter() 

    def requires(self): 
     return CrawlTask(crawltime=self.crawltime) 

    def get_filename(self): 
     return "s3://my-bucket/clean_crawl_{}.csv".format(self.crawltime) 

    def output(self): 
     return S3Target(self.get_filename()) 

    def run(self): 
     perform_clean(input_file=self.input().path, output_filename=self.get_filename()) 


class MatchTask(luigi.Task): 
    crawltime = luigi.DateParameter() 

    def requires(self): 
     return CleanTask(crawltime=self.crawltime) 

    def output(self): 
     return ##?? whatever output of this task is 

    def run(self): 
     perform_match(input_file=self.input().path) 
+1

那麼更好的方式來考慮它是我想安排最終結果每4小時完成一次,這將導致luigi觸發所有要求來提供結果,而不是每4小時安排一次爬網,在爬網完成後是否會觸發其他進程? – thaneofcawdor

+0

是的,這是正確的思考方式。 – MattMcKnight

+0

那麼我在哪裏引入實際執行這項工作的matching.py和clean.py腳本?我如何將luigi處理的變量(如每個步驟的結果)提供給matching.py?實質上,這些不同的.py腳本應該以.csv文件的形式將大熊貓數據框交給另一個,這些文件可以使用read_csv – thaneofcawdor

相關問題