我有一個桶,其中有一些新的對象會隨着間隔的增加而隨着創建時間的不同而增加。例如:當一個新文件到達S3時,觸發luigi任務
's3://my-bucket/mass/%s/%s/%s/%s/%s_%s.csv' % (time.strftime('%Y'), time.strftime('%m'), time.strftime('%d'), time.strftime('%H'), name, the_time)
事實上,這些都是Scrapy抓取的輸出。我想觸發一個匹配這些抓取的任務到我有的一個主.csv產品目錄文件(稱爲「product_catalog.csv」),該文件也會定期更新。
現在,我有幾個Python腳本,我用每次運行此過程時填寫的全局變量編寫。那些需要成爲導入的屬性。
所以這是需要做的:
1)新的CSV文件顯示了在「S3://我的桶/質量/ ......」一個獨特的鍵名基於時間抓取完成。路易吉看到並開始。
2)「clean.py」由luigi在新文件中運行,因此需要在運行時爲其提供「cleaning.py」參數(S3中顯示的文件)。除了傳遞到下一步之外,結果還保存在S3中。
3)「product_catalog.csv」的最新版本是從數據庫中抽取,並在「matching.py」
使用「cleaning.py」的結果,我知道這可能不會讓完整意義上的。我將根據需要提供編輯以使其更加清晰。
編輯
根據初步的答案,我已經完成了這個是一個拉操作,節省沿途步驟。但現在我很迷茫。應該指出的是,這是我第一次將Python項目捆綁在一起,所以有些東西,比如包括init .py,我正在學習,因爲我這樣做。像往常一樣,這是一個激動人心的成功之路,接下來的路障立即引起混淆。
這裏是我的問題:
1)如何從Scrapy導入蜘蛛對我來說還不清楚。我有大約十幾個人,目標是讓luigi管理所有人的爬行>清理>匹配過程。 Scrapy文件說明包括:
class MySpider(scrapy.Spider):
# Your spider definition
這是什麼意思?在控制蜘蛛的腳本中重寫蜘蛛?這沒有意義,他們的例子是沒有用的。
2)我已經配置Scrapy管道輸出到S3,但luigi似乎也使用output()來做到這一點。我應該使用哪一個,如何讓他們一起玩?
3)Luigi說,CrawlTask()成功運行,但這是錯誤的,因爲它在幾秒鐘內完成,抓取通常需要幾分鐘。也沒有對應於成功的輸出文件。
4)我在哪裏提供S3的憑證?
這是我的代碼。我已經評論過那些不能代替我認爲更好的東西。但我的感覺是,有一個偉大的架構,我想做什麼,我只是不明白。
import luigi
from luigi.s3 import S3Target, S3Client
import my_matching
from datetime import datetime
import os
import scrapy
from twisted.internet import reactor
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from my_crawlers.my_crawlers.spiders import my_spider
class CrawlTask(luigi.Task):
crawltime = datetime.now()
spider = luigi.Parameter()
#vertical = luigi.Parameter()
def requires(self):
pass
def output(self):
return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_{}.csv".format(self.crawltime))
#return S3Target("s3://my-bucket/mass/crawl_luigi_test_{}.csv".format(self.crawltime))
def run(self):
os.system("scrapy crawl %s" % self.spider)
#process = CrawlerProcess(get_project_settings())
#process.crawl("%s" % self.spider)
#process.start()
class FetchPC(luigi.Task):
vertical = luigi.Parameter()
def output(self):
if self.vertical == "product1":
return "actual_data_staging/product1_catalog.csv"
elif self.vertical == "product2":
return "actual_data_staging/product2_catalog.csv"
class MatchTask(luigi.Task):
crawltime = CrawlTask.crawltime
vertical = luigi.Parameter()
spider = luigi.Parameter()
def requires(self):
return CrawlTask(spider=self.spider)
return FetchPC(vertical=self.vertical)
def output(self):
return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_matched_{}.csv".format(self.crawltime))
#return S3Target("s3://my-bucket/mass/crawl_luigi_test_matched_{}.csv".format(CrawlTask.crawltime))
def run(self):
if self.vertical == 'product1':
switch_board(requires.CrawlTask(), requires.FetchPC())
MatchTask是指我寫的一個python腳本,它將刮取的產品與我的產品目錄進行比較。它看起來像這樣:
def create_search(value):
...
def clean_column(column):
...
def color_false_positive():
...
def switch_board(scrape, product_catalog):
# this function coordinates the whole script
對於這個較大的系統,你會推薦什麼?所有東西都是用Python寫的,如果有幫助的話,我會把東西封裝在Docker容器中。 – thaneofcawdor
你不能使用Luigi任務嗎?將數據管道中的每一步都作爲一項任務實施,並根據需要啓動任務。我以前從未使用Luigi,我使用http://www.celeryproject.org/芹菜作爲這個東西,但它也是一個定義和實現任務的系統。至於使用docker,你應該仍然可以使用docker完成所有這些工作,但你可能需要使用docker網絡,並正確配置你的容器。容器被動態地分配IP,因此您可能需要某種方式來執行「服務發現」來查找IP。我相信碼頭工人已經建立了工具來做到這一點。 –
我是否可以讓scrapy向luigi發送一個包含scrapy創建的s3鍵的Task.requires對象的信號?然後luigi可以從s3中檢索到這個密鑰。這看起來很愚蠢,但我如何獲得Python代碼片段(爬蟲和luigi)相互交談並傳遞信息呢? – thaneofcawdor