Introduction

The asyncio event loop is the core scheduler that runs all async tasks. When the loop is closed - either explicitly, due to an error, or after asyncio.run() completes - any attempt to schedule new tasks or run coroutines on that loop raises RuntimeError: Event loop is closed. This commonly happens in long-running applications, test suites, and interactive environments.
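The failure is easy to reproduce in isolation. The following is a minimal sketch using only the standard library; `noop` and `reproduce_closed_loop_error` are illustrative names, not part of asyncio.

```python
import asyncio


async def noop():
    """Trivial coroutine used only for this illustration."""
    return "done"


def reproduce_closed_loop_error():
    """Run a coroutine on a loop, close the loop, then try to reuse it."""
    loop = asyncio.new_event_loop()
    loop.run_until_complete(noop())  # works: the loop is still open
    loop.close()

    coro = noop()
    try:
        loop.run_until_complete(coro)  # fails: the loop is closed
    except RuntimeError as exc:
        return str(exc)
    finally:
        coro.close()  # avoid a "coroutine was never awaited" warning
    return None


print(reproduce_closed_loop_error())  # Event loop is closed
```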

Symptoms

  • RuntimeError: Event loop is closed
  • RuntimeError: cannot schedule new futures after shutdown
  • Background tasks fail silently after main task completes
  • Error occurs during application shutdown or test cleanup
  • Jupyter notebook cells fail after restarting the kernel

Example traceback:

Traceback (most recent call last):
  File "app.py", line 15, in cleanup
    await session.close()
  File ".../asyncio/base_events.py", line 686, in call_soon
    self._check_closed()
  File ".../asyncio/base_events.py", line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

Common Causes

  • Calling loop.close() while tasks are still running
  • asyncio.run() finishes and closes the loop before background tasks complete
  • Reusing a closed loop across test cases
  • Jupyter notebook kernel restart closing the loop
  • Signal handlers closing the loop during shutdown
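The second cause above is the one that most often looks like a silent failure: when main() returns, asyncio.run() cancels any task that is still pending before it closes the loop. A small sketch of that behavior, where `background` is a hypothetical worker and `events` is just a list used to record what happened:

```python
import asyncio

events = []


async def background():
    """Hypothetical long-running worker."""
    try:
        await asyncio.sleep(10)
        events.append("background finished")
    except asyncio.CancelledError:
        events.append("background cancelled")
        raise


async def main():
    task = asyncio.create_task(background())  # never awaited by main()
    await asyncio.sleep(0)  # yield once so the worker starts running
    events.append("main done")


asyncio.run(main())
print(events)  # ['main done', 'background cancelled']
```

The worker never reaches its "finished" line, and nothing is raised to the caller of asyncio.run(), which is why the task appears to vanish.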

Step-by-Step Fix

  1. Ensure all tasks complete before closing the loop:

```python
import asyncio

# background_worker(), monitor(), and do_main_work() stand for your
# application's own coroutines.

async def main():
    # Create background tasks
    task1 = asyncio.create_task(background_worker())
    task2 = asyncio.create_task(monitor())

    # Do main work
    await do_main_work()

    # Cancel and wait for background tasks
    for task in [task1, task2]:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

asyncio.run(main())  # Loop closes only after main() fully completes
```
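When there are more than a couple of tasks, the cancel-then-await pattern can be folded into one helper. This is a sketch under the assumption that every task should simply be cancelled at shutdown; `cancel_and_wait` and `worker` are hypothetical names.

```python
import asyncio


async def cancel_and_wait(tasks):
    """Cancel every task, then wait until all of them have actually stopped."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects CancelledError results instead of raising.
    return await asyncio.gather(*tasks, return_exceptions=True)


async def worker():
    """Placeholder background job."""
    await asyncio.sleep(3600)


async def main():
    tasks = [asyncio.create_task(worker()) for _ in range(3)]
    await asyncio.sleep(0)  # let the workers start
    await cancel_and_wait(tasks)
    return [task.cancelled() for task in tasks]


print(asyncio.run(main()))  # [True, True, True]
```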

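  2. Use shutdown_default_executor for thread pools:

```python
import asyncio

async def main():
    # Inside a coroutine, get_running_loop() is the reliable way to
    # obtain the current loop.
    loop = asyncio.get_running_loop()
    try:
        await do_work()  # do_work() stands for your own coroutine
    finally:
        # Ensure the default thread pool executor is shut down.
        # asyncio.run() also does this on exit (Python 3.9+); the explicit
        # call matters mainly when you manage the loop yourself.
        await loop.shutdown_default_executor()

asyncio.run(main())
```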
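The related "cannot schedule new futures after shutdown" symptom comes from the thread pool rather than the loop itself. The sketch below shows one way to trigger it, assuming an explicitly created ThreadPoolExecutor that is shut down before the last job is submitted:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def main():
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=1)  # explicitly managed pool

    result = await loop.run_in_executor(executor, sum, [1, 2, 3])
    executor.shutdown(wait=True)

    try:
        # Submitting to a pool that is already shut down fails immediately.
        await loop.run_in_executor(executor, sum, [4, 5, 6])
    except RuntimeError as exc:
        return result, str(exc)
    return result, None


print(asyncio.run(main()))
# (6, 'cannot schedule new futures after shutdown')
```

The fix is an ordering one: submit all executor work first and shut the pool down last.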

  3. Handle cleanup in proper order:

```python
import asyncio
import aiohttp

async def main():
    session = aiohttp.ClientSession()
    try:
        async with session.get('https://api.example.com/data') as resp:
            data = await resp.json()
        return data
    finally:
        await session.close()  # Close session BEFORE loop closes

# asyncio.run() handles loop lifecycle automatically
result = asyncio.run(main())
```
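The same ordering can be checked without a network dependency. In this sketch, `FakeSession` is a hypothetical stand-in for a client such as an aiohttp session; it records whether the loop was still open when its cleanup ran.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for a network client with async cleanup."""

    def __init__(self):
        self.closed_on_open_loop = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Record whether the loop was still usable during cleanup.
        self.closed_on_open_loop = not asyncio.get_running_loop().is_closed()

    async def fetch(self):
        await asyncio.sleep(0)
        return {"status": "ok"}


async def main():
    async with FakeSession() as session:
        data = await session.fetch()
    return data, session.closed_on_open_loop


print(asyncio.run(main()))  # ({'status': 'ok'}, True)
```

Because `async with` exits inside main(), cleanup is guaranteed to run while the loop is still alive.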

  4. Fix Jupyter notebook event loop issues:

```python
# In Jupyter, use nest_asyncio to allow nested event loops
import nest_asyncio
nest_asyncio.apply()

import asyncio

# Now you can use asyncio.run() in notebook cells
asyncio.run(some_async_function())
```
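Before reaching for a patch, it can help to confirm whether a loop is already running in the current thread, which is the usual situation in a notebook. A minimal sketch using the standard library; `has_running_loop` and `check_inside_coroutine` are illustrative names:

```python
import asyncio


def has_running_loop():
    """Return True when the current thread is already running an event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def check_inside_coroutine():
    return has_running_loop()


print(has_running_loop())                     # False in a plain script
print(asyncio.run(check_inside_coroutine()))  # True inside a coroutine
```

When this returns True at the top level of a cell, recent IPython and Jupyter versions generally allow awaiting the coroutine directly in the cell instead of calling asyncio.run().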

  5. Create a new loop if the previous one was closed:

```python
import asyncio

def run_with_fresh_loop(coro):
    """Run coroutine on a fresh event loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
        asyncio.set_event_loop(None)

# Use between tests or operations
run_with_fresh_loop(async_task_1())
run_with_fresh_loop(async_task_2())  # Fresh loop, no conflict
```
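In many cases asyncio.run() already gives the same guarantee, since each call creates its own loop and closes it on exit. A short sketch that makes this visible; `current_loop` is an illustrative helper:

```python
import asyncio


async def current_loop():
    """Return the loop this coroutine is running on."""
    return asyncio.get_running_loop()


first = asyncio.run(current_loop())
second = asyncio.run(current_loop())

print(first is second)                        # False
print(first.is_closed(), second.is_closed())  # True True
```

The manual helper is mainly useful when code outside your control expects a loop to be installed with set_event_loop().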

Prevention

  • Always use asyncio.run() instead of manual loop management when possible
  • Register cleanup with loop.add_signal_handler() for graceful shutdown (Unix only)
  • Use async with for context managers that need async cleanup
  • In tests, create a fresh loop per test with pytest-asyncio
  • Track all created tasks and ensure they complete during shutdown
  • Use atexit handlers to verify no pending tasks remain
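
Tracking tasks, as recommended above, can be as simple as keeping them in a set. The sketch below is one possible shape for that, not a standard API: `TaskRegistry`, `spawn`, `shutdown`, and `ticker` are all hypothetical names.

```python
import asyncio


class TaskRegistry:
    """Hypothetical helper that keeps strong references to background tasks."""

    def __init__(self):
        self._tasks = set()

    def spawn(self, coro):
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop the reference automatically once the task is done.
        task.add_done_callback(self._tasks.discard)
        return task

    async def shutdown(self):
        """Cancel whatever is still pending and wait for it to stop."""
        pending = list(self._tasks)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        return len(pending)


async def ticker():
    """Placeholder job that never finishes on its own."""
    while True:
        await asyncio.sleep(0.01)


async def main():
    registry = TaskRegistry()
    registry.spawn(ticker())
    registry.spawn(asyncio.sleep(0))  # finishes on its own
    await asyncio.sleep(0.05)
    cancelled = await registry.shutdown()
    return cancelled, len(registry._tasks)


print(asyncio.run(main()))  # (1, 0)
```

Calling shutdown() at the end of main() means every task has stopped before asyncio.run() closes the loop.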