首页 后端开发 Python教程 掌握 Python 协程:为强大的并发应用程序创建自定义异步工具

掌握 Python 协程:为强大的并发应用程序创建自定义异步工具

Nov 29, 2024 pm 12:18 PM

Master Python Coroutines: Create Custom Async Tools for Powerful Concurrent Apps

Python 中的协程是编写异步代码的强大工具。它们彻底改变了我们处理并发操作的方式,使构建可扩展且高效的应用程序变得更加容易。我花了很多时间使用协程,并且很高兴能分享一些关于创建自定义异步原语的见解。

让我们从基础开始。协程是可以暂停和恢复的特殊函数,允许协作式多任务处理。它们是 Python 的 async/await 语法的基础。当您定义协程时,您实际上是在创建一个函数,该函数可以将控制权交还给事件循环,从而允许其他任务运行。

要创建自定义可等待对象,您需要实现 await 方法。该方法应该返回一个迭代器。这是一个简单的例子:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42
登录后复制
登录后复制

这个CustomAwaitable类可以与await关键字一起使用,就像内置的awaitables一样。当等待时,它会产生一次控制,然后返回它的值。

但是如果我们想创建更复杂的异步原语怎么办?让我们看看如何实现自定义信号量。信号量用于控制多个协程对共享资源的访问:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())
登录后复制
登录后复制

此 CustomSemaphore 类实现获取和释放方法,以及异步上下文管理器协议(aenteraexit)。它允许最多两个协程同时获取信号量。

现在,我们来谈谈创建高效的事件循环。虽然 Python 的 asyncio 提供了强大的事件循环实现,但在某些情况下您可能需要自定义一个。这是自定义事件循环的基本示例:

import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()

        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        try:
            if self._done:
                return
            result = self._coro.send(None)
            if isinstance(result, SleepHandle):
                result._task = self
                self._loop.call_soon(result._wake_up)
            else:
                self._loop.call_soon(self._step)
        except StopIteration as e:
            self.set_result(e.value)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        self._start_time = time.time()

    def _wake_up(self):
        if time.time() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

async def sleep(duration):
    return SleepHandle(duration)

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)
登录后复制

这个自定义事件循环实现了基本功能,例如运行任务、处理协程,甚至是简单的睡眠功能。它不像 Python 的内置事件循环那样功能丰富,但它演示了核心概念。

编写异步代码的挑战之一是管理任务优先级。虽然Python的asyncio没有为任务提供内置的优先级队列,但我们可以实现自己的:

import asyncio
import heapq

class PriorityEventLoop(asyncio.AbstractEventLoop):
    def __init__(self):
        self._ready = []
        self._stopping = False
        self._clock = 0

    def call_at(self, when, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        heapq.heappush(self._ready, (when, handle))
        return handle

    def call_later(self, delay, callback, *args, context=None):
        return self.call_at(self._clock + delay, callback, *args, context=context)

    def call_soon(self, callback, *args, context=None):
        return self.call_at(self._clock, callback, *args, context=context)

    def time(self):
        return self._clock

    def stop(self):
        self._stopping = True

    def is_running(self):
        return not self._stopping

    def run_forever(self):
        while self._ready and not self._stopping:
            self._run_once()

    def _run_once(self):
        if not self._ready:
            return
        when, handle = heapq.heappop(self._ready)
        self._clock = when
        handle._run()

    def create_task(self, coro):
        return asyncio.Task(coro, loop=self)

    def run_until_complete(self, future):
        asyncio.futures._chain_future(future, self.create_future())
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

    def create_future(self):
        return asyncio.Future(loop=self)

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def main():
    loop = asyncio.get_event_loop()
    loop.call_later(0.1, loop.create_task, low_priority_task())
    loop.call_later(0, loop.create_task, high_priority_task())
    await asyncio.sleep(3)

asyncio.run(main())
登录后复制

此 PriorityEventLoop 使用堆队列根据任务的计划执行时间来管理任务。您可以通过安排具有不同延迟的任务来分配优先级。

优雅地处理取消是使用协程的另一个重要方面。以下是如何实现可取消任务的示例:

import asyncio

async def cancellable_operation():
    try:
        print("Operation started")
        await asyncio.sleep(5)
        print("Operation completed")
    except asyncio.CancelledError:
        print("Operation was cancelled")
        # Perform any necessary cleanup
        raise  # Re-raise the CancelledError

async def main():
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())
登录后复制

在此示例中,cancellable_operation 捕获 CancelledError,执行任何必要的清理,然后重新引发异常。这允许优雅地处理取消,同时仍然传播取消状态。

让我们探索实现自定义异步迭代器。这些对于创建可以异步迭代的序列非常有用:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42
登录后复制
登录后复制

这个 AsyncRange 类实现了异步迭代器协议,允许它在异步 for 循环中使用。

最后,让我们看看如何实现自定义异步上下文管理器。这些对于管理需要异步获取和释放的资源很有用:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())
登录后复制
登录后复制

这个 AsyncResource 类实现了 aenteraexit 方法,允许它与 async with 语句一起使用。

总之,Python 的协程系统为构建自定义异步原语提供了强大的基础。通过了解底层机制和协议,您可以针对特定的异步挑战创建量身定制的解决方案,优化复杂并发场景中的性能,并扩展 Python 的异步功能。请记住,虽然这些自定义实现非常适合学习和特定用例,但 Python 的内置 asyncio 库经过了高度优化,应该成为大多数场景的首选。快乐编码!


我们的创作

一定要看看我们的创作:

投资者中心 | 智能生活 | 时代与回响 | 令人费解的谜团 | 印度教 | 精英开发 | JS学校


我们在媒体上

科技考拉洞察 | 时代与回响世界 | 投资者中央媒体 | 令人费解的谜团 | 科学与时代媒介 | 现代印度教

以上是掌握 Python 协程:为强大的并发应用程序创建自定义异步工具的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

<🎜>:泡泡胶模拟器无穷大 - 如何获取和使用皇家钥匙
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
北端:融合系统,解释
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
Mandragora:巫婆树的耳语 - 如何解锁抓钩
3 周前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

热门话题

Java教程
1671
14
CakePHP 教程
1428
52
Laravel 教程
1329
25
PHP教程
1276
29
C# 教程
1256
24
Python与C:学习曲线和易用性 Python与C:学习曲线和易用性 Apr 19, 2025 am 12:20 AM

Python更易学且易用,C 则更强大但复杂。1.Python语法简洁,适合初学者,动态类型和自动内存管理使其易用,但可能导致运行时错误。2.C 提供低级控制和高级特性,适合高性能应用,但学习门槛高,需手动管理内存和类型安全。

Python和时间:充分利用您的学习时间 Python和时间:充分利用您的学习时间 Apr 14, 2025 am 12:02 AM

要在有限的时间内最大化学习Python的效率,可以使用Python的datetime、time和schedule模块。1.datetime模块用于记录和规划学习时间。2.time模块帮助设置学习和休息时间。3.schedule模块自动化安排每周学习任务。

Python vs.C:探索性能和效率 Python vs.C:探索性能和效率 Apr 18, 2025 am 12:20 AM

Python在开发效率上优于C ,但C 在执行性能上更高。1.Python的简洁语法和丰富库提高开发效率。2.C 的编译型特性和硬件控制提升执行性能。选择时需根据项目需求权衡开发速度与执行效率。

学习Python:2小时的每日学习是否足够? 学习Python:2小时的每日学习是否足够? Apr 18, 2025 am 12:22 AM

每天学习Python两个小时是否足够?这取决于你的目标和学习方法。1)制定清晰的学习计划,2)选择合适的学习资源和方法,3)动手实践和复习巩固,可以在这段时间内逐步掌握Python的基本知识和高级功能。

Python vs. C:了解关键差异 Python vs. C:了解关键差异 Apr 21, 2025 am 12:18 AM

Python和C 各有优势,选择应基于项目需求。1)Python适合快速开发和数据处理,因其简洁语法和动态类型。2)C 适用于高性能和系统编程,因其静态类型和手动内存管理。

Python标准库的哪一部分是:列表或数组? Python标准库的哪一部分是:列表或数组? Apr 27, 2025 am 12:03 AM

pythonlistsarepartofthestAndArdLibrary,herilearRaysarenot.listsarebuilt-In,多功能,和Rused ForStoringCollections,而EasaraySaraySaraySaraysaraySaraySaraysaraySaraysarrayModuleandleandleandlesscommonlyusedDduetolimitedFunctionalityFunctionalityFunctionality。

Python:自动化,脚本和任务管理 Python:自动化,脚本和任务管理 Apr 16, 2025 am 12:14 AM

Python在自动化、脚本编写和任务管理中表现出色。1)自动化:通过标准库如os、shutil实现文件备份。2)脚本编写:使用psutil库监控系统资源。3)任务管理:利用schedule库调度任务。Python的易用性和丰富库支持使其在这些领域中成为首选工具。

科学计算的Python:详细的外观 科学计算的Python:详细的外观 Apr 19, 2025 am 12:15 AM

Python在科学计算中的应用包括数据分析、机器学习、数值模拟和可视化。1.Numpy提供高效的多维数组和数学函数。2.SciPy扩展Numpy功能,提供优化和线性代数工具。3.Pandas用于数据处理和分析。4.Matplotlib用于生成各种图表和可视化结果。

See all articles