2016-07-31 30 views
0

我正在使用python2.7安裝期貨模塊。如何用龍捲風實現多線程?

我想使用ThreadPoolExecutor在龍捲風中實現多線程。

這是我實施的代碼。

from __future__ import absolute_import 
from base_handler import BaseHandler 
from tornado import gen 
from pyrestful import mediatypes 
from pyrestful.rest import get, post, put, delete 
from bson.objectid import ObjectId 
from spark_map import Map 
from concurrent import futures 
import tornado 
class MapService(BaseHandler): 

    MapDB = dict() 
    executor = futures.ProcessPoolExecutor(max_workers=3) 

    @tornado.web.asynchronous 
    @gen.coroutine 
    @post(_path='/map', _type=[str, str]) 
    def postMap(self, inp, out): 
     db = self.settings['db'] 
     function = lambda (x,y): (x,y[0]*2) 
     future = yield db.MapInfo.insert({'input': inp, 'output': out, 'input_function': str(function)}) 
     response = {"inserted ID": str(future)} 
     self.write(response) 

     m = Map(inp, out, function, appName=str(future)) 
     futuree = self.executor.submit(m.operation()) 
     self.MapDB[str(future)] = {'map_object': m, 'running_process_future_object': futuree} 
     self.finish() 

    @tornado.web.asynchronous 
    @gen.coroutine 
    @delete(_path='/map/{_id}', _types=[str]) 
    def deleteMap(self, _id): 
     db = self.settings['db'] 
     future = yield db.MapInfo.find_one({'_id': ObjectId(_id)}) 
     if future is None: 
      raise AttributeError('No entry exists in the database with the provided ID') 
     chk = yield db.MapInfo.remove(future) 
     response = { "Succes": "OK" } 
     self.write(response) 

     self.MapDB[_id]['map_object'].stop() 
     del self.MapDB[_id] 
     self.finish() 

在上面的代碼中,我使用inp和out中的post請求接收兩個輸入。然後我跟他們做一些操作。此操作應持續到收到刪除請求以停止並刪除進程。

我面臨的問題是多個請求。它僅執行第一個請求,而其他請求等待第一個請求完成,從而阻止主IOLoop。

所以,我想運行在一個單獨的線程中的每個進程。我應該如何實現它?

+0

m.operation()是做什麼的? –

+0

m.operation()是我實現的自定義類Map的一個函數。它基本上對一些數據執行一些計算。該操作應該運行,直到收到針對該特定操作的刪除請求。 – vidhan

+0

聽起來就像你在m.operation()中阻塞,你需要重寫它爲異步。然而,我們需要看到一些操作()的代碼是確定的。或者你可以在operation()調用之前和之後放一個「print」。 –

回答

1

看來,m.operation()是阻塞,所以你需要在一個線程上運行它。你這樣做塊主線程同時呼籲m.operation()的方式,併產生一個線程後:

self.executor.submit(m.operation()) 

你想要的,而是給函數傳遞給一個線程將執行它:

self.executor.submit(m.operation) 

沒有parens。