2016-03-04 156 views
1

我修改了示例隊列,生產者&消費者來自這個Tornado documentation,但它似乎並沒有傳遞給get()的超時參數可以工作,因爲消費者不會等待10拋出異常之前的幾秒鐘。理想情況下,生產者和消費者將同時運行。另外,我不知道是否在超時參數爲秒或毫秒傳:龍捲風異步隊列不等待

from tornado import gen 
from tornado.ioloop import IOLoop 
from tornado.queues import Queue 

q = Queue() 

@gen.coroutine 
def consumer(): 
    try: 
     while True: 
      item = yield q.get(timeout=10000) 
      try: 
       print('Doing work on %s' % item)  
      finally: 
       q.task_done() 
    except gen.TimeoutError: 
     print('timeout') 
     return 

@gen.coroutine 
def producer(): 
    for item in range(5): 
     yield q.put(item) 
     print('Put %s' % item) 
     yield gen.sleep(2) 

@gen.coroutine 
def main(): 
    # Start consumer without waiting (since it never finishes). 
    IOLoop.current().spawn_callback(consumer) 
    yield producer()  # Wait for producer to put all tasks. 
    yield q.join()  # Wait for consumer to finish all tasks. 
    print('Done') 

IOLoop.current().run_sync(main) 

,這裏是它的執行:

Put 0 
Doing work on 0 
timeout 
Put 1 
Put 2 
Put 3 
Put 4 

回答

4

超時

,你可以讀於Tornado' Queue.get docs

返回一個未來,它解決了o有一個項目可用,或在超時後引發tornado.gen.TimeoutError。

但它可能是相當誤導,因爲timeout實際上是一個deadline。因此,它必須要麼datetime.timedelta object指定:

import datetime 
yield q.get(timeout=datetime.timedelta(seconds=1)) 

或絕對時間:

timeout = 1.5 # in seconds, floats acceptable 
deadline = IOLoop.current().time() + timeout 
# in most cases IOLoop time is just time.time() 
# I've used separate variables only for the notion 

yield q.get(timeout=deadline) 

Toro,這是合併到龍捲風,這種說法被稱爲deadline

在您的代碼中,您指定了超時10000,即截止日期爲Thu, 01 Jan 1970 02:46:40 GMT

消費者循環

既然你對整個功能try/except塊,包括while環,當發生TimeoutError你消費停止工作。將異常處理移入while循環。

工作例如:

from tornado import gen 
from tornado.ioloop import IOLoop 
from tornado.queues import Queue 

q = Queue() 

@gen.coroutine 
def consumer(): 
    i = 0 
    while True: 
     i += 1 
     print('get cycle %s' % i) 
     try: 
      item = yield q.get(IOLoop.instance().time() + 3) 
      try: 
       print('Doing work on %s' % item) 
      finally: 
       q.task_done() 
     except gen.TimeoutError: 
      print('timeout') 

@gen.coroutine 
def producer(): 
    for item in range(5): 
     yield q.put(item) 
     print('Put %s' % item) 
     yield gen.sleep(2) 

@gen.coroutine 
def main(): 
    # Start consumer without waiting (since it never finishes). 
    IOLoop.current().spawn_callback(consumer) 
    yield producer()  # Wait for producer to put all tasks. 
    yield q.join()  # Wait for consumer to finish all tasks. 
    print('Done')