2015-09-10 68 views

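Running concurrent mongoengine queries with asyncio

I have 4 functions that basically build queries and execute them. I want them to run concurrently using asyncio. My asyncio implementation seems correct, because non-MongoDB tasks (for example asyncio.sleep()) run as they should. Here is the code: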

loop = asyncio.new_event_loop() 
asyncio.set_event_loop(loop) 
tasks = [ 
       service.async_get_associate_opportunity_count_by_user(me, criteria), 
       service.get_new_associate_opportunity_count_by_user(me, criteria), 
       service.async_get_associate_favorites_count(me, criteria=dict()), 
       service.get_group_matched_opportunities_count_by_user(me, criteria) 
      ] 

available, new, favorites, group_matched = loop.run_until_complete(asyncio.gather(*tasks)) 

stats['opportunities']['available'] = available 
stats['opportunities']['new'] = new 
stats['opportunities']['favorites'] = favorites 
stats['opportunities']['group_matched'] = group_matched 

loop.close() 


# functions written in other file 

@asyncio.coroutine 
def async_get_ass(self, user, criteria=None, **kwargs): 
    start_time = time.time() 
    query = **query that gets built from some other functions** 

    opportunities = Opportunity.objects(query).count() 
    run_time = time.time() - start_time 
    print("runtime of available: {}".format(run_time)) 
    yield from asyncio.sleep(2) 
    return opportunities 

@asyncio.coroutine 
def get_new_associate_opportunity_count_by_user(self, user, criteria=None, **kwargs): 
    start_time = time.time() 
    query = **query that gets built from some other functions** 
    opportunities = Opportunity.objects(query).count() 
    run_time = time.time() - start_time 
    print("runtime of new: {}".format(run_time)) 
    yield from asyncio.sleep(2) 
    return opportunities 

@asyncio.coroutine 
def async_get_associate_favorites_count(self, user, criteria={}, **kwargs): 
    start_time = time.time() 
    query = **query that gets built from some other functions** 
    favorites = Opportunity.objects(query).count() 
    run_time = time.time() - start_time 
    print("runtime of favorites: {}".format(run_time)) 
    yield from asyncio.sleep(2) 
    return favorites 

@asyncio.coroutine 
def get_group_matched_opportunities_count_by_user(self, user, criteria=None, **kwargs): 
    start_time = time.time() 
    query = **query that gets built from some other functions** 
    opportunities = Opportunity.objects(query).count() 
    run_time = time.time() - start_time 
    print("runtime of group matched: {}".format(run_time)) 
    yield from asyncio.sleep(2) 
    return opportunities 

The yield from asyncio.sleep(2) is only there to show that the functions run asynchronously. Here is the output on the terminal:

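runtime of group matched: 0.11431598663330078
runtime of favorites: 0.0029871463775634766
Timestamp function run time: 0.0004897117614746094
runtime of new: 0.13006806373596191
runtime of available: 0.15225648880004883
Total run time: 2403.2700061798096

As I understand it, apart from the 2000 ms that the sleep adds to the total run time, the total should not exceed 155-160 ms, since that is the longest run time among all the functions.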

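I am currently looking into motorengine (a port of mongoengine 0.9.0), which apparently enables asynchronous MongoDB queries, but I don't think I will be able to use it, because my models are already defined with mongoengine. Is there a way around this problem?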

Answer

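Your queries are not running in parallel because whenever Opportunity.objects(query).count() runs inside one of your coroutines, the whole event loop blocks: these methods perform blocking IO and never yield control back to the loop.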
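The timings reported in the question are consistent with this explanation. The sketch below is a rough check, assuming the five sub-second figures in the terminal output are in seconds and the final total (2403.27) is in milliseconds, as the question's mention of 2000 ms suggests. The variable and function names are illustrative only. Because each coroutine blocks on its query before reaching asyncio.sleep(2), only the sleeps overlap, so a serial run should take roughly the sum of the runtimes plus one 2-second sleep.

```python
# Runtimes in seconds, copied from the terminal output in the question.
step_runtimes_s = [
    0.11431598663330078,
    0.0029871463775634766,
    0.0004897117614746094,
    0.13006806373596191,
    0.15225648880004883,
]

SLEEP_MS = 2000.0                      # the sleeps overlap, so they count once
REPORTED_TOTAL_MS = 2403.2700061798096  # assumption: this figure is in ms


def expected_totals_ms(runtimes_s, sleep_ms):
    """Return (concurrent, serial) estimates of the total run time in ms."""
    concurrent = max(runtimes_s) * 1000 + sleep_ms  # steps overlap fully
    serial = sum(runtimes_s) * 1000 + sleep_ms      # steps run one by one
    return concurrent, serial


concurrent_ms, serial_ms = expected_totals_ms(step_runtimes_s, SLEEP_MS)

if __name__ == "__main__":
    print("concurrent estimate: {:.2f} ms".format(concurrent_ms))
    print("serial estimate:     {:.2f} ms".format(serial_ms))
    print("reported total:      {:.2f} ms".format(REPORTED_TOTAL_MS))
```

Under those assumptions the serial estimate (about 2400 ms) is within a few milliseconds of the reported total, while the concurrent estimate (about 2152 ms) falls roughly 250 ms short.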

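So you need a MongoDB driver that can do asynchronous, non-blocking IO. You are on the right track with motorengine, but as far as I know it is written for the Tornado asynchronous framework. To make it work with asyncio you will have to bridge Tornado and asyncio. See http://tornado.readthedocs.org/en/latest/asyncio.html for how to do that.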
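To see why a non-blocking driver matters, here is a minimal sketch that needs no database. It uses time.sleep as a hypothetical stand-in for a blocking call such as Opportunity.objects(query).count(), and asyncio.sleep as a stand-in for a non-blocking one. All function names are illustrative, and the code uses the newer async/await syntax (Python 3.7+) rather than the @asyncio.coroutine style in the question.

```python
import asyncio
import time


def blocking_count(delay):
    """Hypothetical stand-in for a blocking query such as .count()."""
    time.sleep(delay)  # blocks the thread, and with it the event loop
    return 1


async def blocking_task(delay):
    # Never yields to the event loop, so other tasks cannot run meanwhile.
    return blocking_count(delay)


async def non_blocking_task(delay):
    # Yields to the event loop, so other tasks can run while this one waits.
    await asyncio.sleep(delay)
    return 1


async def timed_gather(task_factory, delay, n):
    """Run n tasks with asyncio.gather and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(task_factory(delay) for _ in range(n)))
    return time.perf_counter() - start


def run_demo(delay=0.1, n=4):
    blocking_elapsed = asyncio.run(timed_gather(blocking_task, delay, n))
    async_elapsed = asyncio.run(timed_gather(non_blocking_task, delay, n))
    return blocking_elapsed, async_elapsed


if __name__ == "__main__":
    blocking_elapsed, async_elapsed = run_demo()
    print("blocking tasks:     {:.3f} s".format(blocking_elapsed))
    print("non-blocking tasks: {:.3f} s".format(async_elapsed))
```

The blocking tasks take about n times the delay, because they run one after another even under asyncio.gather. The non-blocking tasks finish in roughly one delay.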

Another option is asyncio-mongo, but it has no mongoengine-compatible ORM, so you may have to rewrite most of your code.
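A further possibility, not mentioned in the answer above, is to keep the blocking calls and hand them to a thread pool with the standard library's loop.run_in_executor, so the event loop stays free while the worker threads wait. The sketch below assumes the blocking calls are safe to run from worker threads. blocking_count is a hypothetical stand-in for something like Opportunity.objects(query).count(), and the async/await syntax requires Python 3.7+.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_count(name, delay):
    """Hypothetical stand-in for a blocking query; returns its label."""
    time.sleep(delay)
    return name


async def count_all(delays):
    """Run one blocking call per entry of `delays` in worker threads."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        futures = [
            loop.run_in_executor(pool, blocking_count, name, delay)
            for name, delay in delays.items()
        ]
        # gather returns results in the order the futures were passed in.
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    delays = {"available": 0.1, "new": 0.1,
              "favorites": 0.1, "group_matched": 0.1}
    start = time.perf_counter()
    results = asyncio.run(count_all(delays))
    print(results, "{:.3f} s".format(time.perf_counter() - start))
```

This only moves the waiting off the event loop thread; each query still occupies one thread for its full duration. It therefore suits a handful of concurrent queries better than a large number of them.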