2012-08-09 62 views
1

我使用Celery運行抓取一些數據的網絡蜘蛛,之後我需要將這些數據保存在數據庫的某個地方(例如SQLite),但據我所知,我無法在芹菜工作人員之間共享SQLAlchemy會話。你怎麼解決這個問題?哪種方式很常見?在芹菜工人內部存儲數據的常見方法是什麼?

目前我正在嘗試使用Redis作爲數據的中間存儲。

@celery.task 
def run_spider(spider, task): 
    # setup worker 
    logger = logging.getLogger('Spider: %s' % spider.url) 
    spider.meta.update({'logger': logger, 'task_id': int(task.id)}) 

    # push task data inside worker 
    spider.meta.update({'task_request': run_spider.request}) 

    spider.run() 

    task.state = "inactive" 
    task.resolved = datetime.datetime.now() 
    db.session.add(task) 
    db.session.commit() 

編輯:其實我錯了,我不需要共享會話,我需要爲每個芹菜進程/任務新的數據庫連接。

回答

4

我也曾在一個大型芹菜應用程序中使用redis進行持久化。

是很常見的我的任務看起來像這樣:

@task 
def MyTask(sink, *args, **kwargs): 
    data_store = sharded_redis.ShardedRedis(sink) 
    key_helper = helpers.KeyHelper() 
    my_dictionary = do_work() 
    data_store.hmset(key_helper.key_for_my_hash(), my_dictionary) 
  • sharded_redis只是幾個Redis的碎片通過客戶端處理分片鍵的抽象。
  • sink是在確定分片之後用於進行適當連接的元組的列表(host, port)

本質上你是連接和斷開與每個任務(很便宜)從redis,而不是創建一個連接池。使用連接池可以工作,但是你要真正使用芹菜(運行很多併發任務),那麼使用這種方法會更好(在我看來),因爲你冒着耗盡你的風險連接池,特別是如果你在做任何需要更長時間的redis(比如將大型數據集讀入內存)。

與redis的連接相當便宜,所以這應該很好地擴展。我們在一些情況下每分鐘處理數十萬個任務。

+0

證實,Redis已接近完美。 – istinspring 2012-08-09 20:10:22

0

其實我錯了,我不需要共享會話,我需要爲每個芹菜進程/任務創建新數據庫連接。