I created a custom WSGIHandler that supports multithreaded requests for WSGI applications in Tornado by using a ThreadPoolExecutor. All calls into the WSGI application are executed in separate threads, so the main loop stays free even while your WSGI response takes a long time. The following code is based on this Gist and extended so that:
- You can stream responses (using an iterator response) or large files directly from the WSGI application to the client, which keeps the memory footprint low even when generating large responses.
- You can upload large files. If the request body grows larger than 1 MB, the whole request body is dumped into a temporary file which is then passed to the WSGI application.

Currently the code has only been tested with Python 3.4, so I don't know whether it works with Python 2.7. It also hasn't been stress-tested yet, but it seems to work well so far.
# tornado_wsgi.py

import itertools
import logging
import sys
import tempfile
from concurrent import futures
from io import BytesIO

from tornado import escape, gen, web
from tornado.iostream import StreamClosedError
from tornado.wsgi import to_wsgi_str

_logger = logging.getLogger(__name__)


@web.stream_request_body
class WSGIHandler(web.RequestHandler):
    thread_pool_size = 20

    def initialize(self, wsgi_application):
        self.wsgi_application = wsgi_application
        self.body_chunks = []
        self.body_tempfile = None

    def environ(self, request):
        """
        Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment.
        """
        hostport = request.host.split(":")
        if len(hostport) == 2:
            host = hostport[0]
            port = int(hostport[1])
        else:
            host = request.host
            port = 443 if request.protocol == "https" else 80

        if self.body_tempfile is not None:
            body = self.body_tempfile
            body.seek(0)
        elif self.body_chunks:
            body = BytesIO(b''.join(self.body_chunks))
        else:
            body = BytesIO()

        environ = {
            "REQUEST_METHOD": request.method,
            "SCRIPT_NAME": "",
            "PATH_INFO": to_wsgi_str(escape.url_unescape(request.path, encoding=None, plus=False)),
            "QUERY_STRING": request.query,
            "REMOTE_ADDR": request.remote_ip,
            "SERVER_NAME": host,
            "SERVER_PORT": str(port),
            "SERVER_PROTOCOL": request.version,
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": request.protocol,
            "wsgi.input": body,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": False,
            "wsgi.multiprocess": True,
            "wsgi.run_once": False,
        }
        if "Content-Type" in request.headers:
            environ["CONTENT_TYPE"] = request.headers.pop("Content-Type")
        if "Content-Length" in request.headers:
            environ["CONTENT_LENGTH"] = request.headers.pop("Content-Length")
        for key, value in request.headers.items():
            environ["HTTP_" + key.replace("-", "_").upper()] = value
        return environ

    def prepare(self):
        # Accept up to 2 GB of upload data.
        self.request.connection.set_max_body_size(2 << 30)

    @gen.coroutine
    def data_received(self, chunk):
        if self.body_tempfile is not None:
            yield self.executor.submit(lambda: self.body_tempfile.write(chunk))
        else:
            self.body_chunks.append(chunk)

            # When the request body grows larger than 1 MB we dump all received chunks into
            # a temporary file to prevent high memory use. All subsequent body chunks will
            # be directly written into the tempfile.
            if sum(len(c) for c in self.body_chunks) > (1 << 20):
                self.body_tempfile = tempfile.NamedTemporaryFile('w+b')

                def copy_to_file():
                    for c in self.body_chunks:
                        self.body_tempfile.write(c)
                    # Remove the chunks to free the memory.
                    self.body_chunks[:] = []

                yield self.executor.submit(copy_to_file)

    @web.asynchronous
    @gen.coroutine
    def get(self):
        data = {}
        response = []

        def start_response(status, response_headers, exc_info=None):
            data['status'] = status
            data['headers'] = response_headers
            return response.append

        environ = self.environ(self.request)
        app_response = yield self.executor.submit(self.wsgi_application, environ, start_response)
        app_response = iter(app_response)

        if not data:
            raise Exception('WSGI app did not call start_response')

        try:
            exhausted = object()

            def next_chunk():
                try:
                    return next(app_response)
                except StopIteration:
                    return exhausted

            for i in itertools.count():
                chunk = yield self.executor.submit(next_chunk)
                if i == 0:
                    status_code, reason = data['status'].split(None, 1)
                    status_code = int(status_code)
                    headers = data['headers']
                    self.set_status(status_code, reason)
                    for key, value in headers:
                        self.set_header(key, value)
                    c = b''.join(response)
                    if c:
                        self.write(c)
                    yield self.flush()
                if chunk is not exhausted:
                    self.write(chunk)
                    yield self.flush()
                else:
                    break
        except StreamClosedError:
            _logger.debug('stream closed early')
        finally:
            # Close the temporary file to make sure that it gets deleted.
            if self.body_tempfile is not None:
                try:
                    self.body_tempfile.close()
                except OSError as e:
                    _logger.warning(e)
            if hasattr(app_response, 'close'):
                yield self.executor.submit(app_response.close)

    post = put = delete = head = options = get

    @property
    def executor(self):
        cls = type(self)
        if not hasattr(cls, '_executor'):
            cls._executor = futures.ThreadPoolExecutor(cls.thread_pool_size)
        return cls._executor
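The executor property above lazily creates a single ThreadPoolExecutor and caches it on the handler class, so every instance (and therefore every request) shares the same pool of threads. A minimal standalone sketch of that pattern (the Worker class name is hypothetical; no Tornado required):

```python
from concurrent import futures


class Worker:
    thread_pool_size = 20

    @property
    def executor(self):
        # Create the pool on first access and cache it on the class,
        # so all instances share one ThreadPoolExecutor.
        cls = type(self)
        if not hasattr(cls, '_executor'):
            cls._executor = futures.ThreadPoolExecutor(cls.thread_pool_size)
        return cls._executor


a, b = Worker(), Worker()
print(a.executor is b.executor)  # True: both instances use the same pool
print(a.executor.submit(lambda: 6 * 7).result())  # 42
```

Because the pool is per-class, a subclass that overrides thread_pool_size still inherits the already-created `_executor` via `hasattr`; set `_executor = None` on the subclass (or check `cls.__dict__`) if you need a separate pool per subclass.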
The following is a simple Flask application that demonstrates the WSGIHandler. The hello() function blocks for one second, so if your ThreadPoolExecutor uses 20 threads, you will be able to serve 20 requests at the same time (within one second). The stream() function creates an iterator response and streams 50 chunks of data to the client over 5 seconds. Note that it is probably not possible to use Flask's stream_with_context decorator here: since every fetch from the iterator produces a new executor.submit(), it is very likely that chunks of a streaming response will be loaded from different threads, breaking Flask's use of thread-locals.
import time
from flask import Flask, Response

from tornado import ioloop, log, web
from tornado_wsgi import WSGIHandler


def main():
    app = Flask(__name__)

    @app.route("/")
    def hello():
        time.sleep(1)
        return "Hello World!"

    @app.route("/stream")
    def stream():
        def generate():
            for i in range(50):
                time.sleep(0.1)
                yield '%d\n' % i
        return Response(generate(), mimetype='text/plain')

    application = web.Application([
        (r'/.*', WSGIHandler, {'wsgi_application': app}),
    ])
    log.enable_pretty_logging()
    application.listen(8888)
    ioloop.IOLoop.instance().start()


if __name__ == '__main__':
    main()
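The claim that 20 threads can serve 20 one-second requests in roughly one second is just ThreadPoolExecutor behaviour; a self-contained sketch of it (no server needed, simulated_request is a hypothetical stand-in for a blocking WSGI call like hello()):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def simulated_request(i):
    # Stand-in for a blocking WSGI call such as hello() above.
    time.sleep(1)
    return i


start = time.monotonic()
with ThreadPoolExecutor(max_workers=20) as pool:
    results = list(pool.map(simulated_request, range(20)))
elapsed = time.monotonic() - start

print(sorted(results) == list(range(20)))  # True
print(elapsed)  # roughly 1 second: all 20 sleeps overlap in the pool
```

With only one worker the same workload would take about 20 seconds, which is exactly what happens when a blocking WSGI app runs directly on the IOLoop.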
This will work if you don't need the result of the time-consuming task, but if you need its result to render your index.html, you have to block on it. It is not possible to block the whole IOLoop in Tornado's WSGI mode; you need a multithreaded WSGI server. – 2014-09-24 14:25:36
@BenDarnell You are right. However, this is the only approach I know of short of recommending a switch to another server. Besides, in this case the time-consuming task could put its result into the session or the g object for the next request. – 2014-09-24 14:31:39
Thanks Mehdi Sadeghi, but I'm in the situation @BenDarnell is talking about. – 2014-09-25 04:46:20