概述
Gunicorn 是一个被广泛使用的高性能的 Python WSGI UNIX HTTP 服务器。
由于源码调用了 fcntl、fork 等接口,因此只能运行在类 Unix 系统上。
Gunicorn 设计思想
Gunicorn 基于 pre-fork 的思想,有一个 master 进程,由它来管理一组 worker 进程,worker 进程负责对来自客户端的请求进行响应。gunicorn 依靠操作系统来提供负载均衡。
pre-fork 服务器会预先开启一定数量(数量由参数决定)的 worker 进程,在用户请求到来时服务器并不需要等待新的进程启动,因而能够以更快的速度应付多用户请求。
源码解析
目录结构
gunicorn
│  arbiter.py      # master进程,gunicorn 中称为 arbiter
│  config.py       # 配置相关,可以通过命令行参数或配置文件来配置 gunicorn
│  debug.py
│  errors.py
│  glogging.py
│  pidfile.py
│  reloader.py
│  sock.py
│  systemd.py
│  util.py
│  __init__.py
│
├─app
│      base.py         # 入口基类
│      pasterapp.py
│      wsgiapp.py      # 程序入口
│      __init__.py
│
├─http                 # 处理客户端发送过来的 http 请求
│      body.py
│      errors.py
│      message.py
│      parser.py
│      unreader.py
│      wsgi.py
│      __init__.py
│
├─instrument
│      statsd.py
│      __init__.py
│
└─workers              # Gunicorn 中 worker 有 sync, gthread, ggevent 等模式可选
        base.py
        base_async.py
        geventlet.py
        ggevent.py
        gthread.py
        gtornado.py
        sync.py
        workertmp.py
        __init__.py
从入口开始
程序入口 wsgiapp.py 的 run() 将会运行基类 base.py 中 BaseApplication的run():
# base.py
def run(self):
    """Build an ``Arbiter`` for this application and run its main loop.

    A ``RuntimeError`` coming out of the arbiter is printed to stderr and
    the process exits with status 1.
    """
    try:
        arbiter = Arbiter(self)
        arbiter.run()
    except RuntimeError as err:
        print("\nError: %s\n" % err, file=sys.stderr)
        sys.stderr.flush()
        sys.exit(1)
即运行 arbiter.py 的 run():
# arbiter.py def run(self): "Main master loop." self.start() # 初始化 arbiter,为信号绑定处理函数,创建 socket 对端口进行监听 util._setproctitle("master [%s]" % self.proc_name) try: self.manage_workers() # 管理 worker 进程 while True: self.maybe_promote_master() sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None if sig is None: self.sleep() self.murder_workers() self.manage_workers() continue if sig not in self.SIG_NAMES: self.log.info("Ignoring unknown signal: %s", sig) continue signame = self.SIG_NAMES.get(sig) handler = getattr(self, "handle_%s" % signame, None) if not handler: self.log.error("Unhandled signal: %s", signame) continue self.log.info("Handling signal: %s", signame) handler() self.wakeup() except (StopIteration, KeyboardInterrupt): self.halt() except HaltServer as inst: self.halt(reason=inst.reason, exit_status=inst.exit_status) except SystemExit: raise except Exception: self.log.info("Unhandled exception in main loop", exc_info=True) self.stop(False) if self.pidfile is not None: self.pidfile.unlink() sys.exit(-1)
首先运行 start()
# arbiter.py def start(self): """\ Initialize the arbiter. Start listening and set pidfile if needed. """ self.log.info("Starting gunicorn %s", __version__) if 'GUNICORN_PID' in os.environ: self.master_pid = int(os.environ.get('GUNICORN_PID')) self.proc_name = self.proc_name + ".2" self.master_name = "Master.2" self.pid = os.getpid() if self.cfg.pidfile is not None: pidname = self.cfg.pidfile if self.master_pid != 0: pidname += ".2" self.pidfile = Pidfile(pidname) self.pidfile.create(self.pid) self.cfg.on_starting(self) self.init_signals() if not self.LISTENERS: fds = None listen_fds = systemd.listen_fds() if listen_fds: self.systemd = True fds = range(systemd.SD_LISTEN_FDS_START, systemd.SD_LISTEN_FDS_START + listen_fds) elif self.master_pid: fds = [] for fd in os.environ.pop('GUNICORN_FD').split(','): fds.append(int(fd)) self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds) listeners_str = ",".join([str(l) for l in self.LISTENERS]) self.log.debug("Arbiter booted") self.log.info("Listening at: %s (%s)", listeners_str, self.pid) self.log.info("Using worker: %s", self.cfg.worker_class_str) systemd.sd_notify("READY=1\nSTATUS=Gunicorn arbiter booted", self.log) # check worker class requirements if hasattr(self.worker_class, "check_config"): self.worker_class.check_config(self.cfg, self.log) self.cfg.when_ready(self)
其中比较重要的是 init_signals() 和 create_sockets()
# arbiter.py
def init_signals(self):
    """Initialize master signal handling.

    Replaces the wake-up pipe with a fresh non-blocking, close-on-exec
    pair, then routes every signal in ``self.SIGNALS`` to ``self.signal``
    (which queues it) and ``SIGCHLD`` to ``self.handle_chld``.
    """
    # Close the descriptors of the previous pipe, if any.
    for stale_fd in self.PIPE:
        os.close(stale_fd)

    # Create the new pipe and configure both ends.
    self.PIPE = os.pipe()
    for pipe_fd in self.PIPE:
        util.set_non_blocking(pipe_fd)
        util.close_on_exec(pipe_fd)

    self.log.close_on_exec()

    # Register self.signal as the handler for each managed signal.
    for signum in self.SIGNALS:
        signal.signal(signum, self.signal)
    signal.signal(signal.SIGCHLD, self.handle_chld)


def signal(self, sig, frame):
    """Queue *sig* for the main loop and wake the arbiter.

    The queue holds at most 5 signals; when it is full the signal is
    dropped and no wake-up is sent. *frame* is unused.
    """
    if len(self.SIG_QUEUE) >= 5:
        return
    self.SIG_QUEUE.append(sig)
    self.wakeup()
其中 SIGNALS 在 arbiter.py 中是:
# Signal numbers the arbiter queues for its main loop, resolved by name
# from the ``signal`` module.
SIGNALS = [
    getattr(signal, "SIG" + name)
    for name in (
        "HUP", "QUIT", "INT", "TERM", "TTIN", "TTOU", "USR1", "USR2", "WINCH",
    )
]
- HUP,重启所有的配置和所有的 worker 进程
- QUIT,正常关闭,它会等待所有 worker 进程处理完各自的东西后关闭
- INT/TERM,立即关闭,强行中止所有的处理
- TTIN,增加一个 worker 进程
- TTOU,减少一个 worker 进程
- USR1,重新打开由 master 和 worker 所有的日志处理
- USR2,重新运行 master 和 worker
- WINCH,正常关闭所有 worker 进程,保持主控 master 进程的运行
signal() 中通过向 pipe 里写入数据来唤醒 Arbiter:
def wakeup(self):
    """Wake up the arbiter by writing one byte to the write end of the PIPE.

    ``EAGAIN`` and ``EINTR`` are ignored; any other I/O error is re-raised.
    """
    ignorable = (errno.EAGAIN, errno.EINTR)
    try:
        os.write(self.PIPE[1], b'.')
    except IOError as err:
        if err.errno in ignorable:
            return
        raise
也就是说,产生上述 9 种信号(INT 与 TERM 在上面的列表中合并为一项)之一时,arbiter 进程都会被唤醒。
回到 arbiter.py 中的 start(), create_sockets()。它会根据配置中的地址和端口号,创建 socket,对其进行监听:
def create_sockets(conf, log, fds=None):
    """
    Create a new socket for the configured addresses or file descriptors.

    If a configured address is a tuple then a TCP socket is created.
    If it is a string, a Unix socket is created. Otherwise, a TypeError is
    raised.

    :param conf: configuration object; ``address``, ``certfile`` and
        ``keyfile`` are read here, and it is passed on to each listener.
    :param log: logger used for bind errors and retries.
    :param fds: optional iterable of already-bound file descriptors,
        added to any integer entries found in ``conf.address``.
    :returns: list of listener objects.
    :raises ValueError: if a configured certfile or keyfile does not exist.

    Exits the process with status 1 when an address still cannot be bound
    after 5 attempts.
    """
    listeners = []

    # get it only once
    addresses = conf.address
    fdaddr = [bind for bind in addresses if isinstance(bind, int)]
    if fds:
        fdaddr += list(fds)
    laddr = [bind for bind in addresses if not isinstance(bind, int)]

    # check ssl config early to raise the error on startup
    # only the certfile is needed since it can contains the keyfile
    if conf.certfile and not os.path.exists(conf.certfile):
        raise ValueError('certfile "%s" does not exist' % conf.certfile)

    if conf.keyfile and not os.path.exists(conf.keyfile):
        raise ValueError('keyfile "%s" does not exist' % conf.keyfile)

    # sockets are already bound
    if fdaddr:
        for fd in fdaddr:
            # socket.fromfd() duplicates the descriptor; the duplicate is
            # only needed to read the bound name, so close it right away
            # instead of leaving it to the garbage collector.
            probe = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                sock_name = probe.getsockname()
            finally:
                probe.close()
            sock_type = _sock_type(sock_name)
            listener = sock_type(sock_name, conf, log, fd=fd)
            listeners.append(listener)

        return listeners

    # no sockets is bound, first initialization of gunicorn in this env.
    max_attempts = 5
    for bind_addr in laddr:
        sock_type = _sock_type(bind_addr)
        sock = None
        for attempt in range(max_attempts):
            try:
                # Create the socket and bind it to the address.
                sock = sock_type(bind_addr, conf, log)
            except socket.error as e:
                if e.args[0] == errno.EADDRINUSE:
                    log.error("Connection in use: %s", str(bind_addr))
                if e.args[0] == errno.EADDRNOTAVAIL:
                    log.error("Invalid address: %s", str(bind_addr))
                msg = "connection to {addr} failed: {error}"
                log.debug(msg.format(addr=str(bind_addr), error=str(e)))
                # Only announce and wait for a retry when one will follow;
                # after the final attempt fall through to the error below.
                if attempt < max_attempts - 1:
                    log.error("Retrying in 1 second.")
                    time.sleep(1)
            else:
                break

        if sock is None:
            log.error("Can't connect to %s", str(bind_addr))
            sys.exit(1)

        listeners.append(sock)

    return listeners
其中 _sock_type 返回:
def _sock_type(addr): if isinstance(addr, tuple): if util.is_ipv6(addr[0]): sock_type = TCP6Socket else: sock_type = TCPSocket elif isinstance(addr, (str, bytes)): sock_type = UnixSocket else: raise TypeError("Unable to create socket from: %r" % addr) return sock_type
class TCP6Socket(TCPSocket):
    """TCP listener for IPv6 addresses (``AF_INET6``)."""

    FAMILY = socket.AF_INET6

    def __str__(self):
        """Return the listener URL with the IPv6 host in brackets.

        The scheme follows ``conf.is_ssl`` the same way ``TCPSocket`` does,
        so an SSL listener is reported as ``https://[host]:port``.
        """
        scheme = "https" if self.conf.is_ssl else "http"
        # AF_INET6 names are 4-tuples; flow info and scope id are not shown.
        (host, port, _, _) = self.sock.getsockname()
        return "%s://[%s]:%d" % (scheme, host, port)
class TCPSocket(BaseSocket):
    """TCP listener for IPv4 addresses (``AF_INET``)."""

    FAMILY = socket.AF_INET

    def __str__(self):
        """Return ``<scheme>://host:port``; the scheme is https when SSL is on."""
        scheme = "https" if self.conf.is_ssl else "http"
        bound_name = self.sock.getsockname()
        return "%s://%s:%d" % (scheme, bound_name[0], bound_name[1])

    def set_options(self, sock, bound=False):
        """Enable ``TCP_NODELAY`` and then apply the base socket options."""
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        return super().set_options(sock, bound=bound)
class BaseSocket(object):
    """Listening socket wrapper shared by the concrete listener classes.

    Subclasses provide ``FAMILY``. Attribute access that is not found on
    the wrapper itself is forwarded to the underlying ``socket`` object.
    """

    def __init__(self, address, conf, log, fd=None):
        """Create (or adopt) the socket and put it into listening state.

        :param address: address to bind when no descriptor is supplied.
        :param conf: configuration; ``reuse_port`` and ``backlog`` are read.
        :param log: logger used when closing fails.
        :param fd: optional already-bound descriptor to adopt. It is
            duplicated into a socket object and the original is closed.
        """
        self.log = log
        self.conf = conf

        self.cfg_addr = address
        if fd is None:
            sock = socket.socket(self.FAMILY, socket.SOCK_STREAM)
            bound = False
        else:
            sock = socket.fromfd(fd, self.FAMILY, socket.SOCK_STREAM)
            os.close(fd)
            bound = True

        try:
            self.sock = self.set_options(sock, bound=bound)
        except Exception:
            # bind()/listen() failed: release the descriptor now rather
            # than leaking it until garbage collection (callers retry).
            sock.close()
            raise

    def __str__(self):
        return "<socket %d>" % self.sock.fileno()

    def __getattr__(self, name):
        """Forward unknown attributes to the wrapped socket."""
        # __getattr__ only runs when normal lookup fails. If ``sock`` itself
        # is missing (instance not fully initialised), looking it up below
        # would re-enter this method forever, so fail fast instead.
        if name == "sock":
            raise AttributeError(name)
        return getattr(self.sock, name)

    def set_options(self, sock, bound=False):
        """Configure *sock*, bind it unless *bound*, and start listening.

        :returns: the configured socket.
        """
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if (self.conf.reuse_port
                and hasattr(socket, 'SO_REUSEPORT')):  # pragma: no cover
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            except socket.error as err:
                # Tolerate platforms that define the option but reject it.
                if err.errno not in (errno.ENOPROTOOPT, errno.EINVAL):
                    raise
        if not bound:
            self.bind(sock)
        sock.setblocking(0)

        # make sure that the socket can be inherited
        if hasattr(sock, "set_inheritable"):
            sock.set_inheritable(True)

        sock.listen(self.conf.backlog)
        return sock

    def bind(self, sock):
        """Bind *sock* to the configured address."""
        sock.bind(self.cfg_addr)

    def close(self):
        """Close the wrapped socket; a no-op when it is already closed."""
        if self.sock is None:
            return

        try:
            self.sock.close()
        except socket.error as e:
            self.log.info("Error while closing socket %s", str(e))

        self.sock = None
再回到 arbiter.py 中的 run():start() 之后会先调用一次 manage_workers(),随后在主循环中,当没有待处理的信号时,依次执行 sleep()、murder_workers() 和 manage_workers()。先看 murder_workers:
# arbiter.py
def murder_workers(self):
    """Kill unused/idle workers.

    A worker is idle when its temp file has not been updated for more
    than ``self.timeout`` seconds. The first time that is noticed the
    worker is flagged ``aborted`` and sent ``SIGABRT``; if it is seen
    idle again it is sent ``SIGKILL``. Does nothing when the timeout is
    falsy. Workers whose temp file cannot be inspected are skipped.
    """
    if not self.timeout:
        return

    # Iterate over a snapshot of the worker table.
    for pid, worker in list(self.WORKERS.items()):
        try:
            idle_for = time.time() - worker.tmp.last_update()
        except (OSError, ValueError):
            continue
        if idle_for <= self.timeout:
            continue

        if worker.aborted:
            self.kill_worker(pid, signal.SIGKILL)
        else:
            self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
            worker.aborted = True
            self.kill_worker(pid, signal.SIGABRT)


# Method of the object exposed to the arbiter as ``worker.tmp``.
def last_update(self):
    """Return the ``st_ctime`` of the worker's temporary file."""
    file_status = os.fstat(self._tmp.fileno())
    return file_status.st_ctime
arbiter 是通过检查临时文件的 ctime(即 os.fstat 返回的 st_ctime,文件状态最近一次变更的时间)来判断每个 worker 是否 timeout 的。Worker 修改这个文件的逻辑后面会进行介绍。若超过设定的 timeout,arbiter 会先向该 worker 发送 SIGABRT;如果下一次检查时它仍然超时,则发送 SIGKILL 将其强制 kill 掉。
arbiter 接着 manage_workers():
# arbiter.py
def manage_workers(self):
    """Maintain the number of workers by spawning or killing as required.

    Spawns when there are fewer workers than ``self.num_workers``; when
    there are more, sends ``SIGTERM`` to the workers with the lowest
    ``age`` first. Logs the resulting count (as a gauge metric) only when
    it differs from the last logged value.
    """
    if len(self.WORKERS) < self.num_workers:
        self.spawn_workers()

    # Lowest age first, i.e. the workers spawned earliest.
    by_age = sorted(self.WORKERS.items(), key=lambda item: item[1].age)
    surplus = max(len(by_age) - self.num_workers, 0)
    for pid, _ in by_age[:surplus]:
        self.kill_worker(pid, signal.SIGTERM)

    active_worker_count = len(by_age) - surplus
    if active_worker_count == self._last_logged_active_worker_count:
        return
    self._last_logged_active_worker_count = active_worker_count
    self.log.debug("{0} workers".format(active_worker_count),
                   extra={"metric": "gunicorn.workers",
                          "value": active_worker_count,
                          "mtype": "gauge"})
manage_workers() 的功能是保证工作进程的数量与配置中的 worker 数量一致:小于时调用 spawn_workers() 补足,大于时对最旧的 worker 执行 kill_worker(pid, signal.SIGTERM)。
def spawn_worker(self):
    """Fork one worker process.

    In the parent: registers the new worker in ``self.WORKERS`` and
    returns the child's pid. In the child: never returns -- it runs the
    worker via ``worker.init_process()`` and always leaves through
    ``sys.exit``.
    """
    self.worker_age += 1
    # Build the worker instance from the configured worker_class; the
    # listening sockets (LISTENERS) are handed to it here.
    worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                               self.app, self.timeout / 2.0,
                               self.cfg, self.log)
    self.cfg.pre_fork(self, worker)

    # After fork() both processes continue from here with a copy of the
    # parent's data: fork() returns 0 in the child and the child's pid
    # in the parent.
    pid = os.fork()
    if pid != 0:
        worker.pid = pid
        # Parent: keep the worker in self.WORKERS[pid] for management.
        self.WORKERS[pid] = worker
        return pid

    # Do not inherit the temporary files of other workers
    for sibling in self.WORKERS.values():
        sibling.tmp.close()

    # Process Child
    worker.pid = os.getpid()
    try:
        util._setproctitle("worker [%s]" % self.proc_name)
        self.log.info("Booting worker with pid: %s", worker.pid)
        self.cfg.post_fork(self, worker)
        worker.init_process()
        sys.exit(0)
    except SystemExit:
        raise
    except AppImportError as e:
        self.log.debug("Exception while loading the application",
                       exc_info=True)
        print("%s" % e, file=sys.stderr)
        sys.stderr.flush()
        sys.exit(self.APP_LOAD_ERROR)
    except BaseException:
        # Deliberately catches everything: the child must exit here and
        # never fall back into the parent's code path.
        self.log.exception("Exception in worker process")
        if not worker.booted:
            sys.exit(self.WORKER_BOOT_ERROR)
        sys.exit(-1)
    finally:
        self.log.info("Worker exiting (pid: %s)", worker.pid)
        try:
            worker.tmp.close()
            self.cfg.worker_exit(self, worker)
        except BaseException:
            # Best effort: a failing exit hook must not mask the exit.
            self.log.warning("Exception during worker exit:\n%s",
                             traceback.format_exc())
worker 有 sync、gthread、eventlet、gevent 和 tornado 五种类型。默认的是 sync。就先看 sync 的代码。
继续看 worker.init_process() 函数,SyncWorker 没有 override 基类的这个方法,所以会运行基类的方法:
# workers/base.py def init_process(self): """\ If you override this method in a subclass, the last statement in the function should be to call this method with super().init_process() so that the ``run()`` loop is initiated. """ # set environment' variables if self.cfg.env: for k, v in self.cfg.env.items(): os.environ[k] = v util.set_owner_process(self.cfg.uid, self.cfg.gid, initgroups=self.cfg.initgroups) # Reseed the random number generator util.seed() # For waking ourselves up self.PIPE = os.pipe() for p in self.PIPE: util.set_non_blocking(p) util.close_on_exec(p) # Prevent fd inheritance for s in self.sockets: util.close_on_exec(s) util.close_on_exec(self.tmp.fileno()) self.wait_fds = self.sockets + [self.PIPE[0]] self.log.close_on_exec() self.init_signals() # 注册信号量 self.load_wsgi() # 加载wsgi application # start the reloader if self.cfg.reload: def changed(fname): self.log.info("Worker reloading: %s modified", fname) self.alive = False os.write(self.PIPE[1], b"1") self.cfg.worker_int(self) time.sleep(0.1) sys.exit(0) reloader_cls = reloader_engines[self.cfg.reload_engine] self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files, callback=changed) self.reloader.start() self.cfg.post_worker_init(self) # Enter main run loop self.booted = True self.run()
最后会运行 run():
# workers/sync.py
def run(self):
    """Switch the listeners to non-blocking mode and enter the accept loop.

    Uses ``run_for_multiple`` when there is more than one listener and
    ``run_for_one`` otherwise.
    """
    # Without a timeout the worker would never wait and would burn CPU
    # for nothing, so fall back to a minimal 0.5.
    timeout = self.timeout if self.timeout else 0.5

    # self.socket appears to lose its blocking status after
    # we fork in the arbiter. Reset it here.
    for listener in self.sockets:
        listener.setblocking(0)

    serve = self.run_for_multiple if len(self.sockets) > 1 else self.run_for_one
    serve(timeout)
以绑定单端口的情况为例:
# workers/sync.py
def run_for_one(self, timeout):
    """Serve the single listener until the worker stops.

    Every iteration sends a heartbeat and tries to accept one connection.
    While connections keep arriving it loops straight back to accept;
    when none is pending it checks the parent is still alive and waits up
    to *timeout* for activity. Returns when ``self.alive`` turns false,
    the parent is gone, or waiting raises ``StopWaiting``.
    """
    listener = self.sockets[0]
    nothing_pending = (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK)

    while self.alive:
        # Heartbeat to the master via notify(), i.e. touch the temp file.
        self.notify()

        # Accept a connection. If we get an error telling us that no
        # connection is waiting we fall down to the select, which is
        # where we'll wait for a bit for new connections.
        try:
            self.accept(listener)
        except EnvironmentError as err:
            if err.errno not in nothing_pending:
                raise
        else:
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue

        if not self.is_parent_alive():
            return

        try:
            self.wait(timeout)
        except StopWaiting:
            return
# workertmp.py
def notify(self):
    """Flip ``self.spinner`` between 0 and 1 and chmod the temp file to it.

    Changing the file mode is the worker's heartbeat -- presumably
    because it advances the file's ctime, which the master reads.
    """
    flipped = (self.spinner + 1) % 2
    self.spinner = flipped
    os.fchmod(self._tmp.fileno(), flipped)
从 run_for_one() 中我们得知,worker 只要活着,就会不断地 accept 就绪的请求,即 self.accept(listener):
# workers/sync.py
def accept(self, listener):
    """Accept one connection from *listener* and handle it synchronously."""
    connection, peer_addr = listener.accept()
    # Blocking mode: this worker process blocks on the client socket
    # until the request has been handled completely.
    connection.setblocking(1)
    util.close_on_exec(connection)
    self.handle(listener, connection, peer_addr)
handle(listener, client, addr) 中会对来自 client 请求进行处理:
# workers/sync.py def handle(self, listener, client, addr): req = None try: if self.cfg.is_ssl: client = ssl.wrap_socket(client, server_side=True, **self.cfg.ssl_options) parser = http.RequestParser(self.cfg, client) req = next(parser) self.handle_request(listener, req, client, addr) except http.errors.NoMoreData as e: self.log.debug("Ignored premature client disconnection. %s", e) except StopIteration as e: self.log.debug("Closing connection. %s", e) except ssl.SSLError as e: if e.args[0] == ssl.SSL_ERROR_EOF: self.log.debug("ssl connection closed") client.close() else: self.log.debug("Error processing SSL request.") self.handle_error(req, client, addr, e) except EnvironmentError as e: if e.errno not in (errno.EPIPE, errno.ECONNRESET): self.log.exception("Socket error processing request.") else: if e.errno == errno.ECONNRESET: self.log.debug("Ignoring connection reset") else: self.log.debug("Ignoring EPIPE") except Exception as e: self.handle_error(req, client, addr, e) finally: util.close(client)
# workers/sync.py
def handle_request(self, listener, req, client, addr):
    """Run the WSGI application for *req* and write the response.

    Runs the ``pre_request`` / ``post_request`` hooks around the call,
    counts the request towards ``max_requests`` and writes an access-log
    entry on success.

    :raises StopIteration: when the application fails after the response
        headers were already sent (the client socket is shut down first).
    """
    environ = {}
    resp = None
    try:
        self.cfg.pre_request(self, req)
        request_start = datetime.now()
        # Parse the request into ``environ`` and create the response.
        resp, environ = wsgi.create(req, client, addr,
                                    listener.getsockname(), self.cfg)
        # Force the connection closed until someone shows
        # a buffering proxy that supports Keep-Alive to
        # the backend.
        resp.force_close()
        self.nr += 1
        if self.nr >= self.max_requests:
            # Finish this request, then let the run loop end.
            self.log.info("Autorestarting worker after current request.")
            self.alive = False
        # WSGI call: hand environ and start_response to the application.
        respiter = self.wsgi(environ, resp.start_response)
        try:
            if isinstance(respiter, environ['wsgi.file_wrapper']):
                resp.write_file(respiter)
            else:
                for item in respiter:
                    resp.write(item)
            resp.close()
            request_time = datetime.now() - request_start
            self.log.access(resp, req, environ, request_time)
        finally:
            # Close the response iterable if it exposes close().
            if hasattr(respiter, "close"):
                respiter.close()
    except EnvironmentError:
        # pass to next try-except level
        util.reraise(*sys.exc_info())
    except Exception:
        if resp and resp.headers_sent:
            # If the requests have already been sent, we should close the
            # connection to indicate the error.
            self.log.exception("Error handling request")
            try:
                client.shutdown(socket.SHUT_RDWR)
                client.close()
            except EnvironmentError:
                pass
            raise StopIteration()
        raise
    finally:
        # A failing post_request hook is logged and never propagates.
        try:
            self.cfg.post_request(self, req, environ, resp)
        except Exception:
            self.log.exception("Exception in post_request hook")
如果没有在等待的请求,accept() 会抛出 EWOULDBLOCK 或 EAGAIN 错误;该异常被 except 捕获后,会进入 self.wait(timeout),使用 select 监听 wait_fds,直到某个 fd 就绪或等待超时:
def wait(self, timeout):
    """Wait up to *timeout* seconds for one of ``self.wait_fds`` to be readable.

    Sends a heartbeat first. Returns the list of readable descriptors
    (after draining one byte from the wake-up pipe if it is among them),
    or ``None`` when the timeout expires. On ``EINTR`` returns
    ``self.sockets``.

    :raises StopWaiting: on ``EBADF`` unless ``self.nr`` is negative, in
        which case ``self.sockets`` is returned instead.
    """
    try:
        self.notify()
        readable = select.select(self.wait_fds, [], [], timeout)[0]
        if readable:
            wake_fd = self.PIPE[0]
            if wake_fd in readable:
                os.read(wake_fd, 1)
            return readable
    except select.error as err:
        code = err.args[0]
        if code == errno.EINTR:
            return self.sockets
        if code == errno.EBADF:
            if self.nr < 0:
                return self.sockets
            raise StopWaiting
        raise
# workers/ggevent.py
def init_process(self):
    """Patch the process for gevent, then run the base initialization."""
    # Monkey patch: swap the blocking implementations for gevent's
    # cooperative ones.
    self.patch()
    # Prepare the gevent hub for running in the newly created child
    # process; the child has to call this after os.fork().
    hub.reinit()
    super().init_process()


def patch(self):
    """Apply gevent monkey patching and rebuild the listening sockets."""
    monkey.patch_all()

    # monkey patch sendfile to make it none blocking
    patch_sendfile()

    # patch sockets: wrap each listener's descriptor in a new socket object
    self.sockets = [
        socket.socket(listener.FAMILY, socket.SOCK_STREAM,
                      fileno=listener.sock.fileno())
        for listener in self.sockets
    ]
# workers/ggevent.py def run(self): servers = [] ssl_args = {} if self.cfg.is_ssl: ssl_args = dict(server_side=True, **self.cfg.ssl_options) for s in self.sockets: # 设置为非阻塞 s.setblocking(1) # 用来限制单个线程内可以并发的gevent.greenlet的数目, # 即一个worker可以维持的最大连接数,默认是1000,如果超过,请求可能被kill pool = Pool(self.worker_connections) if self.server_class is not None: environ = base_environ(self.cfg) environ.update({ "wsgi.multithread": True, "SERVER_SOFTWARE": VERSION, }) server = self.server_class( s, application=self.wsgi, spawn=pool, log=self.log, handler_class=self.wsgi_handler, environ=environ, **ssl_args) else: # self.handle是请求处理函数 hfun = partial(self.handle, s) # 创建gevent StreamServer,当socket监听到端口的请求时,会创建一个gevent.greenlet, # 运行hfun函数对其进行处理 server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args) # 启动server server.start() servers.append(server) # 每秒向Arbiter发送心跳,代表自己还活着 while self.alive: self.notify() gevent.sleep(1.0) try: # Stop accepting requests for server in servers: if hasattr(server, 'close'): # gevent 1.0 server.close() if hasattr(server, 'kill'): # gevent < 1.0 server.kill() # Handle current requests until graceful_timeout ts = time.time() while time.time() - ts <= self.cfg.graceful_timeout: accepting = 0 for server in servers: if server.pool.free_count() != server.pool.size: accepting += 1 # if no server is accepting a connection, we can exit if not accepting: return self.notify() gevent.sleep(1.0) # Force kill all active the handlers self.log.warning("Worker graceful timeout (pid:%s)" % self.pid) for server in servers: server.stop(timeout=1) except: pass
def handle(self, listener, client, addr):
    """Force *client* into blocking mode, then delegate to the parent handler."""
    # Connected socket timeout defaults to socket.getdefaulttimeout().
    # This forces to blocking mode (setblocking with a true flag).
    client.setblocking(True)
    super().handle(listener, client, addr)
handle(listener, client, addr) 会运行 AsyncWorker 的 handle():
# workers/base_async.py def handle(self, listener, client, addr): req = None try: parser = http.RequestParser(self.cfg, client) try: listener_name = listener.getsockname() # keepalive: 在keep-alive连接上等待请求的秒数 if not self.cfg.keepalive: req = next(parser) self.handle_request(listener_name, req, client, addr) else: # keepalive loop proxy_protocol_info = {} while True: req = None with self.timeout_ctx(): req = next(parser) if not req: break if req.proxy_protocol_info: proxy_protocol_info = req.proxy_protocol_info else: req.proxy_protocol_info = proxy_protocol_info self.handle_request(listener_name, req, client, addr) except http.errors.NoMoreData as e: self.log.debug("Ignored premature client disconnection. %s", e) except StopIteration as e: self.log.debug("Closing connection. %s", e) except ssl.SSLError: # pass to next try-except level util.reraise(*sys.exc_info()) except EnvironmentError: # pass to next try-except level util.reraise(*sys.exc_info()) except Exception as e: self.handle_error(req, client, addr, e) except ssl.SSLError as e: if e.args[0] == ssl.SSL_ERROR_EOF: self.log.debug("ssl connection closed") client.close() else: self.log.debug("Error processing SSL request.") self.handle_error(req, client, addr, e) except EnvironmentError as e: if e.errno not in (errno.EPIPE, errno.ECONNRESET): self.log.exception("Socket error processing request.") else: if e.errno == errno.ECONNRESET: self.log.debug("Ignoring connection reset") else: self.log.debug("Ignoring EPIPE") except Exception as e: self.handle_error(req, client, addr, e) finally: util.close(client)
# workers/base_async.py
def handle_request(self, listener_name, req, sock, addr):
    """Run the WSGI application for *req* and write the response to *sock*.

    :returns: ``False`` when ``is_already_handled()`` reports that the
        response iterable needs no further processing, ``True`` after a
        response has been written and the connection may be reused.
    :raises StopIteration: when the response asks for the connection to
        be closed, or when the application fails after the headers were
        already sent.
    """
    request_start = datetime.now()
    environ = {}
    resp = None
    try:
        self.cfg.pre_request(self, req)
        resp, environ = wsgi.create(req, sock, addr,
                                    listener_name, self.cfg)
        environ["wsgi.multithread"] = True
        self.nr += 1
        if self.alive and self.nr >= self.max_requests:
            # Request budget used up: close this connection and stop the
            # worker's run loop after the current request.
            self.log.info("Autorestarting worker after current request.")
            resp.force_close()
            self.alive = False

        if not self.cfg.keepalive:
            resp.force_close()

        respiter = self.wsgi(environ, resp.start_response)
        if self.is_already_handled(respiter):
            return False
        try:
            if isinstance(respiter, environ['wsgi.file_wrapper']):
                resp.write_file(respiter)
            else:
                for item in respiter:
                    resp.write(item)
            resp.close()
            request_time = datetime.now() - request_start
            self.log.access(resp, req, environ, request_time)
        finally:
            # Close the response iterable if it exposes close().
            if hasattr(respiter, "close"):
                respiter.close()
        # StopIteration is how this method tells handle() to end the
        # keep-alive loop.
        if resp.should_close():
            raise StopIteration()
    except StopIteration:
        raise
    except EnvironmentError:
        # If the original exception was a socket.error we delegate
        # handling it to the caller (where handle() might ignore it)
        util.reraise(*sys.exc_info())
    except Exception:
        if resp and resp.headers_sent:
            # If the requests have already been sent, we should close the
            # connection to indicate the error.
            self.log.exception("Error handling request")
            try:
                sock.shutdown(socket.SHUT_RDWR)
                sock.close()
            except EnvironmentError:
                pass
            raise StopIteration()
        raise
    finally:
        # A failing post_request hook is logged and never propagates.
        try:
            self.cfg.post_request(self, req, environ, resp)
        except Exception:
            self.log.exception("Exception in post_request hook")
    return True
reference
https://www.jianshu.com/p/1e5feccb37d9