
asyncio

Mental model

  • asyncio is a single-threaded, cooperative concurrency framework.
  • Concurrency happens when tasks voluntarily yield control to the event loop (typically at await points).
  • The event loop is a scheduler + I/O multiplexer:
    • runs ready callbacks / tasks
    • watches sockets/pipes/subprocesses via OS mechanisms (epoll/kqueue/IOCP)
    • resumes tasks when awaited operations complete
  • Key separation:
    • Concurrency (many things in progress) vs Parallelism (many things at once on multiple cores).
    • asyncio gives concurrency; CPU-bound work needs threads/processes if you want parallelism.

What actually runs?

  • A coroutine function: async def f(...): ...
  • Calling it returns a coroutine object: coro = f()
  • A coroutine object does nothing until:
    • await coro inside another coroutine, or
    • asyncio.create_task(coro) schedules it, or
    • asyncio.run(coro) creates an event loop and drives it to completion
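The three ways to drive a coroutine can be sketched as follows (`work` is a made-up coroutine used only for illustration):

```python
import asyncio

async def work(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2

async def main() -> None:
    # 1. await the coroutine object directly (runs it here, sequentially)
    a = await work(1)

    # 2. wrap it in a Task so the loop schedules it, then await the Task
    task = asyncio.create_task(work(2))
    b = await task

    print(a, b)  # 2 4

# 3. asyncio.run creates an event loop and drives the coroutine to completion
asyncio.run(main())
```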

Yield points (why await matters)

  • The event loop can switch tasks at:
    • await (most common)
    • asyncio.sleep(0) (explicit cooperative yield)
    • awaiting futures/tasks/I/O operations
  • If you write an async def that never awaits, it behaves like synchronous code and blocks the loop for as long as it runs.
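A minimal sketch of cooperative yielding: the two tasks below interleave only because each one yields with asyncio.sleep(0) inside its loop.

```python
import asyncio

order: list[str] = []

async def step(name: str) -> None:
    for _ in range(2):
        order.append(name)
        await asyncio.sleep(0)  # explicit yield: the loop may switch tasks here

async def main() -> None:
    await asyncio.gather(step("a"), step("b"))

asyncio.run(main())
print(order)  # the two tasks alternate at each yield point
```

Remove the `await asyncio.sleep(0)` and each task runs to completion before the other starts.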

Core primitives

Event loop

  • The loop owns:
    • a queue of ready callbacks/tasks
    • timers (scheduled future callbacks)
    • I/O watchers (file descriptor readiness)
  • You rarely interact with the loop directly in application code.
  • In library code, obtain the current loop with asyncio.get_running_loop() rather than requiring callers to pass one in.

Tasks vs Futures

Future

A Future is a passive result container.

  • Represents an eventual result, exception, or cancellation
  • Has no execution logic
  • Must be completed by external code
  • Awaiting a Future suspends the current task until it completes

Mental model:

“A promise that someone else will fulfill.”

Key points:

  • States: pending → finished (result/exception) or cancelled
  • Usually created and completed by the event loop or low-level libraries
  • Rarely created manually in application code
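A small illustration of the Future interface; here the "external code" that fulfills the promise is just a plain loop callback scheduled with call_soon:

```python
import asyncio

async def main() -> int:
    loop = asyncio.get_running_loop()
    fut: asyncio.Future[int] = loop.create_future()
    # external code (here, a loop callback) completes the Future
    loop.call_soon(fut.set_result, 42)
    return await fut  # suspends this task until set_result runs

print(asyncio.run(main()))  # 42
```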

Task

A Task is an active Future that runs a coroutine.

  • Wraps a coroutine object
  • Schedules it on the event loop
  • Advances the coroutine at each await
  • Stores the final result or exception in itself
  • Is a subclass of Future

Mental model:

“A Future that knows how to execute a coroutine.”

Lifecycle:

  1. Created from a coroutine (create_task)
  2. Driven by the event loop
  3. Completes when the coroutine returns, raises, or is cancelled

Awaiting

  • await task and await future work the same way
  • await only cares about the Future interface
  • The difference is who completes it:
    • Future → external code
    • Task → the coroutine itself

Practical implications:

  • Create tasks for concurrent work: task = asyncio.create_task(coro())
  • Prefer structured patterns that ensure tasks are awaited/cancelled (asyncio.TaskGroup).

Cancellation

  • Cancellation is cooperative.
  • task.cancel() schedules a CancelledError to be raised at the task’s next await point.
  • If a task never hits an await, cancellation may be delayed indefinitely.
  • Cancellation is contagious only if you propagate it (e.g., await a cancelled task).

Best practice:

  • Treat CancelledError as control flow, not an “error to swallow”.
  • Clean up resources in finally blocks.
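These semantics can be sketched with a made-up worker coroutine: cancellation arrives at the await point, the finally block still runs, and awaiting the cancelled task re-raises CancelledError in the awaiter.

```python
import asyncio

async def worker(events: list[str]) -> None:
    try:
        await asyncio.sleep(10)  # cancellation is delivered at this await
    finally:
        events.append("cleanup")  # runs even when cancelled

async def main() -> list[str]:
    events: list[str] = []
    task = asyncio.create_task(worker(events))
    await asyncio.sleep(0)  # let the worker start and reach its await
    task.cancel()           # a request: CancelledError at the next await point
    try:
        await task
    except asyncio.CancelledError:
        events.append("cancelled")
    return events

print(asyncio.run(main()))  # ['cleanup', 'cancelled']
```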

Timeouts

  • Timeouts are typically implemented by cancellation under the hood.
  • asyncio.wait_for(aw, timeout=...) cancels aw on timeout (then raises TimeoutError).
  • Prefer asyncio.timeout(...) (Python 3.11+) for clearer scoping.

Synchronization

  • asyncio.Lock: mutual exclusion around critical sections that might await.

    • Use when multiple coroutines access shared mutable state and the critical section contains await, making it unsafe to rely on normal sequential execution.
    • Only one coroutine can hold the lock at a time; others wait cooperatively.
      import asyncio

      balance = 0
      lock = asyncio.Lock()

      async def deposit(amount: int) -> None:
          global balance
          async with lock:
              current = balance
              await asyncio.sleep(0)  # simulate async work
              balance = current + amount
  • asyncio.Event: await a signal.

    • Use when one or more coroutines must wait until some condition becomes true.
    • Once set, all current and future waiters proceed until the event is cleared.
      import asyncio

      ready = asyncio.Event()

      async def worker():
          await ready.wait()
          print("started")

      async def main():
          task = asyncio.create_task(worker())
          await asyncio.sleep(0)  # let the worker start waiting
          ready.set()
          await task  # ensure the worker actually runs to completion
  • asyncio.Condition: coordinated waiting/notification.

    • Use when coroutines must wait for specific state changes, not just a single signal.
    • Combines a lock with wait/notify semantics and requires explicit condition checks.
      import asyncio

      condition = asyncio.Condition()
      items: list[int] = []

      async def consumer():
          async with condition:
              await condition.wait_for(lambda: items)
              item = items.pop()
              print(item)

      async def producer():
          async with condition:
              items.append(1)
              condition.notify()
  • asyncio.Queue: async producer/consumer pattern.

    • Use for safe communication between producers and consumers with built-in backpressure.
    • Handles locking, waiting, and notification internally.
      import asyncio

      async def producer(q: asyncio.Queue[int]) -> None:
          await q.put(1)

      async def consumer(q: asyncio.Queue[int]) -> int:
          item = await q.get()
          q.task_done()
          return item

      async def main():
          q: asyncio.Queue[int] = asyncio.Queue()
          await producer(q)
          result = await consumer(q)
          print(result)

Rule of thumb:

  • Use async primitives for code running in the event loop.
  • Use threading.* primitives for threads; don’t mix them casually.

Scheduling patterns

Running coroutines concurrently

  • create_task + await later is the building block.
  • asyncio.gather(*aws) awaits many awaitables concurrently.
    • By default, if one fails, it cancels the others (behavior nuances depend on Python version and parameters).
    • Consider return_exceptions=True only when you really mean “collect failures as values”.

Waiting for first completion

  • asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) returns two sets: done, pending.
  • Since Python 3.11, asyncio.wait accepts only tasks/futures, not bare coroutines.
  • You must decide what to do with pending (cancel/await/etc.).
  • A common trap is leaking pending tasks.
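A sketch of the pattern, with explicit handling of the pending set (the "fast"/"slow" tasks are illustrative stand-ins for real work):

```python
import asyncio

async def main() -> str:
    fast = asyncio.create_task(asyncio.sleep(0.01, result="fast"))
    slow = asyncio.create_task(asyncio.sleep(10, result="slow"))
    done, pending = await asyncio.wait(
        {fast, slow}, return_when=asyncio.FIRST_COMPLETED
    )
    for t in pending:
        t.cancel()  # decide what to do with the losers: here, cancel them
    await asyncio.gather(*pending, return_exceptions=True)  # reap cancellations
    return done.pop().result()

print(asyncio.run(main()))  # fast
```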

Structured concurrency (TaskGroup)

  • asyncio.TaskGroup (Python 3.11+) provides structured concurrency:
    • tasks started inside the group are awaited automatically
    • failure semantics are clearer and safer
    • avoids “forgotten task” leaks

Prefer:

  • TaskGroup (3.11+) over raw create_task + manual bookkeeping, when possible.

I/O vs CPU-bound work

I/O-bound: good fit

  • Many sockets, HTTP calls, DB calls (if driver is async), file ops (limited), subprocess.
  • Async shines when tasks spend most time waiting.

CPU-bound: not a good fit (by default)

  • CPU-bound work blocks the event loop and starves other tasks.

Options:

  • asyncio.to_thread(fn, *args) for blocking sync functions (Python 3.9+)
  • loop.run_in_executor(...) for custom executors
  • multiprocessing / process pools for real parallelism

Rule:

  • If it pegs a CPU core, move it out of the loop.
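A sketch of offloading with to_thread; cpu_heavy is a made-up stand-in for real CPU-bound work:

```python
import asyncio
import hashlib

def cpu_heavy(data: bytes) -> str:
    # blocking, CPU-bound work: would starve the loop if run inline
    return hashlib.sha256(data * 10_000).hexdigest()

async def main() -> None:
    # to_thread (3.9+) runs the sync function in a worker thread,
    # leaving the event loop free to run other tasks meanwhile
    digest = await asyncio.to_thread(cpu_heavy, b"payload")
    print(digest[:8])

asyncio.run(main())
```

Note that threads give concurrency with blocking code, not CPU parallelism; for work that truly pegs cores, use a process pool instead.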

Practical examples

Concurrent HTTP-style work (pattern-only)

This uses a placeholder fetch() to emphasize structure (swap in your actual async client):

import asyncio
from dataclasses import dataclass

@dataclass
class Result:
    url: str
    status: int
    body: bytes

async def fetch(url: str) -> Result:
    # placeholder for a real async HTTP client call
    await asyncio.sleep(0.05)
    return Result(url=url, status=200, body=b"ok")

async def main(urls: list[str]) -> list[Result]:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(u)) for u in urls]
    # after TaskGroup exits, all tasks finished or raised
    return [t.result() for t in tasks]

if __name__ == "__main__":
    results = asyncio.run(main(["https://a", "https://b", "https://c"]))
    print([r.status for r in results])

Key points:

  • TaskGroup ensures tasks do not leak.
  • Failures propagate cleanly.

Bounded concurrency with a semaphore

import asyncio

async def worker(item: int) -> int:
    await asyncio.sleep(0.05)
    return item * 2

async def bounded_map(items: list[int], limit: int) -> list[int]:
    sem = asyncio.Semaphore(limit)

    async def run_one(x: int) -> int:
        async with sem:
            return await worker(x)

    return await asyncio.gather(*(run_one(x) for x in items))

async def main():
    out = await bounded_map(list(range(10)), limit=3)
    print(out)

asyncio.run(main())

Pitfall avoided:

  • launching thousands of tasks at once without backpressure.

Producer / consumer with asyncio.Queue

import asyncio

async def producer(q: asyncio.Queue[int], n: int) -> None:
    for i in range(n):
        await q.put(i)
    await q.put(-1)  # sentinel

async def consumer(q: asyncio.Queue[int]) -> list[int]:
    out: list[int] = []
    while True:
        item = await q.get()
        try:
            if item == -1:
                return out
            out.append(item * 10)
        finally:
            q.task_done()

async def main():
    q: asyncio.Queue[int] = asyncio.Queue()
    prod = asyncio.create_task(producer(q, 5))
    cons = asyncio.create_task(consumer(q))
    await prod
    await q.join()  # wait until all queued items are marked done
    result = await cons
    print(result)

asyncio.run(main())

Notes:

  • task_done() must be called exactly once per get(), typically via finally.

Handling timeouts and cancellation safely

import asyncio

async def long_op() -> str:
    try:
        await asyncio.sleep(10)
        return "done"
    finally:
        # ensure cleanup happens even if cancelled
        pass

async def main():
    try:
        return await asyncio.wait_for(long_op(), timeout=0.2)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))

Key point:

  • wait_for cancels the underlying awaitable on timeout.

Best practices

  • Prefer structured concurrency (asyncio.TaskGroup) over ad-hoc task spawning.
  • Keep event-loop code non-blocking:
    • no CPU-heavy loops
    • no blocking I/O (use async drivers or offload with to_thread)
  • Add backpressure:
    • semaphore, queue, or bounded worker pools
  • Use timeouts for external I/O boundaries.
  • Always ensure tasks are:
    • awaited, or
    • cancelled and awaited (to suppress warnings and ensure cleanup)
  • Make cancellation safe:
    • use try/finally around resource acquisition
    • if you catch CancelledError, re-raise it after cleanup rather than swallowing it
  • Be explicit about failure semantics:
    • gather vs wait vs TaskGroup
  • Prefer asyncio.get_running_loop() in async contexts (not get_event_loop()).

Common pitfalls

  • “It’s async so it’s fast”: async only helps if you are I/O-bound and yielding.
  • Blocking the loop:
    • time.sleep(...), heavy JSON parsing in a tight loop, large CPU work.
  • Forgetting to await:
    • coroutine created but never awaited -> runtime warnings and missing work.
  • Task leaks:
    • create_task() without awaiting/cancelling later.
  • Misusing gather:
    • assuming it automatically cancels/cleans up everything the way you want.
  • Cancellation swallowing:
    • broad except Exception: that accidentally catches CancelledError in some versions/patterns.
  • Unbounded concurrency:
    • gather(*(fetch(u) for u in huge_list)) can blow memory and overwhelm dependencies.
  • Shared mutable state:
    • async code can interleave at await points; treat it like concurrent code.

Pytest testing tips for async code

Use pytest-asyncio

  • Install and enable async tests via the plugin.
  • Mark async tests with @pytest.mark.asyncio (or configure auto mode).
  • Prefer asyncio-native patterns rather than spinning threads in tests.

Example:

import asyncio

import pytest

async def f(x: int) -> int:
    await asyncio.sleep(0)
    return x + 1

@pytest.mark.asyncio
async def test_f():
    assert await f(1) == 2

Testing concurrency behavior

Test that tasks truly run concurrently by controlling scheduling:

  • Use asyncio.Event to coordinate ordering.
  • Use a short asyncio.sleep(0) to allow the loop to switch tasks deterministically.

import asyncio

import pytest

@pytest.mark.asyncio
async def test_concurrent_start():
    started = asyncio.Event()
    proceed = asyncio.Event()

    async def a():
        started.set()
        await proceed.wait()
        return 1

    async def b():
        await started.wait()
        proceed.set()
        return 2

    async with asyncio.TaskGroup() as tg:
        ta = tg.create_task(a())
        tb = tg.create_task(b())

    assert (ta.result(), tb.result()) == (1, 2)

Testing timeouts reliably

  • Avoid very tight timeouts on slow CI.
  • Use generous margins, or simulate time progression if your code supports injecting a clock.
  • If you must use real timeouts, keep them stable (e.g., 200ms rather than 5ms).

import asyncio

import pytest

@pytest.mark.asyncio
async def test_timeout():
    async def never():
        await asyncio.Event().wait()

    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(never(), timeout=0.2)

Monkeypatching async dependencies

  • If you replace an async function, make the replacement async def.
  • If you replace an object method used with await, ensure it returns an awaitable.
import pytest

class Client:
    async def get(self) -> int:
        return 1

async def uses_client(c: Client) -> int:
    return await c.get()

@pytest.mark.asyncio
async def test_monkeypatch(monkeypatch):
    c = Client()

    async def fake_get():
        return 42

    monkeypatch.setattr(c, "get", fake_get)
    assert await uses_client(c) == 42

Avoid nested event loops

  • Do not call asyncio.run() inside tests that already run in an event loop.
  • Use await directly in async tests.

Cleanup of background tasks

If your code spawns background tasks, tests must ensure they are stopped:

  • Provide a shutdown hook in your app/service API
  • In tests, call shutdown and await it
  • If you must cancel tasks:
    • task.cancel()
    • with contextlib.suppress(asyncio.CancelledError): await task
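The cancel-and-await pattern above can be sketched as follows (heartbeat is a made-up background task):

```python
import asyncio
import contextlib

async def heartbeat() -> None:
    while True:  # a long-lived background task
        await asyncio.sleep(0.01)

async def main() -> str:
    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0.03)  # ...the test body would run here...
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task  # ensure the task fully finished before the test ends
    return "stopped"

print(asyncio.run(main()))  # stopped
```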

Common Traps

  • Confusing coroutine objects vs tasks vs futures.
  • Assuming preemption (there is none; yielding is explicit).
  • Not understanding cancellation semantics (cancel is a request, delivered at await points).
  • Treating async code as “thread-safe by default” (it is not).
  • Forgetting failure propagation and cleanup when multiple tasks run concurrently.