2014-10-09 35 views
1

我想用芹菜插入大數據在我的mongodb,但問題是併發在MongoDB中。如果我一次發送多個任務到芹菜,一部分數據將被插入到mongodb中,而另一些數據則不會。我想這是因爲mongodb在插入操作時鎖定了數據庫,但我需要一種解決方案來發送相同類型的多個任務以在數據庫中插入數據。像檢查數據庫是否被鎖定,如果它正在等待它解鎖。這裏是我的代碼的一部分:如何使用芹菜插入數據到mongodb使用mongoengine

@celery.task(name='celery_tasks.add_book_product') 
def add_book_product(product_dict, store_id): 

    connect(DefaultConfig.MONGODB_DB, host=DefaultConfig.MONGODB_HOST) 

    store_obj = Store.objects.get(pk=store_id) 

    try: 
     book = Books.objects.get(pk=product_dict['RawBook']) 

     try: 
      product_obj = Product.objects.get(store=store_obj, related_book=book, kind='book') 
      print("Product {} found for store {}".format(product_obj.id, store_obj.id)) 
      product_obj.count = int(product_dict['count']) 
      product_obj.buy_price = int(product_dict['buy_book']) 
      product_obj.sell_price = int(product_dict['sell_book']) 

      product_obj.save() 

     except (DoesNotExist, ValidationError): 
      product_obj = Product(store=store_obj, 
            related_book=book, 
            kind='book', 
            count=int(product_dict['count']), 
            buy_price=int(product_dict['buy_book']), 
            sell_price=int(product_dict['sell_book']), 
            name=book.name_fa) 

      product_obj.save() 

      print("Appending books to store obj...") 
      store_obj.products.append(product_obj) 
      store_obj.save() 
      print("Appending books to store obj done") 

     return "Product {} saved for store {}".format(product_obj.id, store_obj.id) 
    except (DoesNotExist, ValidationError): 
     traceback.print_exc() 
     return "Product with raw book {} does not exist.".format(product_dict['RawBook']) 

回答

2

默認情況下,多處理用於執行芹菜任務的併發執行。但有兩種方法可以確保在任何給定時間只執行一項任務。

解決方案1:

當你開始

celery -A your_app worker -l info 

芹菜工人默認併發等於你的機器有內核的數量。因此,如果你開始這樣的工作人員

celery -A your_app worker -l info -c 1 

它在任何給定的時間只運行一個任務。如果您還有其他一些必須執行的任務,您可以啓動一個新的隊列並分配一個工作人員來完成。

解決方案2:

這是有點複雜。你需要在你的任務中使用鎖,就像這樣。

if acquire_lock(): 
    try: 
     #do something 
    finally: 
     release_lock() 
    return 

你可以在Celery documentation瞭解更多。