AgentSkillsCN

Async Python

Async Python

SKILL.md

Async Python Skill

Master async/await for high-performance Python applications

🎯 FUNDAMENTALS

Basic Async Function

python
import asyncio

import aiohttp  # third-party HTTP client: pip install aiohttp

async def fetch_data(url: str) -> dict:
    """Fetch `url` and return its JSON-decoded response body.

    `async def` makes this a coroutine function: calling it only creates a
    coroutine object; the body runs once that object is awaited or handed
    to asyncio.run().
    """
    # Both context managers release their resources (connection pool,
    # response) even if an exception is raised while reading the body.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# Call from sync code: asyncio.run() starts an event loop, runs the
# coroutine to completion, and closes the loop again.
result = asyncio.run(fetch_data("https://api.example.com"))

Await Rules

python
# ✅ Can await inside async function
async def process():
    result = await fetch_data(url)  # ✅ OK: `await` is inside an `async def`
    
# ❌ Cannot await in sync function
def process():
    result = await fetch_data(url)  # ❌ SyntaxError: 'await' outside async function

🔄 RUNNING TASKS

asyncio.gather() - Run Concurrently

python
async def main():
    """Demonstrate asyncio.gather() with and without return_exceptions."""
    # Hand all three lookups to gather at once; results keep argument order.
    user_lookups = (fetch_user(1), fetch_user(2), fetch_user(3))
    results = await asyncio.gather(*user_lookups)
    # results = [user1, user2, user3]

    # With return_exceptions=True a failure is returned as a value in its
    # slot instead of being raised out of gather().
    results = await asyncio.gather(*tasks, return_exceptions=True)
    failures = [outcome for outcome in results if isinstance(outcome, Exception)]
    for result in failures:
        print(f"Task failed: {result}")

asyncio.create_task() - Run in Background

python
async def main():
    """Overlap background_job() with other work, then collect its result."""
    # create_task() schedules the coroutine right away; nothing is awaited yet.
    job_task = asyncio.create_task(background_job())

    # Foreground work proceeds while the task runs.
    await do_other_stuff()

    # Awaiting the task yields its return value (or re-raises its exception).
    result = await job_task

asyncio.wait() - With Conditions

python
async def main():
    """Start ten jobs, keep whichever finishes first, and cancel the rest."""
    tasks = [asyncio.create_task(job(i)) for i in range(10)]

    # Returns as soon as at least one task is done; `done` and `pending`
    # are sets of the task objects passed in.
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Cancel remaining
    for task in pending:
        task.cancel()

    # cancel() only *requests* cancellation. Await the cancelled tasks so
    # each one actually receives CancelledError and finishes its cleanup
    # before main() returns; return_exceptions=True keeps those
    # CancelledErrors from propagating out of gather().
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)

TaskGroup (3.11+)

python
async def main():
    """Run two fetches concurrently inside a TaskGroup (Python 3.11+)."""
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_user(1))
        task2 = tg.create_task(fetch_user(2))
    
    # The `async with` block exits only after both tasks have finished,
    # so task1.result() and task2.result() are available from here on.
    # If a task fails, the remaining tasks are cancelled and the failures
    # are re-raised together as an ExceptionGroup.

⏱️ TIMEOUTS

python
async def fetch_with_timeout():
    """Run slow_operation() with a 5-second limit.

    Returns:
        The operation's result, or None if it did not finish in time.
    """
    try:
        # wait_for() cancels slow_operation() when the timeout expires and
        # then raises TimeoutError.
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        print("Operation timed out")
        return None
    # Hand the value back to the caller instead of discarding it.
    return result

# Context manager timeout (3.11+)
async def with_timeout():
    """Run slow_operation() under a 5-second deadline.

    If the deadline passes, the body is cancelled and TimeoutError is
    raised from the `async with` statement.
    """
    async with asyncio.timeout(5.0):
        await slow_operation()

📋 ASYNC CONTEXT MANAGERS

python
from contextlib import asynccontextmanager

@asynccontextmanager
async def database_connection():
    """Yield an open connection and close it when the `async with` block exits."""
    # Created before the `try` so a failed connect never reaches close().
    conn = await create_connection()
    try:
        yield conn
    finally:
        # Runs on normal exit and when the block raises.
        await conn.close()

# Usage
async def process():
    """Execute `query` on a connection that is closed when the block exits."""
    async with database_connection() as conn:
        await conn.execute(query)

Class-based

python
class AsyncResource:
    """Class-based async context manager around acquire_resource()."""

    async def __aenter__(self):
        # The value returned here is what `async with ... as x` binds to x.
        self.resource = await acquire_resource()
        return self.resource
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.resource.release()
        # A falsy return lets any exception raised in the block propagate.
        return False

🔄 ASYNC GENERATORS

python
async def fetch_pages(url: str):
    """Async generator: yield one page of data at a time until a page is empty."""
    page_no = 1
    # Each page is requested only when the consumer asks for the next item;
    # the first falsy response ends the iteration.
    while data := await fetch(f"{url}?page={page_no}"):
        yield data
        page_no += 1

# Consume async generator
async def process_all():
    """Consume fetch_pages() page by page with `async for`."""
    async for page_data in fetch_pages(url):
        # Called without `await`, so process() is used as a plain function here.
        process(page_data)

🔐 SYNCHRONIZATION

Lock

python
lock = asyncio.Lock()

async def safe_update():
    """Update the shared resource while holding the module-level lock."""
    async with lock:
        # Only one coroutine here at a time; others wait at `async with`
        # without blocking the event loop.
        await update_shared_resource()

Semaphore - Concurrency Limiting

python
semaphore = asyncio.Semaphore(10)  # Max 10 concurrent

async def limited_fetch(url: str):
    """Fetch `url` while holding one slot of the module-level semaphore."""
    # The slot is released when the block exits, even if fetch() raises.
    async with semaphore:
        return await fetch(url)

# Process 1000 URLs, max 10 at a time
async def process_urls(urls: list[str]):
    """Fetch every URL through limited_fetch() and return results in input order."""
    # All coroutines are handed to gather at once; limited_fetch itself
    # decides how many of them run at the same time.
    return await asyncio.gather(*map(limited_fetch, urls))

Event

python
event = asyncio.Event()

async def waiter():
    """Wait for the module-level event, then report it."""
    await event.wait()  # Suspends this coroutine (not the event loop) until event.set()
    print("Event received!")

async def setter():
    """Set the module-level event after a one-second delay."""
    await asyncio.sleep(1)
    event.set()  # Wakes every coroutine currently waiting in event.wait()

⚠️ COMMON PITFALLS

❌ Blocking the Event Loop

python
# ❌ BAD - Blocks entire loop
async def bad_example():
    """Anti-pattern: synchronous calls inside a coroutine."""
    time.sleep(5)  # Blocking call! No other task can run during these 5 seconds
    result = requests.get(url)  # Blocking! The loop is stalled until the response arrives

# ✅ GOOD - Use async alternatives
async def good_example():
    """Non-blocking counterpart: async sleep plus an async HTTP client."""
    # Yields control to the event loop for the whole delay.
    await asyncio.sleep(5)
    # One `async with` enters the session first, then the request, and
    # exits them in reverse order.
    async with aiohttp.ClientSession() as session, session.get(url) as response:
        result = await response.json()

# ✅ GOOD - Run blocking in thread pool
async def with_blocking():
    """Run blocking_function(arg1, arg2) in a worker thread and return its result.

    The event loop stays free to run other tasks while the thread works.
    """
    # Inside a coroutine a loop is always running; get_running_loop()
    # returns exactly that loop, whereas get_event_loop() is the older
    # call that the asyncio docs advise against in coroutines.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,  # Use default ThreadPoolExecutor
        blocking_function,
        arg1, arg2
    )
    return result

❌ Forgetting to Await

python
# ❌ BAD - Coroutine never runs!
async def bad():
    """Anti-pattern: calling a coroutine function without awaiting it."""
    # Only creates a coroutine object; its body never executes. Python
    # reports "RuntimeWarning: coroutine 'fetch_data' was never awaited".
    fetch_data(url)  # Returns coroutine, not result!

# ✅ GOOD
async def good():
    """Await the coroutine so it actually runs and yields its result."""
    result = await fetch_data(url)

❌ Creating Tasks Without Storing Reference

python
# ❌ BAD - Task may be garbage collected
async def bad():
    """Anti-pattern: discarding the Task returned by create_task()."""
    asyncio.create_task(background_job())
    # Task reference lost! The event loop keeps only a weak reference to
    # tasks, so this one can be garbage-collected before it finishes.

# ✅ GOOD - Keep reference
# Strong references to in-flight tasks. The event loop itself only keeps
# weak references, so a task stored nowhere else may be garbage-collected.
background_tasks: set[asyncio.Task] = set()

async def good():
    """Start background_job() and keep it referenced until it finishes."""
    task = asyncio.create_task(background_job())
    background_tasks.add(task)
    # Drop the reference once the task is done so the set does not grow forever.
    task.add_done_callback(background_tasks.discard)

🔧 PATTERNS

Async Retry

python
async def retry_async(
    coro_func,
    max_attempts: int = 3,
    delay: float = 1.0
):
    """Call `coro_func()` until it succeeds, with exponential backoff.

    Args:
        coro_func: Zero-argument callable returning a fresh awaitable on
            every call (a coroutine object cannot be awaited twice).
        max_attempts: Total number of attempts; must be at least 1.
        delay: Base delay in seconds; the wait after attempt k (0-based)
            is ``delay * 2**k``.

    Returns:
        Whatever the first successful call returns.

    Raises:
        ValueError: If `max_attempts` is less than 1.
        Exception: The error from the final attempt when all attempts fail.
    """
    # Without this guard the loop body would never run and the function
    # would silently return None without calling coro_func at all.
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(max_attempts):
        try:
            return await coro_func()
        except Exception:
            # asyncio.CancelledError derives from BaseException, so
            # cancellation is never swallowed or retried here.
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(delay * (2 ** attempt))

Producer-Consumer with Queue

python
async def producer(queue: asyncio.Queue):
    """Put every element of `items` on the queue, then a None sentinel."""
    for item in items:
        # put() waits here whenever a bounded queue is full.
        await queue.put(item)
    await queue.put(None)  # Signal done

async def consumer(queue: asyncio.Queue):
    """Process items from `queue` until the None sentinel arrives.

    Any number of consumers may share one queue: whichever consumer takes
    the sentinel puts it back before stopping, so every other consumer
    sees it too. One sentinel therefore remains in the queue after all
    consumers have stopped, which means queue.join() should not be used
    to detect completion with this pattern.
    """
    while True:
        item = await queue.get()
        try:
            if item is None:
                # Pass the stop signal on; a single sentinel would
                # otherwise stop only one consumer and leave the others
                # waiting in get() forever.
                await queue.put(None)
                break
            await process(item)
        finally:
            # Mark the item handled even when process() raises, so the
            # queue's unfinished-task count stays accurate.
            queue.task_done()

async def main():
    """Run one producer and two consumers against a shared bounded queue."""
    queue = asyncio.Queue(maxsize=100)
    workers = [
        producer(queue),
        consumer(queue),
        consumer(queue),  # Multiple consumers
    ]
    # gather() returns once every worker coroutine has finished.
    await asyncio.gather(*workers)

Async Context Manager Pool

python
class ConnectionPool:
    """Pool that hands out at most `max_size` connections at a time.

    Idle connections are kept in `self.connections` and reused; a new one
    is created only when none is idle. Prefer `async with pool.connection()`
    over manual acquire()/release() so the connection is always returned.
    """

    def __init__(self, max_size: int = 10):
        # One permit per connection that may be checked out simultaneously.
        self.semaphore = asyncio.Semaphore(max_size)
        # Idle connections, reused in LIFO order.
        self.connections: list = []

    async def acquire(self):
        """Wait for a free slot and return an idle or newly created connection."""
        await self.semaphore.acquire()
        try:
            if self.connections:
                return self.connections.pop()
            return await create_connection()
        except BaseException:
            # Creating the connection failed or was cancelled: give the
            # permit back, otherwise the pool permanently loses one slot.
            self.semaphore.release()
            raise

    async def release(self, conn):
        """Return `conn` to the idle list and free its slot."""
        self.connections.append(conn)
        self.semaphore.release()

    def connection(self):
        """Return an async context manager that acquires and releases a connection.

        Usage:
            async with pool.connection() as conn:
                ...
        """
        return _PooledConnection(self)


class _PooledConnection:
    """Async context manager pairing ConnectionPool.acquire() with release()."""

    def __init__(self, pool):
        self._pool = pool
        self._conn = None

    async def __aenter__(self):
        self._conn = await self._pool.acquire()
        return self._conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._pool.release(self._conn)
        # Falsy: exceptions raised inside the block propagate to the caller.
        return False

📎 QUICK REFERENCE

| Function | Use Case |
| --- | --- |
| `asyncio.run()` | Entry point from sync code |
| `await` | Wait for coroutine result |
| `asyncio.gather()` | Run multiple concurrently |
| `asyncio.create_task()` | Start task immediately |
| `asyncio.wait_for()` | Add timeout |
| `asyncio.sleep()` | Async delay |
| `asyncio.Queue` | Producer-consumer pattern |
| `asyncio.Lock` | Mutual exclusion |
| `asyncio.Semaphore` | Limit concurrency |

🔗 RELATED SKILLS

Async Python - AgentSkills CN | 技能图谱