2014-09-24 18 views
1

I have a web service written in Flask, wrapped in a WSGIContainer and served by Tornado using its FallbackHandler mechanism. One of the routes in my Flask web service runs a very long operation (it takes about five minutes to complete), and while this route is being handled, any call to any other route is blocked until the operation completes. How can I work around this? By making Tornado serve from a separate thread?

Here is how my application is served with Tornado:

import os

from tornado.options import parse_command_line
from tornado.web import Application, FallbackHandler, StaticFileHandler
from tornado.wsgi import WSGIContainer

parse_command_line()

frontend_path = os.path.join(os.path.dirname(__file__), "..", "webapp")

rest_app = WSGIContainer(app)  # `app` is the Flask application
tornado_app = Application(
    [
        (r"/api/(.*)", FallbackHandler, dict(fallback=rest_app)),
        (r"/app/(.*)", StaticFileHandler, dict(path=frontend_path)),
    ]
)

Answers

1

You could consider using tornado-threadpool, in which case your request will return immediately and the task will be completed in the background.

from thread_pool import in_thread_pool 
from flask import flash, render_template 

@app.route('/wait') 
def wait(): 
    time_consuming_task() 
    flash('Time-consuming task running in background...') 
    return render_template('index.html') 

@in_thread_pool 
def time_consuming_task(): 
    import time 
    time.sleep(5) 
+1

This will work if you don't need the result of the time-consuming task, but if you need its result to render your index.html, then you have to block for it. It isn't possible to block in Tornado's WSGI mode without stalling the entire IOLoop; you need a multithreaded WSGI server. – 2014-09-24 14:25:36

+0

@BenDarnell You're right. However, this is the only way I know of short of suggesting a switch to another server. Besides, in that case the time-consuming task could put its result into the session or the g object for a subsequent request. – 2014-09-24 14:31:39

+0

Thanks Mehdi Sadeghi, but I'm in exactly the situation @BenDarnell describes. – 2014-09-25 04:46:20

3

Tornado's WSGI container is not very scalable and should only be used when you have a specific reason to combine WSGI and Tornado applications in the same process. Tornado does not support long-running WSGI requests without blocking; anything that takes a long time needs to use Tornado's native asynchronous interfaces instead of WSGI.

Note this warning in the docs:

WSGI is a synchronous interface, while Tornado's concurrency model is based on single-threaded asynchronous execution. This means that running a WSGI app with Tornado's WSGIContainer is less scalable than running the same app in a multi-threaded WSGI server like gunicorn or uwsgi. Use WSGIContainer only when there are benefits to combining Tornado and WSGI in the same process that outweigh the reduced scalability.
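
A minimal sketch of that native approach, assuming Tornado 4-style coroutines (SlowHandler and the 300-second sleep are hypothetical stand-ins for the five-minute operation): the blocking call is pushed onto a ThreadPoolExecutor via run_on_executor, so the IOLoop stays free to serve other requests.

import time
from concurrent.futures import ThreadPoolExecutor

from tornado import gen, web
from tornado.concurrent import run_on_executor

class SlowHandler(web.RequestHandler):
    # A small shared pool; the size is illustrative only.
    executor = ThreadPoolExecutor(max_workers=4)

    @run_on_executor
    def long_operation(self):
        time.sleep(300)  # hypothetical stand-in for the five-minute task
        return "done"

    @gen.coroutine
    def get(self):
        # Runs on the thread pool; the IOLoop keeps serving other
        # requests in the meantime.
        result = yield self.long_operation()
        self.write(result)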

+0

Yes, I can see that now, Ben, but I'm quite disappointed that the official Flask page recommends this way of connecting Flask applications with Tornado, and its description seems quite inaccurate: http://flask.pocoo.org/docs/0.10/deploying/wsgi-standalone/ – 2014-09-25 04:42:00

+0

Yes, I've suggested before that they remove that section, and I've just submitted a pull request to remove it. – 2014-09-25 13:36:55

0

For long-running operations like these you can use Ladon's task-type methods.

It provides a framework-level solution for exactly this kind of situation.

Ladon Tasks documentation

3

I created a custom WSGIHandler that supports multithreaded requests for WSGI applications in Tornado by using a ThreadPoolExecutor. All calls into the WSGI application are executed in separate threads, so the main loop stays free even while your WSGI responses take a long time to produce. The following code is based on this Gist and extended so that:

  • You can stream responses (using an iterator as the response) or large files from the WSGI application directly to the client, so you can keep the memory footprint low even when generating large responses.
  • You can upload large files. If the request body grows beyond 1 MB, the whole request body is dumped into a temporary file, which is then passed to the WSGI application.

Currently the code has only been tested with Python 3.4, so I don't know whether it works on Python 2.7. It hasn't been stress-tested yet, but it seems to work quite well so far.

# tornado_wsgi.py 

import itertools 
import logging 
import sys 
import tempfile 
from concurrent import futures 
from io import BytesIO 

from tornado import escape, gen, web 
from tornado.iostream import StreamClosedError 
from tornado.wsgi import to_wsgi_str 

_logger = logging.getLogger(__name__) 


@web.stream_request_body 
class WSGIHandler(web.RequestHandler): 
    thread_pool_size = 20 

    def initialize(self, wsgi_application): 
     self.wsgi_application = wsgi_application 

     self.body_chunks = [] 
     self.body_tempfile = None 

    def environ(self, request): 
     """ 
     Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment. 
     """ 
     hostport = request.host.split(":") 
     if len(hostport) == 2: 
      host = hostport[0] 
      port = int(hostport[1]) 
     else: 
      host = request.host 
      port = 443 if request.protocol == "https" else 80 

     if self.body_tempfile is not None: 
      body = self.body_tempfile 
      body.seek(0) 
     elif self.body_chunks: 
      body = BytesIO(b''.join(self.body_chunks)) 
     else: 
      body = BytesIO() 

     environ = { 
      "REQUEST_METHOD": request.method, 
      "SCRIPT_NAME": "", 
      "PATH_INFO": to_wsgi_str(escape.url_unescape(request.path, encoding=None, plus=False)), 
      "QUERY_STRING": request.query, 
      "REMOTE_ADDR": request.remote_ip, 
      "SERVER_NAME": host, 
      "SERVER_PORT": str(port), 
      "SERVER_PROTOCOL": request.version, 
      "wsgi.version": (1, 0), 
      "wsgi.url_scheme": request.protocol, 
      "wsgi.input": body, 
      "wsgi.errors": sys.stderr, 
      "wsgi.multithread": False, 
      "wsgi.multiprocess": True, 
      "wsgi.run_once": False, 
     } 
     if "Content-Type" in request.headers: 
      environ["CONTENT_TYPE"] = request.headers.pop("Content-Type") 
     if "Content-Length" in request.headers: 
      environ["CONTENT_LENGTH"] = request.headers.pop("Content-Length") 
     for key, value in request.headers.items(): 
      environ["HTTP_" + key.replace("-", "_").upper()] = value 
     return environ 

    def prepare(self): 
     # Accept up to 2GB upload data. 
     self.request.connection.set_max_body_size(2 << 30) 

    @gen.coroutine 
    def data_received(self, chunk): 
     if self.body_tempfile is not None: 
      yield self.executor.submit(lambda: self.body_tempfile.write(chunk)) 
     else: 
      self.body_chunks.append(chunk) 

      # When the request body grows larger than 1 MB we dump all received chunks into 
      # a temporary file to prevent high memory use. All subsequent body chunks will 
      # be directly written into the tempfile. 
      if sum(len(c) for c in self.body_chunks) > (1 << 20): 
       self.body_tempfile = tempfile.NamedTemporaryFile('w+b') 
       def copy_to_file(): 
        for c in self.body_chunks: 
         self.body_tempfile.write(c) 
        # Remove the chunks to clear the memory. 
        self.body_chunks[:] = [] 
       yield self.executor.submit(copy_to_file) 

    @web.asynchronous 
    @gen.coroutine 
    def get(self): 
     data = {} 
     response = [] 

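      # Per the WSGI spec, start_response records the status line and headers
      # and returns a write callable; writes are buffered in `response` here.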
     def start_response(status, response_headers, exc_info=None): 
      data['status'] = status 
      data['headers'] = response_headers 
      return response.append 

     environ = self.environ(self.request) 
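      # Invoke the WSGI application on the thread pool so that a slow app
      # cannot block Tornado's IOLoop while building its response.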
     app_response = yield self.executor.submit(self.wsgi_application, environ, start_response) 
     app_response = iter(app_response) 

     if not data: 
      raise Exception('WSGI app did not call start_response') 

     try: 
      exhausted = object() 

      def next_chunk(): 
       try: 
        return next(app_response) 
       except StopIteration: 
        return exhausted 

      for i in itertools.count(): 
       chunk = yield self.executor.submit(next_chunk) 
       if i == 0: 
        status_code, reason = data['status'].split(None, 1) 
        status_code = int(status_code) 
        headers = data['headers'] 
        self.set_status(status_code, reason) 
        for key, value in headers: 
         self.set_header(key, value) 
        c = b''.join(response) 
        if c: 
         self.write(c) 
         yield self.flush() 
       if chunk is not exhausted: 
        self.write(chunk) 
        yield self.flush() 
       else: 
        break 
     except StreamClosedError: 
      _logger.debug('stream closed early') 
     finally: 
      # Close the temporary file to make sure that it gets deleted. 
      if self.body_tempfile is not None: 
       try: 
        self.body_tempfile.close() 
       except OSError as e: 
        _logger.warning(e) 

      if hasattr(app_response, 'close'): 
       yield self.executor.submit(app_response.close) 

    post = put = delete = head = options = get 

    @property 
    def executor(self): 
     cls = type(self) 
     if not hasattr(cls, '_executor'): 
      cls._executor = futures.ThreadPoolExecutor(cls.thread_pool_size) 
     return cls._executor 

Below is a simple Flask application that demonstrates the WSGIHandler. The hello() function blocks for one second, so if your ThreadPoolExecutor uses 20 threads, you will be able to serve 20 such requests at the same time (within one second).

The stream() function creates an iterator response and streams 50 chunks of data to the client over 5 seconds. Note that it is probably not possible to use Flask's stream_with_context decorator here: since every read from the iterator produces a new executor.submit(), it is very likely that the chunks of a streaming response will be loaded from different threads, which would break Flask's use of thread locals.

import time 
from flask import Flask, Response 
from tornado import ioloop, log, web 
from tornado_wsgi import WSGIHandler 

def main(): 
    app = Flask(__name__) 

    @app.route("/") 
    def hello(): 
     time.sleep(1) 
     return "Hello World!" 

    @app.route("/stream") 
    def stream(): 
     def generate(): 
      for i in range(50): 
       time.sleep(0.1) 
       yield '%d\n' % i 
     return Response(generate(), mimetype='text/plain') 

    application = web.Application([ 
     (r'/.*', WSGIHandler, {'wsgi_application': app}), 
    ]) 

    log.enable_pretty_logging() 
    application.listen(8888) 
    ioloop.IOLoop.instance().start() 

if __name__ == '__main__': 
    main()
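
To sanity-check the concurrency claim, a quick client script (hypothetical, not part of the answer) can fire 20 parallel requests at / and confirm that they finish in roughly one second rather than twenty:

import time
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen

def fetch(_):
    # Each request blocks for about one second inside hello().
    return urlopen('http://localhost:8888/').read()

start = time.time()
with ThreadPoolExecutor(max_workers=20) as pool:
    list(pool.map(fetch, range(20)))
print('20 requests took %.1f seconds' % (time.time() - start))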