Introduction

Python asyncio event loop errors occur when asynchronous code mismanages coroutines, blocks the event loop, or mishandles task lifecycle, causing applications to hang, crash, or behave unpredictably. The asyncio library provides infrastructure for writing concurrent code using the async/await syntax, managing an event loop that schedules and executes coroutines. Errors manifest at multiple layers: coroutine not awaited properly, event loop closed prematurely, blocking calls freezing the event loop, task cancellation not handled, race conditions in shared state, deadlock from improper lock usage, or event loop policy mismatches. Common causes include calling async functions without await, using synchronous blocking I/O in async context, not awaiting task completion before loop close, cancelling tasks without proper cleanup, accessing shared state without locks, nested event loops conflicting (asyncio.run inside running loop), thread safety violations calling async code from threads, improper timeout handling causing resource leaks, and mixing asyncio with threading/multiprocessing incorrectly. The fix requires understanding the async/await paradigm, event loop mechanics, Task scheduling, cancellation propagation, and proper async patterns for I/O operations. This guide provides production-proven debugging patterns for asyncio applications across web frameworks (FastAPI, aiohttp, Tornado), background task processors, and microservices.

Symptoms

  • RuntimeError: This event loop is already running
  • RuntimeError: Event loop is closed
  • RuntimeError: await wasn't used on future
  • Application hangs indefinitely on async operation
  • CancelledError not caught causing task failure
  • asyncio.exceptions.TimeoutError not handled
  • Blocking call freezes entire application
  • Task exception was never retrieved warnings
  • Coroutines not executing in expected order
  • Race conditions in shared async state
  • Deadlock when multiple async locks contended
  • Memory leak from unbounded task creation
  • get_event_loop returns wrong loop in nested calls
  • Async generator not cleaned up properly

Common Causes

  • Calling async function without await keyword
  • Blocking I/O (time.sleep, requests) in async context
  • Task not awaited before event loop close
  • Cancellation not handled in task cleanup
  • Shared state accessed without asyncio.Lock
  • Nested event loop conflicts (asyncio.run misuse)
  • Calling async code from synchronous thread
  • Timeout not wrapping async operation properly
  • Async generator not aclosed() before discard
  • Event loop policy not set for async frameworks

Step-by-Step Fix

### 1. Diagnose asyncio errors

Enable asyncio debug mode:

```python # Enable debug mode for detailed asyncio logging import asyncio import logging

logging.basicConfig(level=logging.DEBUG)

# Set debug on event loop asyncio.run(main(), debug=True)

# Or manually set debug loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.set_debug(True) loop.run_until_complete(main())

# Debug output shows: # - Slow coroutines (>100ms execution time) # - Unclosed event loops # - Unclosed transports/protocols # - Exceptions not retrieved from tasks

# Environment variable alternative # PYTHONASYNCIODEBUG=1 python app.py ```

Detect blocking calls:

```python # Use aiofiles instead of sync file I/O # WRONG: Blocking call in async context async def read_file(): with open('data.txt', 'r') as f: # Blocks event loop! return f.read()

# CORRECT: Use asyncio.to_thread for sync operations async def read_file(): return await asyncio.to_thread(open, 'data.txt', 'r')

# Or use async file library import aiofiles

async def read_file(): async with aiofiles.open('data.txt', 'r') as f: return await f.read()

# Detect blocking with asyncio.slow_callback_duration import asyncio

# Set threshold for slow callback warning (default 0.1s) asyncio.get_event_loop().slow_callback_duration = 0.05 # 50ms

# Logs warnings for callbacks exceeding threshold ```

Inspect running tasks:

```python # Get all running tasks tasks = asyncio.all_tasks() print(f"Running tasks: {len(tasks)}")

for task in tasks: print(f"Task: {task.get_name()}") print(f" Done: {task.done()}") print(f" Cancelled: {task.cancelled()}")

if task.done() and not task.cancelled(): try: result = task.result() print(f" Result: {result}") except Exception as e: print(f" Exception: {e}")

# Get current task current = asyncio.current_task() print(f"Current task: {current.get_name()}")

# Monitor tasks periodically async def monitor_tasks(): while True: await asyncio.sleep(5) tasks = asyncio.all_tasks() pending = [t for t in tasks if not t.done()] print(f"Pending tasks: {len(pending)}") ```

### 2. Fix missing await errors

Await all async function calls:

```python # WRONG: Async function called but not awaited async def fetch_data(): response = asyncio.sleep(1) # Returns coroutine, doesn't sleep! return response

# CORRECT: Await the coroutine async def fetch_data(): response = await asyncio.sleep(1) # Actually waits return response

# Common mistake patterns:

# Pattern 1: Async function in list comprehension # WRONG async def fetch_all(): urls = ['url1', 'url2', 'url3'] # Creates list of coroutines, doesn't execute coros = [fetch(url) for url in urls] return coros

# CORRECT async def fetch_all(): urls = ['url1', 'url2', 'url3'] results = await asyncio.gather(*[fetch(url) for url in urls]) return results

# Pattern 2: Async function as callback # WRONG async def setup(): timer = asyncio.create_task(scheduled_task()) # timer is Task, need to await it later

# CORRECT async def setup(): timer = asyncio.create_task(scheduled_task()) await timer # Wait for completion

# Pattern 3: Async generator iteration # WRONG async def process_items(): async for item in async_generator(): # Correct pass

# But this is wrong: items = [x for x in async_generator()] # Doesn't work!

# CORRECT async def process_items(): items = [x async for x in async_generator()] ```

Handle async context managers:

```python # WRONG: Sync context manager with async resource async def get_session(): session = aiohttp.ClientSession() # Never closed! return session

# CORRECT: Async context manager async def get_session(): async with aiohttp.ClientSession() as session: return await session.get('http://example.com')

# Custom async context manager class AsyncResource: async def __aenter__(self): self.resource = await acquire_resource() return self.resource

async def __aexit__(self, exc_type, exc_val, exc_tb): await self.resource.close()

# Usage async def use_resource(): async with AsyncResource() as resource: await resource.do_work() ```

### 3. Fix event loop lifecycle issues

Properly manage event loop lifecycle:

```python # WRONG: Manual loop management prone to errors loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: loop.close()

# This can fail if loop already running or closed

# CORRECT: Use asyncio.run (Python 3.7+) asyncio.run(main())

# asyncio.run handles: # - Creates new event loop # - Sets as current loop # - Runs main coroutine # - Cleans up async generators # - Closes event loop

# For nested event loops (not recommended): # WRONG async def outer(): asyncio.run(inner()) # RuntimeError!

# CORRECT: Use create_task instead async def outer(): task = asyncio.create_task(inner()) await task

# Or use asyncio.create_task for fire-and-forget async def outer(): asyncio.create_task(inner()) # Fire and forget await asyncio.sleep(0) # Yield control ```

Handle loop in threading scenarios:

```python # Running async code from synchronous thread import asyncio import threading

def sync_function(): # WRONG: get_event_loop() fails if no loop in thread loop = asyncio.get_event_loop() loop.run_until_complete(async_task())

# CORRECT: Create new loop for thread def sync_function(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(async_task()) finally: loop.close()

# Or use asyncio.run (Python 3.7+) def sync_function(): asyncio.run(async_task())

# Calling async from main thread when loop running in background async def call_from_sync(): loop = asyncio.get_running_loop()

# Schedule coroutine on running loop future = asyncio.run_coroutine_threadsafe(async_task(), loop) result = future.result() # Blocking wait return result

# Better: await the task async def call_from_sync(): task = asyncio.create_task(async_task()) return await task ```

Fix asyncio.run conflicts:

```python # Problem: asyncio.run cannot be called inside running loop # Common in Jupyter notebooks, web frameworks with async support

# WRONG (in Jupyter or FastAPI endpoint) async def handler(): asyncio.run(some_async_function()) # RuntimeError!

# CORRECT: Just await directly async def handler(): await some_async_function()

# For Jupyter notebooks, use nest_asyncio import nest_asyncio nest_asyncio.apply()

# Now asyncio.run works in notebook cells asyncio.run(main())

# Or use ipyasync extension for Jupyter ```

### 4. Fix task cancellation handling

Handle CancelledError properly:

```python # WRONG: Catching and suppressing CancelledError async def worker(): try: while True: await asyncio.sleep(1) except asyncio.CancelledError: pass # BAD: Task appears cancelled but continues!

# CORRECT: Re-raise CancelledError after cleanup async def worker(): try: while True: await do_work() except asyncio.CancelledError: # Perform cleanup await cleanup_resources() # Re-raise to indicate cancellation complete raise

# OR: Use finally for cleanup async def worker(): try: while True: await do_work() finally: # Always runs on cancellation await cleanup_resources() ```

Cancel tasks gracefully:

```python # Create and cancel task task = asyncio.create_task(worker())

# Wait for task to complete with timeout try: await asyncio.wait_for(task, timeout=30.0) except asyncio.TimeoutError: task.cancel() try: await task # Wait for cancellation to complete except asyncio.CancelledError: pass # Expected

# Cancel multiple tasks tasks = [ asyncio.create_task(worker1()), asyncio.create_task(worker2()), asyncio.create_task(worker3()), ]

# Signal cancellation for task in tasks: task.cancel()

# Wait for all to complete cancellation results = await asyncio.gather(*tasks, return_exceptions=True)

for i, result in enumerate(results): if isinstance(result, asyncio.CancelledError): print(f"Task {i} was cancelled") elif isinstance(result, Exception): print(f"Task {i} failed: {result}") ```

Implement shutdown with cancellation:

```python import signal

class GracefulShutdown: def __init__(self): self.tasks = set() self.shutdown_event = asyncio.Event()

def create_task(self, coro): task = asyncio.create_task(coro) self.tasks.add(task) task.add_done_callback(self.tasks.discard) return task

async def shutdown(self): print("Shutting down...") self.shutdown_event.set()

# Cancel all pending tasks for task in self.tasks: task.cancel()

# Wait for tasks to complete with timeout if self.tasks: done, pending = await asyncio.wait( self.tasks, timeout=5.0 )

# Cancel any remaining for task in pending: task.cancel()

# Wait for final cleanup await asyncio.gather(*pending, return_exceptions=True)

# Usage shutdown = GracefulShutdown()

async def main(): task1 = shutdown.create_task(worker1()) task2 = shutdown.create_task(worker2())

# Wait for shutdown signal await shutdown.shutdown_event.wait()

# Run with signal handling loop = asyncio.get_event_loop()

for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler( sig, lambda: asyncio.create_task(shutdown.shutdown()) )

loop.run_until_complete(main()) ```

### 5. Fix blocking calls in async context

Replace blocking I/O with async alternatives:

```python # Network I/O # WRONG: requests blocks event loop import requests

async def fetch(): response = requests.get('http://example.com') # Blocks! return response.text

# CORRECT: Use aiohttp import aiohttp

async def fetch(): async with aiohttp.ClientSession() as session: async with session.get('http://example.com') as response: return await response.text()

# File I/O # WRONG: open() blocks event loop async def read_file(): with open('data.txt') as f: # Blocks! return f.read()

# CORRECT: Use aiofiles or to_thread import aiofiles

async def read_file(): async with aiofiles.open('data.txt') as f: return await f.read()

# Or for one-off operations async def read_file(): return await asyncio.to_thread( lambda: open('data.txt').read() )

# Sleep # WRONG: time.sleep blocks event loop import time

async def wait(): time.sleep(5) # Blocks entire event loop!

# CORRECT: asyncio.sleep async def wait(): await asyncio.sleep(5)

# Database (example with asyncpg for PostgreSQL) # WRONG: psycopg2 (sync) import psycopg2

async def query(): conn = psycopg2.connect(...) # Blocks! cursor = conn.cursor() cursor.execute("SELECT * FROM users") return cursor.fetchall()

# CORRECT: asyncpg import asyncpg

async def query(): conn = await asyncpg.connect(...) rows = await conn.fetch("SELECT * FROM users") await conn.close() return rows ```

Run blocking code in executor:

```python # For operations that can't be made async import asyncio from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def run_blocking(): loop = asyncio.get_event_loop()

# Run CPU-bound function in process pool result = await loop.run_in_executor( None, # Default executor cpu_intensive_function, arg1, arg2 )

# Run I/O-bound function in thread pool result = await loop.run_in_executor( executor, io_function, arg1 )

return result

# Or use asyncio.to_thread (Python 3.9+) async def run_blocking(): result = await asyncio.to_thread(io_function, arg1) return result

# For process pool (CPU-bound) from concurrent.futures import ProcessPoolExecutor

process_executor = ProcessPoolExecutor()

async def run_cpu_bound(): loop = asyncio.get_event_loop() return await loop.run_in_executor( process_executor, cpu_intensive_function, data ) ```

### 6. Fix async lock and synchronization issues

Use asyncio.Lock correctly:

```python # WRONG: Not using async lock for shared state counter = 0

async def increment(): global counter # Race condition! current = counter await asyncio.sleep(0.001) # Context switch counter = current + 1

# CORRECT: Use asyncio.Lock lock = asyncio.Lock() counter = 0

async def increment(): global counter async with lock: current = counter await asyncio.sleep(0.001) counter = current + 1

# Or manual acquire/release async def increment(): global counter await lock.acquire() try: current = counter await asyncio.sleep(0.001) counter = current + 1 finally: lock.release() ```

Prevent deadlock with multiple locks:

```python # WRONG: Lock ordering can cause deadlock lock_a = asyncio.Lock() lock_b = asyncio.Lock()

async def task1(): async with lock_a: await asyncio.sleep(0.1) async with lock_b: # May deadlock if task2 holds lock_b pass

async def task2(): async with lock_b: await asyncio.sleep(0.1) async with lock_a: # May deadlock if task1 holds lock_a pass

# CORRECT: Always acquire locks in same order async def task1(): async with lock_a: await asyncio.sleep(0.1) async with lock_b: pass

async def task2(): async with lock_a: # Same order as task1 await asyncio.sleep(0.1) async with lock_b: pass

# Or use asyncio.Semaphore for limited concurrent access semaphore = asyncio.Semaphore(5) # Max 5 concurrent

async def limited_worker(): async with semaphore: await do_work() ```

Use asyncio.Event for signaling:

```python # Signal between coroutines event = asyncio.Event()

async def waiter(): print("Waiting for event...") await event.wait() # Blocks until event.set() print("Event received!")

async def setter(): await asyncio.sleep(1) event.set() # Wake up all waiters

# Run both await asyncio.gather(waiter(), setter())

# Use Condition for complex synchronization condition = asyncio.Condition() queue = []

async def producer(): async with condition: queue.append(item) condition.notify() # Wake one consumer

async def consumer(): async with condition: while not queue: await condition.wait() # Wait for notification item = queue.pop(0) return item ```

### 7. Fix timeout handling

Use asyncio.wait_for:

```python # WRONG: No timeout - can hang forever async def fetch(): return await slow_operation()

# CORRECT: Wrap with timeout async def fetch(): try: return await asyncio.wait_for( slow_operation(), timeout=30.0 ) except asyncio.TimeoutError: print("Operation timed out") # Handle timeout - cleanup if needed raise

# Timeout with cleanup class Resource: def __init__(self): self.connection = None

async def connect(self): self.connection = await create_connection()

async def close(self): if self.connection: await self.connection.close()

async def operation_with_timeout(resource, timeout=30.0): try: return await asyncio.wait_for( resource.connect(), timeout=timeout ) except asyncio.TimeoutError: await resource.close() # Cleanup on timeout raise ```

Implement timeout context manager:

```python from contextlib import asynccontextmanager

@asynccontextmanager async def timeout_context(seconds, operation_name="Operation"): try: async with asyncio.timeout(seconds): yield except asyncio.TimeoutError: print(f"{operation_name} timed out after {seconds}s") raise

# Usage (Python 3.11+) async def fetch(): async with timeout_context(30.0, "Database query"): return await db.query()

# For Python 3.10 and earlier @asynccontextmanager async def timeout_context(seconds, operation_name="Operation"): try: await asyncio.wait_for( asyncio.sleep(0), # Dummy await timeout=seconds ) yield except asyncio.TimeoutError: print(f"{operation_name} timed out") raise

# Or use wait_for directly async def fetch(): return await asyncio.wait_for(db.query(), timeout=30.0) ```

Handle timeout in task groups:

```python # Python 3.11+ asyncio.TaskGroup async def run_with_timeout(): try: async with asyncio.TaskGroup() as tg: task1 = tg.create_task(operation1()) task2 = tg.create_task(operation2())

# Wait for both with overall timeout await asyncio.wait_for( asyncio.sleep(0), # Just for timeout timeout=60.0 ) except* asyncio.TimeoutError: print("Task group timed out") except* Exception as eg: print(f"Tasks failed: {eg}")

# For earlier Python, use gather with timeout async def run_with_timeout(): try: await asyncio.wait_for( asyncio.gather(operation1(), operation2()), timeout=60.0 ) except asyncio.TimeoutError: print("Operations timed out") ```

### 8. Fix async generator cleanup

Properly close async generators:

```python # WRONG: Async generator not closed async def process(): async for item in async_generator(): if should_stop(item): break # Generator not closed! # Resource leak if generator holds resources

# CORRECT: Use async with for async context manager async def process(): async with async_resource() as gen: async for item in gen: if should_stop(item): break

# Or manually close async def process(): gen = async_generator() try: async for item in gen: if should_stop(item): break finally: await gen.aclose() # Proper cleanup ```

Implement async generator with cleanup:

```python # Async generator with try/finally for cleanup async def async_resource(): resource = await acquire_resource() try: while True: item = await resource.get_next() if item is None: break yield item finally: # Always runs on break, exception, or completion await resource.close()

# Usage async def process(): async for item in async_resource(): process_item(item) # Resource automatically closed

# Async generator with exception handling async def safe_generator(): try: for i in range(100): yield await process(i) except asyncio.CancelledError: # Handle cancellation gracefully await cleanup() raise except Exception as e: # Log but don't hide exception print(f"Generator error: {e}") raise finally: await final_cleanup() ```

### 9. Debug asyncio performance issues

Profile async code:

```python # Use asyncio.slow_callback_duration import asyncio

loop = asyncio.get_event_loop() loop.slow_callback_duration = 0.05 # Warn if > 50ms

# Monitor with aioconsole for interactive debugging from aioconsole import ainput

async def debug_prompt(): while True: cmd = await ainput("debug> ") if cmd == "tasks": for task in asyncio.all_tasks(): print(f" {task.get_name()}: done={task.done()}") elif cmd == "quit": break

# Use aiomonitor for REPL into running event loop import aiomonitor

async def main(): with aiomonitor.start_monitor(): await run_application()

# Provides telnet interface to inspect tasks, run code ```

Detect event loop starvation:

```python # Watchdog for event loop responsiveness class LoopWatchdog: def __init__(self, threshold=1.0): self.threshold = threshold self.last_check = asyncio.get_event_loop().time()

async def check(self): now = asyncio.get_event_loop().time() elapsed = now - self.last_check

if elapsed > self.threshold: print(f"WARNING: Event loop blocked for {elapsed:.2f}s")

self.last_check = now await asyncio.sleep(0) # Yield control

# Usage watchdog = LoopWatchdog(threshold=0.5)

async def main(): asyncio.create_task(monitor_loop())

while True: await watchdog.check() await do_work()

async def monitor_loop(): watchdog = LoopWatchdog() while True: await watchdog.check() await asyncio.sleep(0.1) ```

### 10. Fix framework-specific asyncio issues

FastAPI async dependency handling:

```python from fastapi import FastAPI, Depends import asyncio

app = FastAPI()

# WRONG: Async dependency called without await async def get_db(): db = await create_connection() try: yield db finally: await db.close()

@app.get("/items") async def get_items(db = Depends(get_db)): # FastAPI handles this # But manually calling would be wrong: # db = get_db() # Returns coroutine! # CORRECT: db = await get_db() pass

# FastAPI automatically awaits async dependencies # No need to manually await in route handlers ```

aiohttp client session management:

```python import aiohttp from aiohttp import ClientSession

# WRONG: Creating session per request async def fetch(url): async with ClientSession() as session: # Expensive! async with session.get(url) as response: return await response.text()

# CORRECT: Reuse session session = None

async def get_session(): global session if session is None: session = ClientSession() return session

async def fetch(url): s = await get_session() async with s.get(url) as response: return await response.text()

# Best: Use application-managed session class App: def __init__(self): self.session = None

async def startup(self): self.session = aiohttp.ClientSession()

async def shutdown(self): await self.session.close()

async def fetch(self, url): async with self.session.get(url) as response: return await response.text() ```

Prevention

  • Always await async function calls - use linters to detect missing await
  • Use asyncio.to_thread for blocking sync operations
  • Handle CancelledError in long-running tasks for clean shutdown
  • Use asyncio.Lock for shared state access in async context
  • Wrap external I/O with asyncio.wait_for timeouts
  • Close async generators with aclose() or async with
  • Enable asyncio debug mode in development and staging
  • Use async libraries (aiohttp, asyncpg, aiofiles) over sync alternatives
  • **RuntimeError: Event loop is closed**: Loop accessed after close
  • **CancelledError not handled**: Task cleanup skipped
  • **await wasn't used on future**: Coroutine not awaited
  • **Blocking call on event loop**: Sync operation freezing async
  • **Task exception never retrieved**: Task failure not handled