## Introduction
Python asyncio event loop errors occur when asynchronous code mismanages coroutines, blocks the event loop, or mishandles the task lifecycle, causing applications to hang, crash, or behave unpredictably. The asyncio library provides infrastructure for writing concurrent code with the async/await syntax, managing an event loop that schedules and executes coroutines.

Errors manifest at multiple layers: coroutines not awaited, event loops closed prematurely, blocking calls freezing the loop, task cancellation left unhandled, race conditions in shared state, deadlocks from improper lock usage, and event loop policy mismatches. Common causes include calling async functions without await, performing synchronous blocking I/O in an async context, closing the loop before tasks complete, cancelling tasks without proper cleanup, accessing shared state without locks, nesting event loops (asyncio.run inside a running loop), calling async code from threads unsafely, leaking resources through improper timeout handling, and mixing asyncio with threading or multiprocessing incorrectly.

Fixing these errors requires understanding the async/await paradigm, event loop mechanics, task scheduling, cancellation propagation, and the proper async patterns for I/O operations. This guide provides production-proven debugging patterns for asyncio applications across web frameworks (FastAPI, aiohttp, Tornado), background task processors, and microservices.
## Symptoms
- `RuntimeError: This event loop is already running`
- `RuntimeError: Event loop is closed`
- `RuntimeError: await wasn't used on future`
- Application hangs indefinitely on an async operation
- `CancelledError` not caught, causing task failure
- `asyncio.exceptions.TimeoutError` not handled
- Blocking call freezes the entire application
- `Task exception was never retrieved` warnings
- Coroutines not executing in the expected order
- Race conditions in shared async state
- Deadlock when multiple async locks are contended
- Memory leak from unbounded task creation
- `get_event_loop` returns the wrong loop in nested calls
- Async generator not cleaned up properly
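The most common of these symptoms, the "coroutine ... was never awaited" `RuntimeWarning`, is easy to reproduce in isolation. A minimal sketch (the `fetch` coroutine is a hypothetical stand-in; CPython emits the warning when the never-started coroutine object is garbage collected):

```python
import asyncio
import gc
import warnings

async def fetch():
    await asyncio.sleep(0)
    return 42

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    fetch()  # missing await: creates a coroutine object that never runs
    gc.collect()  # force finalization so the warning fires immediately

print(any("never awaited" in str(w.message) for w in caught))  # True
```

Running this under `python -W error` instead would turn the warning into a hard failure, which is a useful trick in test suites.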
## Common Causes
- Calling an async function without the await keyword
- Blocking I/O (time.sleep, requests) in an async context
- Task not awaited before event loop close
- Cancellation not handled in task cleanup
- Shared state accessed without asyncio.Lock
- Nested event loop conflicts (asyncio.run misuse)
- Calling async code from a synchronous thread
- Timeout not wrapping the async operation properly
- Async generator not closed with aclose() before being discarded
- Event loop policy not set for async frameworks
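The last cause deserves a concrete illustration, since no later step covers it: on Windows, Python 3.8+ defaults to the proactor event loop, and some libraries require the selector-based loop instead. A hedged sketch of setting a policy explicitly (the uvloop lines are commented out and assume that package is installed):

```python
import asyncio
import sys

# On Windows, switch to the selector event loop if a library requires it
# (some transports and resolvers only work on the selector loop)
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

# uvloop (Linux/macOS) is installed via the same policy mechanism:
# import uvloop
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

policy = asyncio.get_event_loop_policy()
print(isinstance(policy, asyncio.AbstractEventLoopPolicy))  # True
```

Set the policy once, at process startup, before any loop is created; changing it mid-run does not affect loops that already exist.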
## Step-by-Step Fix
### 1. Diagnose asyncio errors
Enable asyncio debug mode:
```python
# Enable debug mode for detailed asyncio logging
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)

# Set debug via asyncio.run
asyncio.run(main(), debug=True)

# Or manually set debug on a loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_debug(True)
loop.run_until_complete(main())

# Debug output shows:
# - Slow coroutines (>100ms execution time)
# - Unclosed event loops
# - Unclosed transports/protocols
# - Exceptions not retrieved from tasks

# Environment variable alternative:
# PYTHONASYNCIODEBUG=1 python app.py
```
Detect blocking calls:
```python
import asyncio

# WRONG: blocking call in async context
async def read_file():
    with open('data.txt', 'r') as f:  # Blocks the event loop!
        return f.read()

# CORRECT: run the whole blocking operation in a worker thread
def _read_sync():
    with open('data.txt', 'r') as f:
        return f.read()

async def read_file():
    return await asyncio.to_thread(_read_sync)

# Or use an async file library
import aiofiles

async def read_file():
    async with aiofiles.open('data.txt', 'r') as f:
        return await f.read()

# Detect blocking with slow_callback_duration (default 0.1s);
# warnings are only logged when debug mode is enabled
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.slow_callback_duration = 0.05  # 50ms

# Logs warnings for callbacks exceeding the threshold
```
Inspect running tasks:
```python
import asyncio

async def dump_tasks():
    # all_tasks() must be called while the event loop is running
    tasks = asyncio.all_tasks()
    print(f"Running tasks: {len(tasks)}")

    for task in tasks:
        print(f"Task: {task.get_name()}")
        print(f"  Done: {task.done()}")
        print(f"  Cancelled: {task.cancelled()}")

        if task.done() and not task.cancelled():
            try:
                print(f"  Result: {task.result()}")
            except Exception as e:
                print(f"  Exception: {e}")

    # Get the current task
    current = asyncio.current_task()
    print(f"Current task: {current.get_name()}")

# Monitor tasks periodically
async def monitor_tasks():
    while True:
        await asyncio.sleep(5)
        pending = [t for t in asyncio.all_tasks() if not t.done()]
        print(f"Pending tasks: {len(pending)}")
```
### 2. Fix missing await errors
Await all async function calls:
```python
# WRONG: async function called but not awaited
async def fetch_data():
    response = asyncio.sleep(1)  # Returns a coroutine object, doesn't sleep!
    return response

# CORRECT: await the coroutine
async def fetch_data():
    response = await asyncio.sleep(1)  # Actually waits
    return response

# Common mistake patterns:

# Pattern 1: async calls in a list comprehension
# WRONG
async def fetch_all():
    urls = ['url1', 'url2', 'url3']
    coros = [fetch(url) for url in urls]  # List of coroutines, nothing runs
    return coros

# CORRECT
async def fetch_all():
    urls = ['url1', 'url2', 'url3']
    results = await asyncio.gather(*[fetch(url) for url in urls])
    return results

# Pattern 2: task created but never awaited
# WRONG
async def setup():
    timer = asyncio.create_task(scheduled_task())
    # timer is a Task; it must be awaited (or tracked) eventually

# CORRECT
async def setup():
    timer = asyncio.create_task(scheduled_task())
    await timer  # Wait for completion

# Pattern 3: async generator iteration
# CORRECT
async def process_items():
    async for item in async_generator():
        ...

# WRONG: a plain comprehension over an async generator
# items = [x for x in async_generator()]  # TypeError!

# CORRECT: use an async comprehension
async def process_items():
    items = [x async for x in async_generator()]
```
Handle async context managers:
```python
# WRONG: async resource created but never closed
async def get_session():
    session = aiohttp.ClientSession()  # Never closed!
    return session

# CORRECT: use the session inside an async context manager,
# and finish reading the response before the session closes
async def fetch_page():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://example.com') as response:
            return await response.text()

# Custom async context manager
class AsyncResource:
    async def __aenter__(self):
        self.resource = await acquire_resource()
        return self.resource

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.resource.close()

# Usage
async def use_resource():
    async with AsyncResource() as resource:
        await resource.do_work()
```
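The same acquire/release pattern can be written more compactly with `contextlib.asynccontextmanager`, where the code around the `yield` plays the role of `__aenter__`/`__aexit__`. A minimal runnable sketch (the dict stands in for a real resource):

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_value():
    value = {"connected": True}  # stand-in for an acquired resource
    try:
        yield value
    finally:
        value["connected"] = False  # release runs even on error

async def demo():
    async with managed_value() as v:
        assert v["connected"]
        return v

v = asyncio.run(demo())
print(v["connected"])  # False: cleanup ran when the block exited
```

The decorator form is usually preferable for simple resources; reserve the class form for context managers that need extra methods or state.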
### 3. Fix event loop lifecycle issues
Properly manage event loop lifecycle:
```python
# WRONG: manual loop management is error-prone
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
# This can fail if a loop is already running or closed

# CORRECT: use asyncio.run (Python 3.7+)
asyncio.run(main())

# asyncio.run:
# - Creates a new event loop
# - Sets it as the current loop
# - Runs the main coroutine
# - Shuts down async generators
# - Closes the event loop

# Nested event loops:
# WRONG
async def outer():
    asyncio.run(inner())  # RuntimeError: loop already running

# CORRECT: create a task and await it
async def outer():
    task = asyncio.create_task(inner())
    await task

# Fire-and-forget: keep a reference, or the task may be
# garbage collected before it finishes
background_tasks = set()

async def outer():
    task = asyncio.create_task(inner())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
```
Handle loop in threading scenarios:
```python
# Running async code from a synchronous thread
import asyncio
import threading

# WRONG: no event loop exists in a plain worker thread
def sync_function():
    loop = asyncio.get_event_loop()  # Fails outside the main thread
    loop.run_until_complete(async_task())

# CORRECT: create a fresh loop for the thread
def sync_function():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(async_task())
    finally:
        loop.close()

# Or simply use asyncio.run (Python 3.7+)
def sync_function():
    asyncio.run(async_task())

# Calling async code from sync code while a loop runs in another thread
def call_from_sync(loop):
    # loop is the event loop running in a background thread
    future = asyncio.run_coroutine_threadsafe(async_task(), loop)
    return future.result()  # Blocking wait

# From inside the loop's own thread, just create a task and await it
async def call_from_async():
    task = asyncio.create_task(async_task())
    return await task
```
Fix asyncio.run conflicts:
```python
# Problem: asyncio.run cannot be called inside a running loop.
# Common in Jupyter notebooks and web frameworks with async support.

# WRONG (in Jupyter or a FastAPI endpoint)
async def handler():
    asyncio.run(some_async_function())  # RuntimeError!

# CORRECT: just await directly
async def handler():
    await some_async_function()

# For Jupyter notebooks, nest_asyncio patches the running loop
import nest_asyncio
nest_asyncio.apply()

# Now asyncio.run works in notebook cells
asyncio.run(main())

# Or rely on IPython/Jupyter's built-in autoawait support
# and simply write `await main()` at the top level of a cell
```
### 4. Fix task cancellation handling
Handle CancelledError properly:
```python
# WRONG: swallowing CancelledError
async def worker():
    try:
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        pass  # BAD: cancellation is silently suppressed

# CORRECT: re-raise CancelledError after cleanup
async def worker():
    try:
        while True:
            await do_work()
    except asyncio.CancelledError:
        await cleanup_resources()  # Perform cleanup
        raise                      # Re-raise to signal cancellation completed

# OR: use finally for cleanup
async def worker():
    try:
        while True:
            await do_work()
    finally:
        await cleanup_resources()  # Always runs, including on cancellation
```
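A self-contained check that the re-raise pattern really leaves the task in the cancelled state (the `cleanup_done` list stands in for real cleanup work):

```python
import asyncio

cleanup_done = []

async def worker():
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        cleanup_done.append(True)  # cleanup before re-raising
        raise

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.01)  # let the worker start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected
    return task.cancelled()

was_cancelled = asyncio.run(main())
print(was_cancelled, cleanup_done)  # True [True]
```

Had `worker` swallowed the exception with `pass`, `task.cancelled()` would be `False` and callers could not tell the task was interrupted.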
Cancel tasks gracefully:
```python
# Create and cancel a task
task = asyncio.create_task(worker())

# Wait for the task to complete, with a timeout
try:
    await asyncio.wait_for(task, timeout=30.0)
except asyncio.TimeoutError:
    # wait_for already cancelled the task; wait for cancellation to finish
    try:
        await task
    except asyncio.CancelledError:
        pass  # Expected

# Cancel multiple tasks
tasks = [
    asyncio.create_task(worker1()),
    asyncio.create_task(worker2()),
    asyncio.create_task(worker3()),
]

# Signal cancellation
for task in tasks:
    task.cancel()

# Wait for all cancellations to complete
results = await asyncio.gather(*tasks, return_exceptions=True)

for i, result in enumerate(results):
    if isinstance(result, asyncio.CancelledError):
        print(f"Task {i} was cancelled")
    elif isinstance(result, Exception):
        print(f"Task {i} failed: {result}")
```
Implement shutdown with cancellation:
```python
import asyncio
import signal

class GracefulShutdown:
    def __init__(self):
        self.tasks = set()
        self.shutdown_event = asyncio.Event()

    def create_task(self, coro):
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)
        return task

    async def shutdown(self):
        print("Shutting down...")
        self.shutdown_event.set()

        # Snapshot the set: it mutates as done callbacks fire
        tasks = list(self.tasks)

        # Cancel all pending tasks
        for task in tasks:
            task.cancel()

        # Wait for tasks to complete, with a timeout
        if tasks:
            done, pending = await asyncio.wait(tasks, timeout=5.0)

            # Wait for any stragglers to finish their cleanup
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)

# Usage
shutdown = GracefulShutdown()

async def main():
    shutdown.create_task(worker1())
    shutdown.create_task(worker2())

    # Wait for the shutdown signal
    await shutdown.shutdown_event.wait()

# Run with signal handling
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

for sig in (signal.SIGTERM, signal.SIGINT):
    loop.add_signal_handler(
        sig, lambda: asyncio.create_task(shutdown.shutdown())
    )

loop.run_until_complete(main())
```
### 5. Fix blocking calls in async context
Replace blocking I/O with async alternatives:
```python
# Network I/O
# WRONG: requests blocks the event loop
import requests

async def fetch():
    response = requests.get('http://example.com')  # Blocks!
    return response.text

# CORRECT: use aiohttp
import aiohttp

async def fetch():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://example.com') as response:
            return await response.text()

# File I/O
# WRONG: open() blocks the event loop
async def read_file():
    with open('data.txt') as f:  # Blocks!
        return f.read()

# CORRECT: use aiofiles or to_thread
import aiofiles

async def read_file():
    async with aiofiles.open('data.txt') as f:
        return await f.read()

# Or for one-off operations
from pathlib import Path

async def read_file():
    return await asyncio.to_thread(Path('data.txt').read_text)

# Sleep
# WRONG: time.sleep blocks the event loop
import time

async def wait():
    time.sleep(5)  # Freezes the entire event loop!

# CORRECT: asyncio.sleep
async def wait():
    await asyncio.sleep(5)

# Database (PostgreSQL example)
# WRONG: psycopg2 (sync)
import psycopg2

async def query():
    conn = psycopg2.connect(...)  # Blocks!
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    return cursor.fetchall()

# CORRECT: asyncpg
import asyncpg

async def query():
    conn = await asyncpg.connect(...)
    rows = await conn.fetch("SELECT * FROM users")
    await conn.close()
    return rows
```
Run blocking code in executor:
```python
# For operations that can't be made async
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def run_blocking():
    loop = asyncio.get_running_loop()

    # Run in the default executor (a thread pool)
    result = await loop.run_in_executor(
        None,  # Default executor
        io_function, arg1
    )

    # Or in an explicit thread pool
    result = await loop.run_in_executor(executor, io_function, arg1)

    return result

# Or use asyncio.to_thread (Python 3.9+)
async def run_blocking():
    result = await asyncio.to_thread(io_function, arg1)
    return result

# For CPU-bound work, use a process pool so the GIL
# doesn't serialize the computation
process_executor = ProcessPoolExecutor()

async def run_cpu_bound():
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        process_executor, cpu_intensive_function, data
    )
```
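To see that `asyncio.to_thread` really keeps the loop responsive, the sketch below runs a ticker task while a blocking call executes in a worker thread; the ticker keeps advancing, which it could not do if the blocking call ran on the loop itself (`blocking_io` and the timings are illustrative):

```python
import asyncio
import time

def blocking_io():
    time.sleep(0.1)  # stands in for a synchronous call
    return "done"

async def main():
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    t = asyncio.create_task(ticker())
    result = await asyncio.to_thread(blocking_io)  # loop keeps running
    t.cancel()
    return result, ticks

result, ticks = asyncio.run(main())
print(result, ticks > 1)  # done True
```

Replacing `await asyncio.to_thread(blocking_io)` with a direct `blocking_io()` call would freeze the ticker for the full 0.1s.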
### 6. Fix async lock and synchronization issues
Use asyncio.Lock correctly:
```python
import asyncio

# WRONG: shared state mutated without a lock
counter = 0

async def increment():
    global counter
    current = counter            # Race condition!
    await asyncio.sleep(0.001)   # Context switch here
    counter = current + 1

# CORRECT: guard the critical section with asyncio.Lock
lock = asyncio.Lock()
counter = 0

async def increment():
    global counter
    async with lock:
        current = counter
        await asyncio.sleep(0.001)
        counter = current + 1

# Or manual acquire/release
async def increment():
    global counter
    await lock.acquire()
    try:
        current = counter
        await asyncio.sleep(0.001)
        counter = current + 1
    finally:
        lock.release()
```
Prevent deadlock with multiple locks:
```python
# WRONG: inconsistent lock ordering can deadlock
lock_a = asyncio.Lock()
lock_b = asyncio.Lock()

async def task1():
    async with lock_a:
        await asyncio.sleep(0.1)
        async with lock_b:  # May deadlock if task2 holds lock_b
            ...

async def task2():
    async with lock_b:
        await asyncio.sleep(0.1)
        async with lock_a:  # May deadlock if task1 holds lock_a
            ...

# CORRECT: always acquire locks in the same order
async def task1():
    async with lock_a:
        await asyncio.sleep(0.1)
        async with lock_b:
            ...

async def task2():
    async with lock_a:  # Same order as task1
        await asyncio.sleep(0.1)
        async with lock_b:
            ...

# Or use asyncio.Semaphore to cap concurrency
semaphore = asyncio.Semaphore(5)  # Max 5 concurrent

async def limited_worker():
    async with semaphore:
        await do_work()
```
Use asyncio.Event for signaling:
```python
# Signal between coroutines
event = asyncio.Event()

async def waiter():
    print("Waiting for event...")
    await event.wait()  # Blocks until event.set()
    print("Event received!")

async def setter():
    await asyncio.sleep(1)
    event.set()  # Wake up all waiters

# Run both
await asyncio.gather(waiter(), setter())

# Use Condition for more complex synchronization
condition = asyncio.Condition()
queue = []

async def producer(item):
    async with condition:
        queue.append(item)
        condition.notify()  # Wake one consumer

async def consumer():
    async with condition:
        while not queue:
            await condition.wait()  # Wait for a notification
        return queue.pop(0)
```
### 7. Fix timeout handling
Use asyncio.wait_for:
```python
# WRONG: no timeout - can hang forever
async def fetch():
    return await slow_operation()

# CORRECT: wrap with a timeout
async def fetch():
    try:
        return await asyncio.wait_for(slow_operation(), timeout=30.0)
    except asyncio.TimeoutError:
        print("Operation timed out")
        # Handle the timeout - clean up if needed
        raise

# Timeout with cleanup
class Resource:
    def __init__(self):
        self.connection = None

    async def connect(self):
        self.connection = await create_connection()

    async def close(self):
        if self.connection:
            await self.connection.close()

async def operation_with_timeout(resource, timeout=30.0):
    try:
        return await asyncio.wait_for(resource.connect(), timeout=timeout)
    except asyncio.TimeoutError:
        await resource.close()  # Cleanup on timeout
        raise
```
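The behavior of `wait_for` is easy to verify in isolation: on timeout it cancels the wrapped coroutine and raises `asyncio.TimeoutError`. A minimal runnable check (the sleep stands in for a hung operation):

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)  # simulates an operation that hangs
    return "finished"

async def main():
    try:
        return await asyncio.wait_for(slow_operation(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"

outcome = asyncio.run(main())
print(outcome)  # timed out
```

Note that the inner coroutine receives a `CancelledError` at its await point, so any `finally` blocks inside it still run before the timeout propagates.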
Implement timeout context manager:
```python
import asyncio
from contextlib import asynccontextmanager

# Python 3.11+: asyncio.timeout as a reusable context manager
@asynccontextmanager
async def timeout_context(seconds, operation_name="Operation"):
    try:
        async with asyncio.timeout(seconds):
            yield
    except asyncio.TimeoutError:
        print(f"{operation_name} timed out after {seconds}s")
        raise

# Usage (Python 3.11+)
async def fetch():
    async with timeout_context(30.0, "Database query"):
        return await db.query()

# For Python 3.10 and earlier, the async-timeout package
# (pip install async-timeout) provides the same context manager
import async_timeout

async def fetch():
    async with async_timeout.timeout(30.0):
        return await db.query()

# Or use wait_for directly
async def fetch():
    return await asyncio.wait_for(db.query(), timeout=30.0)
```
Handle timeout in task groups:
```python
# Python 3.11+: combine asyncio.TaskGroup with asyncio.timeout.
# Note: except and except* cannot be mixed in one try statement,
# so the timeout is handled in the outer try.
async def run_with_timeout():
    try:
        async with asyncio.timeout(60.0):
            try:
                async with asyncio.TaskGroup() as tg:
                    tg.create_task(operation1())
                    tg.create_task(operation2())
            except* Exception as eg:
                print(f"Tasks failed: {eg.exceptions}")
    except TimeoutError:
        print("Task group timed out")

# For earlier Python, use gather with wait_for
async def run_with_timeout():
    try:
        await asyncio.wait_for(
            asyncio.gather(operation1(), operation2()),
            timeout=60.0,
        )
    except asyncio.TimeoutError:
        print("Operations timed out")
```
### 8. Fix async generator cleanup
Properly close async generators:
```python
# WRONG: async generator abandoned after break
async def process():
    async for item in async_generator():
        if should_stop(item):
            break  # Generator is not closed deterministically
    # Resource leak if the generator holds resources

# CORRECT: contextlib.aclosing (Python 3.10+)
from contextlib import aclosing

async def process():
    async with aclosing(async_generator()) as gen:
        async for item in gen:
            if should_stop(item):
                break

# Or close manually
async def process():
    gen = async_generator()
    try:
        async for item in gen:
            if should_stop(item):
                break
    finally:
        await gen.aclose()  # Proper cleanup
```
Implement async generator with cleanup:
```python
# Async generator with try/finally for cleanup
async def async_resource():
    resource = await acquire_resource()
    try:
        while True:
            item = await resource.get_next()
            if item is None:
                break
            yield item
    finally:
        # Always runs on break, exception, or completion
        await resource.close()

# Usage
async def process():
    async for item in async_resource():
        process_item(item)
    # Resource automatically closed

# Async generator with exception handling
async def safe_generator():
    try:
        for i in range(100):
            yield await handle(i)
    except asyncio.CancelledError:
        await cleanup()  # Handle cancellation gracefully
        raise
    except Exception as e:
        print(f"Generator error: {e}")  # Log but don't hide the exception
        raise
    finally:
        await final_cleanup()
```
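That the `finally` block really runs on `aclose()` can be checked with a tiny self-contained generator (the `closed` list stands in for releasing a real resource):

```python
import asyncio

closed = []

async def numbers():
    try:
        for i in range(100):
            yield i
    finally:
        closed.append(True)  # runs when the generator is closed

async def main():
    gen = numbers()
    try:
        async for n in gen:
            if n == 3:
                break  # abandon the generator early
    finally:
        await gen.aclose()

asyncio.run(main())
print(closed)  # [True]
```

Without the explicit `aclose()`, cleanup would be deferred to the loop's async-generator shutdown hook (or to garbage collection), which is nondeterministic.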
### 9. Debug asyncio performance issues
Profile async code:
```python
import asyncio

# Warn when a callback holds the loop too long (requires debug mode)
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.slow_callback_duration = 0.05  # Warn if > 50ms

# aioconsole for an interactive debug prompt
from aioconsole import ainput

async def debug_prompt():
    while True:
        cmd = await ainput("debug> ")
        if cmd == "tasks":
            for task in asyncio.all_tasks():
                print(f"  {task.get_name()}: done={task.done()}")
        elif cmd == "quit":
            break

# aiomonitor provides a telnet REPL into the running event loop,
# to inspect tasks and run code live
import aiomonitor

async def main():
    with aiomonitor.start_monitor(loop=asyncio.get_running_loop()):
        await run_application()
```
Detect event loop starvation:
```python
import asyncio

# Watchdog for event loop responsiveness
class LoopWatchdog:
    def __init__(self, threshold=1.0):
        self.threshold = threshold
        self.last_check = None  # set on first check, once a loop runs

    async def check(self):
        now = asyncio.get_running_loop().time()
        if self.last_check is not None:
            elapsed = now - self.last_check
            if elapsed > self.threshold:
                print(f"WARNING: Event loop blocked for {elapsed:.2f}s")
        self.last_check = now
        await asyncio.sleep(0)  # Yield control

# Dedicated monitor task: if a callback blocks the loop, the gap
# between wakeups exceeds the threshold and a warning is printed
async def monitor_loop():
    watchdog = LoopWatchdog(threshold=0.5)
    while True:
        await watchdog.check()
        await asyncio.sleep(0.1)

# Usage
async def main():
    asyncio.create_task(monitor_loop())
    while True:
        await do_work()
```
### 10. Fix framework-specific asyncio issues
FastAPI async dependency handling:
```python
from fastapi import FastAPI, Depends

app = FastAPI()

# Async dependency with cleanup
async def get_db():
    db = await create_connection()
    try:
        yield db
    finally:
        await db.close()

@app.get("/items")
async def get_items(db=Depends(get_db)):
    # FastAPI resolves and awaits async dependencies automatically.
    # Calling get_db() by hand would return an async generator,
    # not a connection - let Depends handle it.
    ...
```
aiohttp client session management:
```python
import aiohttp

# WRONG: creating a session per request is expensive
async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# CORRECT: reuse a session
session = None

async def get_session():
    global session
    if session is None:
        session = aiohttp.ClientSession()
    return session

async def fetch(url):
    s = await get_session()
    async with s.get(url) as response:
        return await response.text()

# Best: application-managed session tied to startup/shutdown
class App:
    def __init__(self):
        self.session = None

    async def startup(self):
        self.session = aiohttp.ClientSession()

    async def shutdown(self):
        await self.session.close()

    async def fetch(self, url):
        async with self.session.get(url) as response:
            return await response.text()
```
## Prevention
- Always await async function calls; use linters to detect a missing await
- Use asyncio.to_thread for blocking sync operations
- Handle CancelledError in long-running tasks for clean shutdown
- Use asyncio.Lock for shared state accessed from multiple tasks
- Wrap external I/O in asyncio.wait_for timeouts
- Close async generators with aclose() or contextlib.aclosing
- Enable asyncio debug mode in development and staging
- Prefer async libraries (aiohttp, asyncpg, aiofiles) over sync alternatives
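The debug-mode bullet can be verified programmatically: with `debug=True`, the loop logs a warning through the `asyncio` logger whenever a single callback holds it past `slow_callback_duration` (0.1s by default). A sketch that captures those log records (the custom handler is just for inspection):

```python
import asyncio
import logging
import time

records = []
handler = logging.Handler()
handler.emit = lambda record: records.append(record.getMessage())
logging.getLogger("asyncio").addHandler(handler)

async def main():
    time.sleep(0.2)  # deliberately blocks the loop past the 0.1s threshold

asyncio.run(main(), debug=True)
print(any("took" in message for message in records))  # True
```

In CI, a handler like this can turn "Executing ... took N seconds" warnings into test failures, catching blocking calls before they reach production.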
## Related Errors
- **RuntimeError: Event loop is closed**: loop accessed after it was closed
- **CancelledError not handled**: task cleanup skipped
- **await wasn't used on future**: coroutine or future never awaited
- **Blocking call on event loop**: sync operation freezing async code
- **Task exception was never retrieved**: task failure never inspected