asyncio
Mental model
- `asyncio` is a single-threaded, cooperative concurrency framework.
- Concurrency happens when tasks voluntarily yield control to the event loop (typically at `await` points).
- The event loop is a scheduler + I/O multiplexer:
- runs ready callbacks / tasks
- watches sockets/pipes/subprocesses via OS mechanisms (epoll/kqueue/IOCP)
- resumes tasks when awaited operations complete
- Key separation:
- Concurrency (many things in progress) vs parallelism (many things at once on multiple cores).
- `asyncio` gives concurrency; CPU-bound work needs threads/processes if you want parallelism.
What actually runs?
- A coroutine function: `async def f(...): ...`
- Calling it returns a coroutine object: `coro = f()`
- A coroutine object does nothing until:
  - `await coro` inside another coroutine, or
  - `asyncio.create_task(coro)` schedules it, or
  - `asyncio.run(coro)` creates an event loop and drives it to completion
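A minimal sketch of these three entry points, using a made-up `greet` coroutine:

```python
import asyncio

async def greet() -> str:
    await asyncio.sleep(0)  # a yield point; control returns to the loop
    return "hello"

async def main() -> None:
    coro = greet()                    # coroutine object: nothing has run yet
    task = asyncio.create_task(coro)  # now scheduled on the running loop
    print(await task)                 # suspends main() until greet() finishes

asyncio.run(main())                   # creates the loop and drives main()
```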
Yield points (why await matters)
- The event loop can switch tasks at:
- `await` (most common)
- `asyncio.sleep(0)` (explicit cooperative yield)
- awaiting futures/tasks/I/O operations
- If you write an `async def` that never awaits, it behaves like synchronous code and blocks the loop.
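A sketch of cooperative switching: two made-up tasks take turns at explicit `asyncio.sleep(0)` yield points. Without those awaits, each would run to completion before the other started:

```python
import asyncio

log: list[str] = []

async def chatty(tag: str) -> None:
    for _ in range(3):
        log.append(tag)
        await asyncio.sleep(0)  # yield; the loop may switch to the other task

async def main() -> None:
    await asyncio.gather(chatty("a"), chatty("b"))

asyncio.run(main())
print(log)  # interleaved, e.g. ['a', 'b', 'a', 'b', 'a', 'b']
```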
Core primitives
Event loop
- The loop owns:
- a queue of ready callbacks/tasks
- timers (scheduled future callbacks)
- I/O watchers (file descriptor readiness)
- You rarely interact with the loop directly in application code.
- In libraries, obtain the current loop with `asyncio.get_running_loop()` when needed.
Tasks vs Futures
Future
A Future is a passive result container.
- Represents an eventual result, exception, or cancellation
- Has no execution logic
- Must be completed by external code
- Awaiting a Future suspends the current task until it completes
Mental model:
“A promise that someone else will fulfill.”
Key points:
- States: pending → finished (result/exception) or cancelled
- Usually created and completed by the event loop or low-level libraries
- Rarely created manually in application code
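A sketch of a manually created, externally completed Future (rare in application code, but it shows the mechanics):

```python
import asyncio

async def main() -> str:
    loop = asyncio.get_running_loop()
    fut: asyncio.Future[str] = loop.create_future()
    # "external code" (here, a timer callback) fulfills the promise
    loop.call_later(0.01, fut.set_result, "fulfilled")
    return await fut  # suspends until set_result() runs

print(asyncio.run(main()))  # fulfilled
```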
Task
A Task is an active Future that runs a coroutine.
- Wraps a coroutine object
- Schedules it on the event loop
- Advances the coroutine at each `await`
- Stores the final result or exception in itself
- Is a subclass of Future
Mental model:
“A Future that knows how to execute a coroutine.”
Lifecycle:
- Created from a coroutine (`asyncio.create_task`)
- Driven by the event loop
- Completes when the coroutine returns, raises, or is cancelled
Awaiting
- `await task` and `await future` work the same way; `await` only cares about the Future interface.
- The difference is who completes it:
  - Future → external code
  - Task → the coroutine itself
Practical implications:
- Create tasks for concurrent work: `task = asyncio.create_task(coro())`
- Prefer structured patterns that ensure tasks are awaited/cancelled (`asyncio.TaskGroup`).
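A sketch of why tasks matter for concurrency: both tasks start immediately, so their waits overlap (the `io_op` coroutine is a stand-in for real I/O):

```python
import asyncio
import time

async def io_op() -> None:
    await asyncio.sleep(0.1)  # stands in for real I/O

async def main() -> float:
    start = time.monotonic()
    t1 = asyncio.create_task(io_op())  # both start running now
    t2 = asyncio.create_task(io_op())
    await t1
    await t2
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")  # roughly 0.1s, not 0.2s: the sleeps overlapped
```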
Cancellation
- Cancellation is cooperative.
- `task.cancel()` schedules a `CancelledError` to be raised at the task's next await point.
- If a task never hits an await, cancellation may be delayed indefinitely.
- Cancellation is contagious only if you propagate it (e.g., `await` a cancelled task).
Best practice:
- Treat `CancelledError` as control flow, not an "error to swallow".
- Clean up resources in `finally` blocks.
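A sketch of the full lifecycle: cancellation is requested, delivered at the task's await point, cleaned up in `finally`, and then propagated to the awaiter (the `worker` coroutine is a made-up example):

```python
import asyncio

async def worker(log: list[str]) -> None:
    try:
        await asyncio.sleep(10)  # cancellation lands at this await point
    finally:
        log.append("cleaned up")  # runs even on cancellation

async def main() -> list[str]:
    log: list[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the task reach its await point
    task.cancel()           # a request; delivered at the next await
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log

print(asyncio.run(main()))  # ['cleaned up', 'cancelled']
```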
Timeouts
- Timeouts are typically implemented via cancellation under the hood.
- `asyncio.wait_for(aw, timeout=...)` cancels `aw` on timeout (then raises `TimeoutError`).
- Prefer `asyncio.timeout(...)` (Python 3.11+) for clearer scoping.
Synchronization
- `asyncio.Lock`: mutual exclusion around critical sections that might `await`.
  - Use when multiple coroutines access shared mutable state and the critical section contains `await`, making it unsafe to rely on normal sequential execution.
  - Only one coroutine can hold the lock at a time; others wait cooperatively.

```python
import asyncio

balance = 0
lock = asyncio.Lock()

async def deposit(amount: int) -> None:
    global balance
    async with lock:
        current = balance
        await asyncio.sleep(0)  # simulate async work
        balance = current + amount
```
- `asyncio.Event`: await a signal.
  - Use when one or more coroutines must wait until some condition becomes true.
  - Once set, all current and future waiters proceed until the event is cleared.

```python
import asyncio

ready = asyncio.Event()

async def worker():
    await ready.wait()
    print("started")

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(1)
    ready.set()
    await task  # ensure the worker actually runs after the event is set
```
- `asyncio.Condition`: coordinated waiting/notification.
  - Use when coroutines must wait for specific state changes, not just a single signal.
  - Combines a lock with wait/notify semantics and requires explicit condition checks.

```python
import asyncio

condition = asyncio.Condition()
items: list[int] = []

async def consumer():
    async with condition:
        await condition.wait_for(lambda: items)
        item = items.pop()
        print(item)

async def producer():
    async with condition:
        items.append(1)
        condition.notify()
```
- `asyncio.Queue`: async producer/consumer pattern.
  - Use for safe communication between producers and consumers with built-in backpressure.
  - Handles locking, waiting, and notification internally.

```python
import asyncio

async def producer(q: asyncio.Queue[int]):
    await q.put(1)

async def consumer(q: asyncio.Queue[int]) -> int:
    item = await q.get()
    q.task_done()
    return item

async def main():
    q: asyncio.Queue[int] = asyncio.Queue()
    await producer(q)
    result = await consumer(q)
    print(result)
```
Rule of thumb:
- Use async primitives for code running in the event loop.
- Use `threading.*` primitives for threads; don't mix them casually.
Scheduling patterns
Running coroutines concurrently
- `create_task` + `await` later is the building block.
- `asyncio.gather(*aws)` awaits many awaitables concurrently.
  - Without `return_exceptions=True`, the first failure propagates immediately, but the remaining tasks are not cancelled and keep running in the background.
- Consider `return_exceptions=True` only when you really mean "collect failures as values".
Waiting for first completion
- `asyncio.wait(aws, return_when=asyncio.FIRST_COMPLETED)` gives you two sets: `done`, `pending`.
- You must decide what to do with `pending` (cancel/await/etc.).
- A common trap is leaking pending tasks.
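A sketch of the first-completion pattern that also handles the `pending` set instead of leaking it (the `job` coroutine is a made-up example):

```python
import asyncio

async def job(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name

async def main() -> str:
    tasks = {
        asyncio.create_task(job("fast", 0.01)),
        asyncio.create_task(job("slow", 1.0)),
    }
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # avoid the leak: cancel the losers and await them
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()

print(asyncio.run(main()))  # fast
```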
Structured concurrency (TaskGroup)
- `asyncio.TaskGroup` (Python 3.11+) provides structured concurrency:
  - tasks started inside the group are awaited automatically
  - failure semantics are clearer and safer
  - avoids "forgotten task" leaks
Prefer `TaskGroup` (3.11+) over raw `create_task` + manual bookkeeping, when possible.
I/O vs CPU-bound work
I/O-bound: good fit
- Many sockets, HTTP calls, DB calls (if driver is async), file ops (limited), subprocess.
- Async shines when tasks spend most time waiting.
CPU-bound: not a good fit (by default)
- CPU-bound work blocks the event loop and starves other tasks.
Options:
- `asyncio.to_thread(fn, *args)` for blocking sync functions (Python 3.9+)
- `loop.run_in_executor(...)` for custom executors
- `multiprocessing` / process pools for real parallelism
Rule:
- If it pegs a CPU core, move it out of the loop.
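A sketch of offloading with `asyncio.to_thread` (3.9+); the `cpu_heavy` function is a made-up stand-in for blocking work:

```python
import asyncio
import hashlib

def cpu_heavy(data: bytes) -> str:
    # blocking/CPU-bound work kept out of the event loop
    return hashlib.sha256(data).hexdigest()

async def main() -> str:
    # runs cpu_heavy in a worker thread; the loop stays free for other tasks
    return await asyncio.to_thread(cpu_heavy, b"payload")

print(asyncio.run(main()))
```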
Practical examples
Concurrent HTTP-style work (pattern-only)
This uses a placeholder fetch() to emphasize structure (swap in your actual async client):
```python
import asyncio
from dataclasses import dataclass

@dataclass
class Result:
    url: str
    status: int
    body: bytes

async def fetch(url: str) -> Result:
    # placeholder for a real async HTTP client call
    await asyncio.sleep(0.05)
    return Result(url=url, status=200, body=b"ok")

async def main(urls: list[str]) -> list[Result]:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(u)) for u in urls]
    # after TaskGroup exits, all tasks finished or raised
    return [t.result() for t in tasks]

if __name__ == "__main__":
    results = asyncio.run(main(["https://a", "https://b", "https://c"]))
    print([r.status for r in results])
```

Key points:
- `TaskGroup` ensures tasks do not leak.
- Failures propagate cleanly.
Bounded concurrency with a semaphore
```python
import asyncio

async def worker(item: int) -> int:
    await asyncio.sleep(0.05)
    return item * 2

async def bounded_map(items: list[int], limit: int) -> list[int]:
    sem = asyncio.Semaphore(limit)

    async def run_one(x: int) -> int:
        async with sem:
            return await worker(x)

    return await asyncio.gather(*(run_one(x) for x in items))

async def main():
    out = await bounded_map(list(range(10)), limit=3)
    print(out)

asyncio.run(main())
```

Pitfall avoided: launching thousands of tasks at once without backpressure.
Producer / consumer with asyncio.Queue
```python
import asyncio

async def producer(q: asyncio.Queue[int], n: int) -> None:
    for i in range(n):
        await q.put(i)
    await q.put(-1)  # sentinel

async def consumer(q: asyncio.Queue[int]) -> list[int]:
    out: list[int] = []
    while True:
        item = await q.get()
        try:
            if item == -1:
                return out
            out.append(item * 10)
        finally:
            q.task_done()

async def main():
    q: asyncio.Queue[int] = asyncio.Queue()
    prod = asyncio.create_task(producer(q, 5))
    cons = asyncio.create_task(consumer(q))

    await prod
    await q.join()  # wait until all items are marked done
    result = await cons
    print(result)

asyncio.run(main())
```

Notes:
- `task_done()` must be called exactly once per `get()`, typically via `finally`.
Handling timeouts and cancellation safely
```python
import asyncio

async def long_op() -> str:
    try:
        await asyncio.sleep(10)
        return "done"
    finally:
        # ensure cleanup happens even if cancelled
        pass

async def main():
    try:
        return await asyncio.wait_for(long_op(), timeout=0.2)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))
```

Key point:
- `wait_for` cancels the underlying awaitable on timeout.
Best practices
- Prefer structured concurrency (`asyncio.TaskGroup`) over ad-hoc task spawning.
- Keep event-loop code non-blocking:
  - no CPU-heavy loops
  - no blocking I/O (use async drivers or offload with `asyncio.to_thread`)
- Add backpressure:
- semaphore, queue, or bounded worker pools
- Use timeouts for external I/O boundaries.
- Always ensure tasks are:
- awaited, or
- cancelled and awaited (to suppress warnings and ensure cleanup)
- Make cancellation safe:
  - use `try/finally` around resource acquisition
  - do not swallow `CancelledError`; if you catch it, re-raise it after cleanup
- Be explicit about failure semantics:
  - `gather` vs `wait` vs `TaskGroup`
- Prefer `asyncio.get_running_loop()` in async contexts (not `asyncio.get_event_loop()`).
Common pitfalls
- “It’s async so it’s fast”: async only helps if you are I/O-bound and yielding.
- Blocking the loop: `time.sleep(...)`, heavy JSON parsing in a tight loop, large CPU work.
- Forgetting to await:
  - coroutine created but never awaited → runtime warnings and missing work.
- Task leaks:
  - `create_task()` without awaiting/cancelling later.
- Misusing `gather`:
  - assuming it automatically cancels/cleans up everything the way you want.
- Cancellation swallowing:
  - broad `except Exception:` that accidentally catches `CancelledError` in some versions/patterns (it became a `BaseException` subclass in Python 3.8).
- Unbounded concurrency:
  - `gather(*(fetch(u) for u in huge_list))` can blow memory and overwhelm dependencies.
- Shared mutable state:
  - async code can interleave at await points; treat it like concurrent code.
Pytest testing tips for async code
Use pytest-asyncio
- Install and enable async tests via the plugin.
- Mark async tests with @pytest.mark.asyncio (or configure auto mode).
- Prefer asyncio-native patterns rather than spinning threads in tests.
Example:
```python
import asyncio
import pytest

async def f(x: int) -> int:
    await asyncio.sleep(0)
    return x + 1

@pytest.mark.asyncio
async def test_f():
    assert await f(1) == 2
```

Testing concurrency behavior
Test that tasks truly run concurrently by controlling scheduling:
- Use asyncio.Event to coordinate ordering.
- Use a short asyncio.sleep(0) to allow the loop to switch tasks deterministically.
```python
import asyncio
import pytest

@pytest.mark.asyncio
async def test_concurrent_start():
    started = asyncio.Event()
    proceed = asyncio.Event()

    async def a():
        started.set()
        await proceed.wait()
        return 1

    async def b():
        await started.wait()
        proceed.set()
        return 2

    async with asyncio.TaskGroup() as tg:
        ta = tg.create_task(a())
        tb = tg.create_task(b())

    assert (ta.result(), tb.result()) == (1, 2)
```

Testing timeouts reliably
- Avoid very tight timeouts on slow CI.
- Use generous margins, or simulate time progression if your code supports injecting a clock.
- If you must use real timeouts, keep them stable (e.g., 200ms rather than 5ms).
```python
import asyncio
import pytest

@pytest.mark.asyncio
async def test_timeout():
    async def never():
        await asyncio.Event().wait()

    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(never(), timeout=0.2)
```

Monkeypatching async dependencies
- If you replace an async function, make the replacement async def.
- If you replace an object method used with await, ensure it returns an awaitable.
```python
import pytest

class Client:
    async def get(self) -> int:
        return 1

async def uses_client(c: Client) -> int:
    return await c.get()

@pytest.mark.asyncio
async def test_monkeypatch(monkeypatch):
    c = Client()

    async def fake_get():
        return 42

    monkeypatch.setattr(c, "get", fake_get)
    assert await uses_client(c) == 42
```

Avoid nested event loops
- Do not call `asyncio.run()` inside tests that already run in an event loop.
- Use `await` directly in async tests.
Cleanup of background tasks
If your code spawns background tasks, tests must ensure they are stopped:
- Provide a shutdown hook in your app/service API
- In tests, call shutdown and await it
- If you must cancel tasks:
  - `task.cancel()`
  - `with contextlib.suppress(asyncio.CancelledError): await task`
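The cancel-and-await pattern above as a runnable sketch (the `background` coroutine is a stand-in for your service's background work):

```python
import asyncio
import contextlib

async def background() -> None:
    # stand-in for a long-running background task
    while True:
        await asyncio.sleep(0.01)

async def main() -> str:
    task = asyncio.create_task(background())
    await asyncio.sleep(0.05)  # ... exercise the code under test ...
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return "stopped"

print(asyncio.run(main()))  # stopped
```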
Common Traps
- Confusing coroutine objects vs tasks vs futures.
- Assuming preemption (there is none; yielding is explicit).
- Not understanding cancellation semantics (cancel is a request, delivered at await points).
- Treating async code as “thread-safe by default” (it is not).
- Forgetting failure propagation and cleanup when multiple tasks run concurrently.