Tornado 0103 - 用户指南: 协程


协程(Coroutines)

协程是在 Tornado 中编写异步代码的推荐方法。协程使用 Python await 或 yield 关键字来挂起和恢复执行而不是一系列回调(在 gevent 这样的框架中看到的协作轻量级线程有时也被称为协程,但在 Tornado 中所有协程都使用显式上下文切换并被称为异步函数)。

协程几乎和同步代码一样简单,但不需要花费任何线程成本。它们还通过减少上下文切换可能发生的位置数量,使 并发性 更易于推理。

Example:

async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

原生协程与装饰器协程(Native vs decorated coroutines)

Python 3.5 引入了async 和 await 关键字(使用这些关键字的函数也称为“原生协程(native coroutines)”)。为了与旧版本的 Python 兼容,您可以使用 tornado.gen.coroutine 装饰器来使用“装饰”或“基于 yield ”的协同程序。

尽可能使用原生协程。仅在需要与旧版本的 Python 兼容时才使用修饰器协程。Tornado 文档中的示例通常使用原生形式。

两种形式之间的翻译通常很简单:

# Decorated:                    # Native:

# Normal function declaration
# with decorator                # "async def" keywords
@gen.coroutine
def a():                        async def a():
    # "yield" all async funcs       # "await" all async funcs
    b = yield c()                   b = await c()
    # "return" and "yield"
    # cannot be mixed in
    # Python 2, so raise a
    # special exception.            # Return normally
    raise gen.Return(b)             return b

两种形式的协程之间的其他差异是:

  • 原生协程通常更快。
  • 原生协程可以使用 async for 和 async with 语句,这使得某些模式更加简单。
  • 除非 await 或 yield 它们,否则原生协程根本不会运行。装饰器协程一旦被调用就可以“在后台”开始运行。请注意,对于这两种协程,使用 await 或 yield 很重要,这样任何异常都可以使用。
  • 装饰器协程与 concurrent.futures 包有额外的集成,允许直接生成 executor.submit 的结果。对于原生协程,请改用 IOLoop.run_in_executor。
  • 装饰器协程通过产生列表或字典来支持等待多个对象的一些简写。在原生协程中则使用tornado.gen.multi 执行此操作。
  • 装饰器协程可以支持与其他软件包的集成,包括通过转换函数注册表的 Twisted。要在原生协程中访问此功能,请使用 tornado.gen.convert_yielded。
  • 装饰器协程总是返回一个 Future 对象。原生协程返回一个 awaitable 对象,而不是 Future。在 Tornado 中,两者大多可以互换。

怎么运行的

本节介绍装饰器协程的操作。原生协程在概念上是相似的,但由于与 Python 运行时的额外集成而稍微复杂一些。

包含 yield 的函数是生成器。所有生成器都是异步的,在调用时,它们返回一个生成器对象而不是运行完成。 @gen.coroutine 装饰器通过 yield 表达式与生成器通信,并通过返回 Future 与协程的调用者通信。

这是 coroutine 装饰器内循环的简化版本:

# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

装饰器从生成器接收 Future,等待(不阻塞)该 Future 完成,然后 “unwraps” Future 并将结果作为 yield 表达式的结果发送回生成器。大多数异步代码从不直接接触 Future 类,除非立即将异步函数返回的 Future 传递给 yield 表达式。

如何调用协程

协程不会以正常方式引发异常:它们引发的任何异常都将被困在等待(awaitable)对象中,直到它被 yielded 为止。这意味着以正确的方式调用协同程序很重要,否则您可能会发现未被注意到的错误:

async def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)

在几乎所有情况下,任何调用协程的函数都必须是协程本身,并在调用中使用 await 或 yield 关键字。当覆盖超类中定义的方法时,请查阅文档以查看是否允许协程(文档应该说方法(method)“可能是协程”或“可能返回 Future”):

async def good_call():
    # await will unwrap the object returned by divide() and raise
    # the exception.
    await divide(1, 0)

有时你可能想要“解雇并忘记”一个协程而不等待它的结果。在这种情况下,建议使用IOLoop.spawn_callback,这使得 IOLoop 负责调用。如果失败,IOLoop 将记录堆栈跟踪:

# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)

对于使用 @gen.coroutine 的函数,IOLoop.spawn_callback 这种方式是建议使用,但是对于使用 async def 的函数则是必须使用(否则协程运行程序将无法启动)。

最后,在程序的顶层,如果 IOLoop 尚未运行,你可以启动 IOLoop,运行协程,然后使用IOLoop.run_sync 方法停止 IOLoop。这通常用于启动面向批处理的程序的 main 函数:

# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))

协程模式

调用阻塞函数

从协程中调用阻塞函数的最简单方法是使用IOLoop.run_in_executor,它返回与协程兼容的 Futures:

async def call_blocking():
    await IOLoop.current().run_in_executor(blocking_func, args)

并行

multi 函数接受其值为 Futures 的 lists 和 dicts,并且并行等待所有这些 Futures:

from tornado.gen import multi

async def parallel_fetch(url1, url2):
    resp1, resp2 = await multi([http_client.fetch(url1),
                                http_client.fetch(url2)])

async def parallel_fetch_many(urls):
    responses = await multi ([http_client.fetch(url) for url in urls])
    # responses is a list of HTTPResponses in the same order

async def parallel_fetch_dict(urls):
    responses = await multi({url: http_client.fetch(url)
                             for url in urls})
    # responses is a dict {url: HTTPResponse}

在装饰器协程中,可以直接 yield list 或 dict:

@gen.coroutine
def parallel_fetch_decorated(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)])

交错

有时去保存 Future 而不是立即 yield 它,是会很有用的,因此你可以在等待之前启动另一个操作。

from tornado.gen import convert_yielded

async def get(self):
    # convert_yielded() starts the native coroutine in the background.
    # This is equivalent to asyncio.ensure_future() (both work in Tornado).
    fetch_future = convert_yielded(self.fetch_next_chunk())
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = convert_yielded(self.fetch_next_chunk())
        yield self.flush()

这对于装饰器协程来说更容易一些,因为它们在被调用时就立即启动:

@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()

循环

在原生协程中,可以使用 async for。在旧版本的 Python 中,使用协程进行循环是很棘手的,因为无法在 for 循环或 while 循环的每次迭代中产生并捕获 yield 的结果。相反,您需要将循环条件与访问结果分开,如本例中的 Motor

import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()

在后台运行

PeriodicCallback 通常不与协程一起使用。相反,一个协程可以包含一个 while True:循环并使用 tornado.gen.sleep:

async def minute_loop():
    while True:
        await do_something()
        await gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)

有时可能需要更复杂的循环。例如,上面这个循环需要设计为每 60 + N 秒运行一次,其中 N 是 do_something() 的运行时间。要完全每 60 秒运行一次,请使用上面的交错模式:

async def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        await do_something()  # Run while the clock is ticking.
        await nxt             # Wait for the timer to run out.