Structured Concurrency

Topic: Asynchronous

Introduction

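Structured concurrency ties the lifetime of a group of related async tasks to a single block of code. The block does not exit until every task in the group has finished, and if one task fails, the remaining tasks are cancelled. In asyncio, asyncio.TaskGroup (Python 3.11+) implements this pattern, so no task outlives the block that started it.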

TaskGroup (Python 3.11+)

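import asyncio

async def fetch_data(item_id):
    # Simulate I/O whose duration depends on the item.
    await asyncio.sleep(item_id * 0.5)
    return f"Data {item_id}"

async def main():
    # The block exits only after all three tasks have finished.
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data(1))
        task2 = tg.create_task(fetch_data(2))
        task3 = tg.create_task(fetch_data(3))

    # Every task is done here, so result() returns immediately.
    print(task1.result())
    print(task2.result())
    print(task3.result())

asyncio.run(main())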

Handling Exceptions

import asyncio

async def risky_task(n):
    if n == 2:
        raise ValueError(f"Error in task {n}")
    await asyncio.sleep(n)
    return f"Task {n} done"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(risky_task(1))
            tg.create_task(risky_task(2))  # Fails; the group cancels tasks 1 and 3
            tg.create_task(risky_task(3))
    except* ValueError as eg:
        # TaskGroup wraps failures in an ExceptionGroup; except* selects the ValueErrors.
        for exc in eg.exceptions:
            print(f"Caught: {exc}")

asyncio.run(main())

Nursery Pattern

import asyncio

async def nursery(coros):
    # Run every coroutine as a task and wait for all of them to finish.
    async with asyncio.TaskGroup() as tg:
        for coro in coros:
            tg.create_task(coro)

async def worker(n):
    print(f"Worker {n} started")
    await asyncio.sleep(n)
    print(f"Worker {n} done")

async def main():
    await nursery([worker(1), worker(2), worker(3)])

asyncio.run(main())

Spawn and Wait

import asyncio

async def spawn_and_wait(coros):
    results = []

    async def run_with_result(coro, idx):
        result = await coro
        # Tasks finish in arbitrary order, so keep the index with each result.
        results.append((idx, result))

    async with asyncio.TaskGroup() as tg:
        for idx, coro in enumerate(coros):
            tg.create_task(run_with_result(coro, idx))
    # Sort by index to return results in submission order.
    return sorted(results)

async def main():
    coros = [asyncio.sleep(i, f"Result {i}") for i in range(3)]
    results = await spawn_and_wait(coros)
    print(results)

asyncio.run(main())

Timeout with TaskGroup

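import asyncio

async def slow_task():
    await asyncio.sleep(10)
    return "Done"

async def main():
    try:
        # TaskGroup has no timeout of its own. asyncio.timeout() (Python 3.11+)
        # cancels the block, and every task in the group, after 2 seconds
        # (an arbitrary value for illustration).
        async with asyncio.timeout(2):
            async with asyncio.TaskGroup() as tg:
                tg.create_task(slow_task())
    except TimeoutError:
        print("Tasks timed out")

asyncio.run(main())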

Practice Problems

  1. Use TaskGroup to run several tasks concurrently
  2. Handle multiple exceptions with except*
  3. Implement the nursery pattern
  4. Add a timeout to a TaskGroup
  5. Create a task tracking system
