2017-08-27 46 views
0

我熟悉事件驅動的編程,但是我遇到了這個問題,我終止了可能的解決方案。我讀了龍捲風的文檔,我試着用:使用Tornado web框架執行異步任務

  1. 期貨
  2. gen.coroutine
  3. 異步
  4. add_timeout

,但我沒能解決以下問題:

  • 我有一個websocket服務器這只是聽新郵件 並根據消息類型調用特定的功能

    類WebSocketHandler(tornado.websocket.WebSocketHandler):

    ... 
    
    def on_message(self, message): 
        if message['type'] is X: 
         self.write(functionA(message['data'])) 
        elif message['type'] is Y: 
         self.write(functionB(message['data'])) 
    ... 
    

問題是當一個計算代價函數是執行,讓說功能A,它可能需要長達5分鐘終止

def functionA(message): 
    params = extract_params(message) 
    cmd = "computationally_expensive_tool" 
    out = check_output(cmd, shell=True, stderr=STDOUT, cwd=working_dir) 
    ... 
    return json.dumps({ 
         "error": False, 
         "message": "computationally_expensive_tool_execution_terminated", 
         "type": X 
        }) 

我的問題是我該如何執行該函數n以異步的方式處理,這樣我仍然可以處理其他消息和functionA的結果。

+0

你能提供一個你的功能是什麼樣子的例子嗎? –

+0

嗨@ notorious.no,謝謝你的關注。我添加了一些關於函數A和我想實現的目標的細節。所以functionA調用這個工具,當它結束時,我希望能夠通過發送消息來通知客戶。 – MrGoodKat

回答

1

如果functionA是不能進行異步阻塞函數,你可能想在一個線程池運行:

executor = concurrent.futures.ThreadPoolExecutor() 

@gen.coroutine 
def on_message(self, message): 
    if message['type'] is X: 
     yield executor.submit(functionA, message['data']) 
    elif message['type'] is Y: 
     functionB(message['data']) 

這將阻止該網頁套接字直到functionA結束,但允許其他連接繼續工作。如果需要繼續處理來自相同連接的其他類型的消息,而functionA則運行,則需要更復雜的安排,可能涉及tornado.queues.Queue

+0

謝謝你的回答!是的,即使functionA仍在運行,我仍需要爲同一個客戶端提供服務。有沒有辦法異步執行functionA並附加一個回調函數?所以我可以做一些像functionA_hadler(resultA):#做一些東西(如結果準備好後回覆) – MrGoodKat

+0

是的,你可以做一些像'IOLoop.current()。add_future(executor.submit(functionA,message [ 'data']),回調)'。 –

+0

非常感謝!有用。這絕對是解決我的問題的方法。 – MrGoodKat