掌握 Python 异步:使用协程和事件循环提升应用程序性能
python 的异步编程改变了构建高性能应用程序的游戏规则。我已经使用它很多年了,如果使用得当,它的强大功能总是让我惊叹不已。
python 异步模型的核心是协程和事件循环。协程是可以暂停和恢复执行的特殊函数,可以在没有线程开销的情况下实现高效的多任务处理。另一方面,事件循环是驱动这些协程、管理其执行和处理 i/o 操作的引擎。
让我们从协程开始。在 python 中,我们使用 async def 语法定义它们。这是一个简单的例子:
async def greet(name): print(f"hello, {name}!") await asyncio.sleep(1) print(f"goodbye, {name}!")
这个协程向一个人打招呼,等待一秒钟,然后说再见。 wait 关键字在这里至关重要 - 它允许协程暂停其执行并将控制权交还给事件循环。
但是协程在幕后是如何工作的呢?它们实际上是构建在 python 的生成器功能之上的。当您调用协程时,它不会立即运行。相反,它返回一个协程对象。这个对象可以发送值并且可以产生值,就像生成器一样。
事件循环负责实际运行这些协程。它维护一个任务队列(它们是协程的包装器)并逐个执行它们。当协程遇到await语句时,事件循环将挂起它并继续执行下一个任务。这就是协作式多任务处理的本质——任务自愿放弃控制权,让其他任务运行。
这是事件循环如何工作的简化版本:
class eventloop: def __init__(self): self.ready = deque() self.sleeping = [] def call_soon(self, callback): self.ready.append(callback) def call_later(self, delay, callback): deadline = time.time() + delay heapq.heappush(self.sleeping, (deadline, callback)) def run_forever(self): while true: self.run_once() def run_once(self): now = time.time() while self.sleeping and self.sleeping[0][0] <= now: _, callback = heapq.heappop(self.sleeping) self.ready.append(callback) if self.ready: callback = self.ready.popleft() callback() else: time.sleep(0.1) # avoid busy waiting
此事件循环管理两种类型的任务:准备运行的任务(在就绪双端队列中)和正在休眠的任务(在休眠列表中)。 run_forever 方法会持续运行任务,直到没有剩余任务为止。
现在我们来谈谈任务调度。 python 中的 asyncio 模块提供了具有高级调度功能的更复杂的事件循环。它可以处理 i/o 操作、运行子进程,甚至与其他事件循环集成。
以下是如何使用 asyncio 同时运行多个协程:
import asyncio async def task1(): print("task 1 starting") await asyncio.sleep(2) print("task 1 finished") async def task2(): print("task 2 starting") await asyncio.sleep(1) print("task 2 finished") async def main(): await asyncio.gather(task1(), task2()) asyncio.run(main())
此脚本将启动两个任务,但任务 2 将在任务 1 之前完成,因为它休眠的时间较短。
异步编程最强大的应用之一是网络操作。让我们看一个简单的异步 web 服务器:
import asyncio async def handle_client(reader, writer): data = await reader.read(100) message = data.decode() addr = writer.get_extra_info('peername') print(f"received {message!r} from {addr!r}") response = f"echo: {message} " writer.write(response.encode()) await writer.drain() print("close the connection") writer.close() async def main(): server = await asyncio.start_server( handle_client, '127.0.0.1', 8888) addr = server.sockets[0].getsockname() print(f'serving on {addr}') async with server: await server.serve_forever() asyncio.run(main())
该服务器无需使用线程即可同时处理多个客户端,效率很高。
但是异步编程不仅仅适用于服务器。它对于客户端也非常有用,特别是当您需要发出多个网络请求时。这是一个简单的网络抓取工具,可以同时获取多个页面:
import asyncio import aiohttp async def fetch_page(session, url): async with session.get(url) as response: return await response.text() async def main(): urls = [ 'http://example.com', 'http://example.org', 'http://example.net' ] async with aiohttp.clientsession() as session: tasks = [fetch_page(session, url) for url in urls] pages = await asyncio.gather(*tasks) for url, page in zip(urls, pages): print(f"page from {url}: {len(page)} bytes") asyncio.run(main())
该抓取工具可以同时获取多个页面,与同步方法相比,显着加快了处理速度。
现在,让我们深入探讨一些更高级的概念。 python 异步模型的一项有趣功能是您可以创建自己的事件循环。如果您需要将异步代码与其他框架集成,或者想要针对特定用例进行优化,这会很有用。
这是一个简单的自定义事件循环,可以运行同步和异步回调:
import asyncio from collections import deque class customeventloop(asyncio.abstracteventloop): def __init__(self): self._ready = deque() self._stopping = false def call_soon(self, callback, *args): self._ready.append((callback, args)) def run_forever(self): while not self._stopping: self._run_once() def _run_once(self): ntodo = len(self._ready) for _ in range(ntodo): if not self._ready: break callback, args = self._ready.popleft() callback(*args) def run_until_complete(self, future): asyncio.futures.ensure_future(future, loop=self) self.run_forever() def stop(self): self._stopping = true # usage loop = customeventloop() asyncio.set_event_loop(loop) async def hello(): print("hello, world!") loop.run_until_complete(hello())
这个自定义循环非常基础,但它演示了核心原则。您可以扩展它来处理更复杂的场景,例如 i/o 操作或计时器。
调试异步代码可能具有挑战性,尤其是在处理复杂的应用程序时。我发现有用的一项技术是使用 asyncio 的调试模式。您可以像这样启用它:
import asyncio asyncio.run(main(), debug=true)
这将提供更详细的错误消息和警告,例如从未完成的协程或运行时间过长的回调。
另一个有用的调试技术是使用 asyncio 的任务自省功能。例如,您可以获得所有正在运行的任务的列表:
import asyncio async def main(): task1 = asyncio.create_task(asyncio.sleep(1)) task2 = asyncio.create_task(asyncio.sleep(2)) for task in asyncio.all_tasks(): print(f"task: {task.get_name()}, done: {task.done()}") await task1 await task2 asyncio.run(main())
这可以帮助您了解程序在任何特定时刻正在做什么。
在优化异步代码时,一个关键原则是最大限度地减少同步操作所花费的时间。任何长时间运行的同步代码都会阻塞事件循环,从而阻止其他协程运行。如果您有 cpu 密集型任务,请考虑在单独的线程或进程中运行它们。
另一种优化技术是当您有多个可以同时运行的协程时使用 asyncio.gather。这比一一等待更有效率:
# Less efficient result1 = await coro1() result2 = await coro2() # More efficient result1, result2 = await asyncio.gather(coro1(), coro2())
最后,请记住,异步编程并不总是最好的解决方案。对于需要大量等待的 i/o 密集型任务,它可以提供显着的性能改进。但对于 cpu 密集型任务,传统的多线程或多处理可能更合适。
总之,python 的异步编程模型基于协程和事件循环,提供了一种编写高效、可扩展应用程序的强大方法。无论您是构建 web 服务器、网络客户端还是数据处理管道,理解这些概念都可以帮助您充分利用 python 的异步功能。与任何强大的工具一样,它需要练习和仔细思考才能有效使用,但结果确实令人印象深刻。
我们的创作
一定要看看我们的创作:
投资者中心 | 智能生活 | 时代与回响 | 令人费解的谜团 | 印度教 | 精英开发 | js学校
我们在媒体上
科技考拉洞察 | 时代与回响世界 | 投资者中央媒体 | 令人费解的谜团 | 科学与时代媒介 | 现代印度教