Gunicorn Source Code Analysis -- Python Web Server





Overview


Gunicorn is a widely used, high-performance Python WSGI HTTP server for UNIX.

Because the source code calls interfaces such as fcntl and fork, it only runs on Unix-like systems.


Gunicorn's Design

Gunicorn is built on the pre-fork model: a master process manages a group of worker processes, and the workers are responsible for responding to client requests. Gunicorn relies on the operating system to provide load balancing across the workers.

A pre-fork server spawns a number of worker processes in advance (the number is set by configuration), so the server never has to wait for a new process to start when a request arrives, which lets it serve many concurrent clients more quickly.
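
The idea in miniature, as a standalone sketch (an illustration of the pre-fork pattern, not gunicorn's code): the parent creates the listening socket, forks N children, and each child accepts on the inherited socket while the kernel spreads connections across them.

# pre-fork sketch (illustrative only, not gunicorn code)
import os
import socket

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(("127.0.0.1", 8000))
listener.listen(128)

NUM_WORKERS = 4
for _ in range(NUM_WORKERS):
    if os.fork() == 0:
        # child ("worker"): accept on the socket inherited from the parent;
        # the kernel decides which worker gets each connection
        while True:
            client, _ = listener.accept()
            client.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok")
            client.close()

# parent ("master"): here it only waits for its children; gunicorn's arbiter
# additionally restarts, adds and removes workers
for _ in range(NUM_WORKERS):
    os.wait()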



Source Code Walkthrough


Directory Structure


│  arbiter.py               # the master process, called the "arbiter" in gunicorn
│  config.py                # configuration; gunicorn is configured via command-line arguments or a config file
│  debug.py
│  errors.py
│  glogging.py
│  pidfile.py
│  reloader.py
│  sock.py
│  systemd.py
│  util.py
│  __init__.py
│
├─app
│     base.py               # base class for application entry points
│     pasterapp.py
│     wsgiapp.py            # program entry point
│     __init__.py
│
├─http                      # parsing and handling of HTTP requests from clients
│     body.py
│     errors.py
│     message.py
│     parser.py
│     unreader.py
│     wsgi.py
│     __init__.py
│
├─instrument
│     statsd.py
│     __init__.py
│   
├─workers                   # worker implementations: sync, gthread, gevent, and others
      base.py
      base_async.py
      geventlet.py
      ggevent.py
      gthread.py
      gtornado.py
      sync.py
      workertmp.py
      __init__.py


Starting from the Entry Point

The run() in the entry point wsgiapp.py ends up invoking run() of the base class BaseApplication in base.py:

# base.py
def run(self):
    try:
        Arbiter(self).run()
    except RuntimeError as e:
        print("\nError: %s\n" % e, file=sys.stderr)
        sys.stderr.flush()
        sys.exit(1)
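
BaseApplication is also how gunicorn can be embedded in your own program: subclass it, implement load_config() and load(), then call run() just as above. A minimal sketch along the lines of the custom-application example in gunicorn's documentation (class, option and app names here are illustrative):

# standalone sketch of the "custom application" pattern (names are illustrative)
import gunicorn.app.base


class StandaloneApplication(gunicorn.app.base.BaseApplication):
    def __init__(self, app, options=None):
        self.options = options or {}
        self.application = app
        super().__init__()

    def load_config(self):
        # copy our options into gunicorn's Config object
        for key, value in self.options.items():
            if key in self.cfg.settings and value is not None:
                self.cfg.set(key.lower(), value)

    def load(self):
        # return the WSGI callable
        return self.application


def app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"ok\n"]


if __name__ == "__main__":
    StandaloneApplication(app, {"bind": "127.0.0.1:8080", "workers": 2}).run()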


Arbiter(self).run() then enters run() in arbiter.py:

# arbiter.py
def run(self):
    "Main master loop."
    self.start()              # initialize the arbiter: register signal handlers and create the listening sockets
    util._setproctitle("master [%s]" % self.proc_name)

    try:
        self.manage_workers()         # manage the worker processes

        while True:
            self.maybe_promote_master()

            sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
            if sig is None:
                self.sleep()
                self.murder_workers()
                self.manage_workers()
                continue

            if sig not in self.SIG_NAMES:
                self.log.info("Ignoring unknown signal: %s", sig)
                continue

            signame = self.SIG_NAMES.get(sig)
            handler = getattr(self, "handle_%s" % signame, None)
            if not handler:
                self.log.error("Unhandled signal: %s", signame)
                continue
            self.log.info("Handling signal: %s", signame)
            handler()
            self.wakeup()
    except (StopIteration, KeyboardInterrupt):
        self.halt()
    except HaltServer as inst:
        self.halt(reason=inst.reason, exit_status=inst.exit_status)
    except SystemExit:
        raise
    except Exception:
        self.log.info("Unhandled exception in main loop",
                      exc_info=True)
        self.stop(False)
        if self.pidfile is not None:
            self.pidfile.unlink()
        sys.exit(-1)


First, start() runs:

# arbiter.py
def start(self):
    """\
    Initialize the arbiter. Start listening and set pidfile if needed.
    """
    self.log.info("Starting gunicorn %s", __version__)

    if 'GUNICORN_PID' in os.environ:
        self.master_pid = int(os.environ.get('GUNICORN_PID'))
        self.proc_name = self.proc_name + ".2"
        self.master_name = "Master.2"

    self.pid = os.getpid()
    if self.cfg.pidfile is not None:
        pidname = self.cfg.pidfile
        if self.master_pid != 0:
            pidname += ".2"
        self.pidfile = Pidfile(pidname)
        self.pidfile.create(self.pid)
    self.cfg.on_starting(self)

    self.init_signals()

    if not self.LISTENERS:
        fds = None
        listen_fds = systemd.listen_fds()
        if listen_fds:
            self.systemd = True
            fds = range(systemd.SD_LISTEN_FDS_START,
                        systemd.SD_LISTEN_FDS_START + listen_fds)

        elif self.master_pid:
            fds = []
            for fd in os.environ.pop('GUNICORN_FD').split(','):
                fds.append(int(fd))

        self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)

    listeners_str = ",".join([str(l) for l in self.LISTENERS])
    self.log.debug("Arbiter booted")
    self.log.info("Listening at: %s (%s)", listeners_str, self.pid)
    self.log.info("Using worker: %s", self.cfg.worker_class_str)
    systemd.sd_notify("READY=1\nSTATUS=Gunicorn arbiter booted", self.log)

    # check worker class requirements
    if hasattr(self.worker_class, "check_config"):
        self.worker_class.check_config(self.cfg, self.log)

    self.cfg.when_ready(self)


The most important parts here are init_signals() and create_sockets().

# arbiter.py
def init_signals(self):
    """\
    Initialize master signal handling. Most of the signals
    are queued. Child signals only wake up the master.
    """
    # close old PIPE
    for p in self.PIPE:
        os.close(p)

    # initialize the pipe
    self.PIPE = pair = os.pipe()
    for p in pair:
        util.set_non_blocking(p)
        util.close_on_exec(p)

    self.log.close_on_exec()

    # initialize all signals
    for s in self.SIGNALS:
        signal.signal(s, self.signal)    # register self.signal as the handler for each signal
    signal.signal(signal.SIGCHLD, self.handle_chld)

def signal(self, sig, frame):
    if len(self.SIG_QUEUE) < 5:
        self.SIG_QUEUE.append(sig)
        self.wakeup()


The SIGNALS list is defined in arbiter.py as follows (SIGCHLD is registered separately with handle_chld); an example of sending these signals to a running master follows the list:

SIGNALS = [getattr(signal, "SIG%s" % x)
           for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]

  • HUP: reload the configuration and restart all worker processes
  • QUIT: graceful shutdown; wait for workers to finish their current requests before exiting
  • INT/TERM: immediate shutdown; abort all in-flight processing
  • TTIN: increase the number of worker processes by one
  • TTOU: decrease the number of worker processes by one
  • USR1: reopen the log files of the master and all workers
  • USR2: re-exec a new master and workers (online upgrade)
  • WINCH: gracefully shut down all workers while keeping the master process running
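
These signals can be sent to a running master from another process; for example (the pidfile path here is an assumption):

# sending control signals to a running gunicorn master (pidfile path is an assumption)
import os
import signal

with open("/var/run/gunicorn.pid") as f:
    master_pid = int(f.read().strip())

os.kill(master_pid, signal.SIGTTIN)   # add one worker
os.kill(master_pid, signal.SIGTTOU)   # remove one worker
os.kill(master_pid, signal.SIGHUP)    # reload the configuration and restart the workers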

signal() wakes up the arbiter by writing to the pipe:

def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE
    """
    try:
        os.write(self.PIPE[1], b'.')
    except IOError as e:
        if e.errno not in [errno.EAGAIN, errno.EINTR]:
            raise

In other words, whenever one of the signals listed above arrives, the arbiter process is woken up.
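
This is the classic self-pipe trick: the signal handler only writes one byte, and the main loop, which sleeps in select() on the read end of the pipe, returns immediately. A minimal standalone sketch (not gunicorn's code):

# self-pipe sketch (illustrative only, not gunicorn code)
import os
import select
import signal

r, w = os.pipe()
os.set_blocking(r, False)
os.set_blocking(w, False)

def on_signal(signum, frame):
    os.write(w, b".")                 # just wake up whoever is sleeping in select()

signal.signal(signal.SIGHUP, on_signal)

while True:
    ready, _, _ = select.select([r], [], [], 5.0)   # wait with a timeout, as Arbiter.sleep() does
    if ready:
        os.read(r, 1)                 # drain the pipe
        print("woken by a signal; pop SIG_QUEUE and dispatch the handler here")
    else:
        print("timed out; do periodic housekeeping (murder_workers / manage_workers)")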

Back in start() in arbiter.py, create_sockets() creates and listens on sockets for the addresses and ports given in the configuration:

def create_sockets(conf, log, fds=None):
    """
    Create a new socket for the configured addresses or file descriptors.

    If a configured address is a tuple then a TCP socket is created.
    If it is a string, a Unix socket is created. Otherwise, a TypeError is
    raised.
    """
    listeners = []

    # get it only once
    addr = conf.address
    fdaddr = [bind for bind in addr if isinstance(bind, int)]
    if fds:
        fdaddr += list(fds)
    laddr = [bind for bind in addr if not isinstance(bind, int)]

    # check ssl config early to raise the error on startup
    # only the certfile is needed since it can contains the keyfile
    if conf.certfile and not os.path.exists(conf.certfile):
        raise ValueError('certfile "%s" does not exist' % conf.certfile)

    if conf.keyfile and not os.path.exists(conf.keyfile):
        raise ValueError('keyfile "%s" does not exist' % conf.keyfile)

    # sockets are already bound
    if fdaddr:
        for fd in fdaddr:
            sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
            sock_name = sock.getsockname()
            sock_type = _sock_type(sock_name)
            listener = sock_type(sock_name, conf, log, fd=fd)
            listeners.append(listener)

        return listeners

    # no sockets is bound, first initialization of gunicorn in this env.
    for addr in laddr:
        sock_type = _sock_type(addr)
        sock = None
        for i in range(5):
            try:
                # create the socket and bind it to the address
                sock = sock_type(addr, conf, log)
            except socket.error as e:
                if e.args[0] == errno.EADDRINUSE:
                    log.error("Connection in use: %s", str(addr))
                if e.args[0] == errno.EADDRNOTAVAIL:
                    log.error("Invalid address: %s", str(addr))
                if i < 5:
                    msg = "connection to {addr} failed: {error}"
                    log.debug(msg.format(addr=str(addr), error=str(e)))
                    log.error("Retrying in 1 second.")
                    time.sleep(1)
            else:
                break

        if sock is None:
            log.error("Can't connect to %s", str(addr))
            sys.exit(1)

        listeners.append(sock)

    return listeners

_sock_type() (in sock.py) returns the socket class to use for a given address:

def _sock_type(addr):
    if isinstance(addr, tuple):
        if util.is_ipv6(addr[0]):
            sock_type = TCP6Socket
        else:
            sock_type = TCPSocket
    elif isinstance(addr, (str, bytes)):
        sock_type = UnixSocket
    else:
        raise TypeError("Unable to create socket from: %r" % addr)
    return sock_type

class TCP6Socket(TCPSocket):

    FAMILY = socket.AF_INET6

    def __str__(self):
        (host, port, _, _) = self.sock.getsockname()
        return "http://[%s]:%d" % (host, port)

class TCPSocket(BaseSocket):

    FAMILY = socket.AF_INET

    def __str__(self):
        if self.conf.is_ssl:
            scheme = "https"
        else:
            scheme = "http"

        addr = self.sock.getsockname()
        return "%s://%s:%d" % (scheme, addr[0], addr[1])

    def set_options(self, sock, bound=False):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        return super().set_options(sock, bound=bound)

class BaseSocket(object):

    def __init__(self, address, conf, log, fd=None):
        self.log = log
        self.conf = conf

        self.cfg_addr = address
        if fd is None:
            sock = socket.socket(self.FAMILY, socket.SOCK_STREAM)
            bound = False
        else:
            sock = socket.fromfd(fd, self.FAMILY, socket.SOCK_STREAM)
            os.close(fd)
            bound = True

        self.sock = self.set_options(sock, bound=bound)

    def __str__(self):
        return "<socket %d>" % self.sock.fileno()

    def __getattr__(self, name):
        return getattr(self.sock, name)

    def set_options(self, sock, bound=False):
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if (self.conf.reuse_port
            and hasattr(socket, 'SO_REUSEPORT')):  # pragma: no cover
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            except socket.error as err:
                if err.errno not in (errno.ENOPROTOOPT, errno.EINVAL):
                    raise
        if not bound:
            self.bind(sock)
        sock.setblocking(0)

        # make sure that the socket can be inherited
        if hasattr(sock, "set_inheritable"):
            sock.set_inheritable(True)

        sock.listen(self.conf.backlog)
        return sock

    def bind(self, sock):
        sock.bind(self.cfg_addr)

    def close(self):
        if self.sock is None:
            return

        try:
            self.sock.close()
        except socket.error as e:
            self.log.info("Error while closing socket %s", str(e))

        self.sock = None

Back in run() in arbiter.py: after start(), the main loop repeatedly calls murder_workers().

# arbiter.py
def murder_workers(self):
    """\
    Kill unused/idle workers
    """
    if not self.timeout:
        return
    workers = list(self.WORKERS.items())
    for (pid, worker) in workers:
        try:
            if time.time() - worker.tmp.last_update() <= self.timeout:
                continue
        except (OSError, ValueError):
            continue

        if not worker.aborted:
            self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
            worker.aborted = True
            self.kill_worker(pid, signal.SIGABRT)
        else:
            self.kill_worker(pid, signal.SIGKILL)


# workertmp.py
def last_update(self):
    return os.fstat(self._tmp.fileno()).st_ctime

The arbiter decides whether a worker has timed out by checking how recently the worker's temporary file was last touched. The worker-side code that touches this file is shown later. If a worker exceeds the configured timeout, the arbiter kills it.
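
A minimal standalone sketch of this heartbeat mechanism (the real worker-side code, notify(), appears later):

# temp-file heartbeat sketch (illustrative only, not gunicorn code)
import os
import tempfile
import time

tmp = tempfile.TemporaryFile()        # gunicorn's WorkerTmp keeps one such file per worker
spinner = 0

def notify():
    # "touch" the file by flipping its mode, which updates st_ctime
    global spinner
    spinner = (spinner + 1) % 2
    os.fchmod(tmp.fileno(), spinner)

def seconds_since_heartbeat():
    # what the arbiter checks in murder_workers()
    return time.time() - os.fstat(tmp.fileno()).st_ctime

notify()
time.sleep(2)
print(int(seconds_since_heartbeat()))   # ~2; a worker silent for longer than the timeout gets killed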


The arbiter then runs manage_workers():

# arbiter.py
def manage_workers(self):
    """\
    Maintain the number of workers by spawning or killing
    as required.
    """
    if len(self.WORKERS) < self.num_workers:
        self.spawn_workers()

    workers = self.WORKERS.items()
    workers = sorted(workers, key=lambda w: w[1].age)
    while len(workers) > self.num_workers:
        (pid, _) = workers.pop(0)
        self.kill_worker(pid, signal.SIGTERM)

    active_worker_count = len(workers)
    if self._last_logged_active_worker_count != active_worker_count:
        self._last_logged_active_worker_count = active_worker_count
        self.log.debug("{0} workers".format(active_worker_count),
                       extra={"metric": "gunicorn.workers",
                              "value": active_worker_count,
                              "mtype": "gauge"})

manage_workers() keeps the number of worker processes in line with the configured count: if there are too few it calls spawn_workers(); if there are too many it kills the oldest workers with kill_worker(pid, signal.SIGTERM).

# arbiter.py
def spawn_worker(self):
    self.worker_age += 1
    # create a worker instance of the configured worker_class; LISTENERS is passed to the worker
    worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                               self.app, self.timeout / 2.0,
                               self.cfg, self.log)
    self.cfg.pre_fork(self, worker)
    # os.fork() creates a child that copies the parent's state; from here both processes continue executing.
    # In the child, fork() returns 0; in the parent, it returns the child's PID.
    pid = os.fork()
    if pid != 0:
        worker.pid = pid
        # the parent stores the worker in self.WORKERS[pid] so it can be managed later
        self.WORKERS[pid] = worker
        return pid

    # Do not inherit the temporary files of other workers
    for sibling in self.WORKERS.values():
        sibling.tmp.close()

    # Process Child
    worker.pid = os.getpid()
    try:
        util._setproctitle("worker [%s]" % self.proc_name)
        self.log.info("Booting worker with pid: %s", workerx.pid)
        self.cfg.post_fork(self, worker)
        worker.init_process()
        sys.exit(0)
    except SystemExit:
        raise
    except AppImportError as e:
        self.log.debug("Exception while loading the application",
                       exc_info=True)
        print("%s" % e, file=sys.stderr)
        sys.stderr.flush()
        sys.exit(self.APP_LOAD_ERROR)
    except:
        self.log.exception("Exception in worker process")
        if not worker.booted:
            sys.exit(self.WORKER_BOOT_ERROR)
        sys.exit(-1)
    finally:
        self.log.info("Worker exiting (pid: %s)", worker.pid)
        try:
            worker.tmp.close()
            self.cfg.worker_exit(self, worker)
        except:
            self.log.warning("Exception during worker exit:\n%s",
                              traceback.format_exc())

There are five worker types: sync, gthread, eventlet, gevent and tornado. The default is sync, so let's look at the sync worker first.
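
The worker type is chosen with the worker_class setting, for example in a config file loaded with gunicorn -c gunicorn.conf.py app:application (the file and module names here are illustrative):

# gunicorn.conf.py -- illustrative settings
bind = "127.0.0.1:8000"
workers = 4                    # number of pre-forked worker processes managed by the arbiter
worker_class = "sync"          # one of: sync, gthread, eventlet, gevent, tornado
timeout = 30                   # a worker silent for this many seconds is killed by murder_workers()
max_requests = 1000            # restart a worker after it has handled this many requests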


Continuing with worker.init_process(): SyncWorker does not override this method, so the base class implementation runs:

# workers/base.py
def init_process(self):
    """\
    If you override this method in a subclass, the last statement
    in the function should be to call this method with
    super().init_process() so that the ``run()`` loop is initiated.
    """

    # set environment' variables
    if self.cfg.env:
        for k, v in self.cfg.env.items():
            os.environ[k] = v

    util.set_owner_process(self.cfg.uid, self.cfg.gid,
                           initgroups=self.cfg.initgroups)

    # Reseed the random number generator
    util.seed()

    # For waking ourselves up
    self.PIPE = os.pipe()
    for p in self.PIPE:
        util.set_non_blocking(p)
        util.close_on_exec(p)

    # Prevent fd inheritance
    for s in self.sockets:
        util.close_on_exec(s)
    util.close_on_exec(self.tmp.fileno())

    self.wait_fds = self.sockets + [self.PIPE[0]]

    self.log.close_on_exec()

    self.init_signals()     # register signal handlers

    self.load_wsgi()        # load the WSGI application

    # start the reloader
    if self.cfg.reload:
        def changed(fname):
            self.log.info("Worker reloading: %s modified", fname)
            self.alive = False
            os.write(self.PIPE[1], b"1")
            self.cfg.worker_int(self)
            time.sleep(0.1)
            sys.exit(0)

        reloader_cls = reloader_engines[self.cfg.reload_engine]
        self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,
                                     callback=changed)
        self.reloader.start()

    self.cfg.post_worker_init(self)

    # Enter main run loop
    self.booted = True
    self.run()


Finally, run() is executed:

# workers/sync.py
def run(self):
    # if no timeout is given the worker will never wait and will
    # use the CPU for nothing. This minimal timeout prevent it.
    timeout = self.timeout or 0.5

    # self.socket appears to lose its blocking status after
    # we fork in the arbiter. Reset it here.
    for s in self.sockets:
        s.setblocking(0)

    if len(self.sockets) > 1:
        self.run_for_multiple(timeout)
    else:
        self.run_for_one(timeout)

Take the case of binding a single port as an example:

# workers/sync.py
def run_for_one(self, timeout):
    listener = self.sockets[0]
    while self.alive:
        self.notify() # heartbeat to the master via notify(), i.e. touch the temporary file

        # Accept a connection. If we get an error telling us
        # that no connection is waiting we fall down to the
        # select which is where we'll wait for a bit for new
        # workers to come give us some love.
        try:
            self.accept(listener)
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue

        except EnvironmentError as e:
            if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                    errno.EWOULDBLOCK):
                raise

        if not self.is_parent_alive():
            return

        try:
            self.wait(timeout)
        except StopWaiting:
            return

# workertmp.py
def notify(self):
    self.spinner = (self.spinner + 1) % 2
    os.fchmod(self._tmp.fileno(), self.spinner)

As run_for_one() shows, as long as the worker is alive it keeps accepting ready connections via self.accept(listener):

# workers/sync.py
def accept(self, listener):
    client, addr = listener.accept()
    client.setblocking(1)             # put the client socket in blocking mode; the worker blocks until this request is done
    util.close_on_exec(client)
    self.handle(listener, client, addr)

handle(listener, client, addr) processes the request from the client:

# workers/sync.py
def handle(self, listener, client, addr):
    req = None
    try:
        if self.cfg.is_ssl:
            client = ssl.wrap_socket(client, server_side=True,
                **self.cfg.ssl_options)

        parser = http.RequestParser(self.cfg, client)
        req = next(parser)
        self.handle_request(listener, req, client, addr)
    except http.errors.NoMoreData as e:
        self.log.debug("Ignored premature client disconnection. %s", e)
    except StopIteration as e:
        self.log.debug("Closing connection. %s", e)
    except ssl.SSLError as e:
        if e.args[0] == ssl.SSL_ERROR_EOF:
            self.log.debug("ssl connection closed")
            client.close()
        else:
            self.log.debug("Error processing SSL request.")
            self.handle_error(req, client, addr, e)
    except EnvironmentError as e:
        if e.errno not in (errno.EPIPE, errno.ECONNRESET):
            self.log.exception("Socket error processing request.")
        else:
            if e.errno == errno.ECONNRESET:
                self.log.debug("Ignoring connection reset")
            else:
                self.log.debug("Ignoring EPIPE")
    except Exception as e:
        self.handle_error(req, client, addr, e)
    finally:
        util.close(client)

# workers/sync.py
def handle_request(self, listener, req, client, addr):
    environ = {}
    resp = None
    try:
        self.cfg.pre_request(self, req)
        request_start = datetime.now()
        # parse the request into environ and initialize the response
        resp, environ = wsgi.create(req, client, addr,
                listener.getsockname(), self.cfg)
        # Force the connection closed until someone shows
        # a buffering proxy that supports Keep-Alive to
        # the backend.
        resp.force_close()
        self.nr += 1
        if self.nr >= self.max_requests:
            self.log.info("Autorestarting worker after current request.")
            self.alive = False
        # the WSGI interface: pass environ and start_response to the WSGI application
        respiter = self.wsgi(environ, resp.start_response)
        try:
            if isinstance(respiter, environ['wsgi.file_wrapper']):
                resp.write_file(respiter)
            else:
                for item in respiter:
                    resp.write(item)
            resp.close()
            request_time = datetime.now() - request_start
            self.log.access(resp, req, environ, request_time)
        finally:
            if hasattr(respiter, "close"):
                respiter.close()
    except EnvironmentError:
        # pass to next try-except level
        util.reraise(*sys.exc_info())
    except Exception:
        if resp and resp.headers_sent:
            # If the requests have already been sent, we should close the
            # connection to indicate the error.
            self.log.exception("Error handling request")
            try:
                client.shutdown(socket.SHUT_RDWR)
                client.close()
            except EnvironmentError:
                pass
            raise StopIteration()
        raise
    finally:
        try:
            self.cfg.post_request(self, req, environ, resp)
        except Exception:
            self.log.exception("Exception in post_request hook")


If no request is waiting, accept() raises EWOULDBLOCK or EAGAIN; the except clause catches it and the worker falls through to self.wait(timeout), which uses select to watch wait_fds until a socket becomes ready:

# workers/sync.py
def wait(self, timeout):
    try:
        self.notify()
        ret = select.select(self.wait_fds, [], [], timeout)
        if ret[0]:
            if self.PIPE[0] in ret[0]:
                os.read(self.PIPE[0], 1)
            return ret[0]

    except select.error as e:
        if e.args[0] == errno.EINTR:
            return self.sockets
        if e.args[0] == errno.EBADF:
            if self.nr < 0:
                return self.sockets
            else:
                raise StopWaiting
        raise

This covers the main flow of the sync-mode WSGI server.

In sync mode each worker process has a single thread and handles client requests in a blocking fashion, so a worker can serve only one client at a time, which limits throughput.

A more efficient async worker in Gunicorn is the gevent worker, which is based on coroutines (greenlets).

Official gevent documentation: http://www.gevent.org/contents.html
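
The gevent worker is built on gevent's Pool and StreamServer, which show up in its run() below. As a standalone illustration of those two primitives (a sketch, not gunicorn code):

# gevent StreamServer sketch (illustrative only, not gunicorn code)
from gevent.pool import Pool
from gevent.server import StreamServer

def handle(sock, addr):
    # each accepted connection is handled in its own greenlet
    sock.recv(1024)
    sock.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok")
    sock.close()

pool = Pool(1000)   # cap on concurrent greenlets, analogous to worker_connections
server = StreamServer(("127.0.0.1", 9000), handle, spawn=pool)
server.serve_forever()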

GeventWorker inherits from AsyncWorker, which in turn inherits from the base Worker class in workers/base.py. GeventWorker overrides init_process():

# workers/ggevent.py
def init_process(self):
    self.patch()     # monkey patch: swap blocking implementations for gevent's cooperative ones
    hub.reinit()     # prepare the gevent hub to run in the newly forked child; must be called after os.fork()
    super().init_process()

def patch(self):
    monkey.patch_all()

    # monkey patch sendfile to make it none blocking
    patch_sendfile()

    # patch sockets
    sockets = []
    for s in self.sockets:
        sockets.append(socket.socket(s.FAMILY, socket.SOCK_STREAM,
            fileno=s.sock.fileno()))
    self.sockets = sockets
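
What patch() buys the worker is that ordinary blocking calls become cooperative. A standalone illustration of monkey patching (not gunicorn code):

# gevent monkey-patching demo (illustrative only, not gunicorn code)
from gevent import monkey
monkey.patch_all()     # replace blocking stdlib primitives (socket, time.sleep, ...) with cooperative ones

import time
import gevent

def task(n):
    time.sleep(1)      # after patch_all() this yields to the gevent hub instead of blocking the process
    return n

jobs = [gevent.spawn(task, i) for i in range(3)]
gevent.joinall(jobs)   # finishes in about 1 second because the three sleeps overlap
print([job.value for job in jobs])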

super().init_process() calls the base class's init_process(), which at the end invokes the gevent worker's run():

# workers/ggevent.py
def run(self):
    servers = []
    ssl_args = {}

    if self.cfg.is_ssl:
        ssl_args = dict(server_side=True, **self.cfg.ssl_options)

    for s in self.sockets:
        # setblocking(1) is cooperative here: after monkey patching, the gevent socket
        # yields to the hub instead of blocking the whole process
        s.setblocking(1)
        # limit how many greenlets may run concurrently in this worker,
        # i.e. the maximum number of connections one worker holds (worker_connections, default 1000)
        pool = Pool(self.worker_connections)
        if self.server_class is not None:
            environ = base_environ(self.cfg)
            environ.update({
                "wsgi.multithread": True,
                "SERVER_SOFTWARE": VERSION,
            })
            server = self.server_class(
                s, application=self.wsgi, spawn=pool, log=self.log,
                handler_class=self.wsgi_handler, environ=environ,
                **ssl_args)
        else:
            # self.handle is the request-handling function
            hfun = partial(self.handle, s)
            # create a gevent StreamServer; for every connection accepted on the socket
            # it spawns a greenlet running hfun to handle it
            server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args)

        # start the server
        server.start()
        servers.append(server)

    # send a heartbeat to the arbiter every second to show this worker is still alive
    while self.alive:
        self.notify()
        gevent.sleep(1.0)

    try:
        # Stop accepting requests
        for server in servers:
            if hasattr(server, 'close'):  # gevent 1.0
                server.close()
            if hasattr(server, 'kill'):  # gevent < 1.0
                server.kill()

        # Handle current requests until graceful_timeout
        ts = time.time()
        while time.time() - ts <= self.cfg.graceful_timeout:
            accepting = 0
            for server in servers:
                if server.pool.free_count() != server.pool.size:
                    accepting += 1

            # if no server is accepting a connection, we can exit
            if not accepting:
                return

            self.notify()
            gevent.sleep(1.0)

        # Force kill all active the handlers
        self.log.warning("Worker graceful timeout (pid:%s)" % self.pid)
        for server in servers:
            server.stop(timeout=1)
    except:
        pass

# workers/ggevent.py
def handle(self, listener, client, addr):
    # Connected socket timeout defaults to socket.getdefaulttimeout().
    # This forces to blocking mode.
    # with gevent's monkey patching, this "blocking" mode is cooperative rather than OS-blocking
    client.setblocking(1)
    super().handle(listener, client, addr)

handle(listener, client, addr) then runs AsyncWorker's handle():

# workers/base_async.py
def handle(self, listener, client, addr):
    req = None
    try:
        parser = http.RequestParser(self.cfg, client)
        try:
            listener_name = listener.getsockname()
            # keepalive: number of seconds to wait for the next request on a keep-alive connection
            if not self.cfg.keepalive:
                req = next(parser)
                self.handle_request(listener_name, req, client, addr)
            else:
                # keepalive loop
                proxy_protocol_info = {}
                while True:
                    req = None
                    with self.timeout_ctx():
                        req = next(parser)
                    if not req:
                        break
                    if req.proxy_protocol_info:
                        proxy_protocol_info = req.proxy_protocol_info
                    else:
                        req.proxy_protocol_info = proxy_protocol_info
                    self.handle_request(listener_name, req, client, addr)
        except http.errors.NoMoreData as e:
            self.log.debug("Ignored premature client disconnection. %s", e)
        except StopIteration as e:
            self.log.debug("Closing connection. %s", e)
        except ssl.SSLError:
            # pass to next try-except level
            util.reraise(*sys.exc_info())
        except EnvironmentError:
            # pass to next try-except level
            util.reraise(*sys.exc_info())
        except Exception as e:
            self.handle_error(req, client, addr, e)
    except ssl.SSLError as e:
        if e.args[0] == ssl.SSL_ERROR_EOF:
            self.log.debug("ssl connection closed")
            client.close()
        else:
            self.log.debug("Error processing SSL request.")
            self.handle_error(req, client, addr, e)
    except EnvironmentError as e:
        if e.errno not in (errno.EPIPE, errno.ECONNRESET):
            self.log.exception("Socket error processing request.")
        else:
            if e.errno == errno.ECONNRESET:
                self.log.debug("Ignoring connection reset")
            else:
                self.log.debug("Ignoring EPIPE")
    except Exception as e:
        self.handle_error(req, client, addr, e)
    finally:
        util.close(client)

# workers/base_async.py
def handle_request(self, listener_name, req, sock, addr):
    request_start = datetime.now()
    environ = {}
    resp = None
    try:
        self.cfg.pre_request(self, req)
        resp, environ = wsgi.create(req, sock, addr,
                listener_name, self.cfg)
        environ["wsgi.multithread"] = True
        self.nr += 1
        if self.alive and self.nr >= self.max_requests:
            self.log.info("Autorestarting worker after current request.")
            resp.force_close()
            self.alive = False

        if not self.cfg.keepalive:
            resp.force_close()

        respiter = self.wsgi(environ, resp.start_response)
        if self.is_already_handled(respiter):
            return False
        try:
            if isinstance(respiter, environ['wsgi.file_wrapper']):
                resp.write_file(respiter)
            else:
                for item in respiter:
                    resp.write(item)
            resp.close()
            request_time = datetime.now() - request_start
            self.log.access(resp, req, environ, request_time)
        finally:
            if hasattr(respiter, "close"):
                respiter.close()
        if resp.should_close():
            raise StopIteration()
    except StopIteration:
        raise
    except EnvironmentError:
        # If the original exception was a socket.error we delegate
        # handling it to the caller (where handle() might ignore it)
        util.reraise(*sys.exc_info())
    except Exception:
        if resp and resp.headers_sent:
            # If the requests have already been sent, we should close the
            # connection to indicate the error.
            self.log.exception("Error handling request")
            try:
                sock.shutdown(socket.SHUT_RDWR)
                sock.close()
            except EnvironmentError:
                pass
            raise StopIteration()
        raise
    finally:
        try:
            self.cfg.post_request(self, req, environ, resp)
        except Exception:
            self.log.exception("Exception in post_request hook")
    return True







Reference

https://www.jianshu.com/p/1e5feccb37d9