Introduction
The asyncio event loop is the core scheduler that runs all async tasks. When the loop is closed (explicitly, due to an error, or after `asyncio.run()` completes), any attempt to schedule new tasks or run coroutines on that loop raises `RuntimeError: Event loop is closed`. This commonly happens in long-running applications, test suites, and interactive environments.
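The failure is easy to reproduce in isolation. The following is a minimal sketch, not taken from any particular application; `noop()` is a hypothetical placeholder coroutine used only for illustration.

```python
import asyncio


async def noop():
    """Hypothetical placeholder coroutine."""
    return "done"


loop = asyncio.new_event_loop()
first = loop.run_until_complete(noop())  # works: the loop is still open
loop.close()

coro = noop()
try:
    loop.run_until_complete(coro)  # the loop is closed at this point
except RuntimeError as exc:
    error_message = str(exc)
    coro.close()  # the coroutine never ran; close it to avoid a warning

print(error_message)
```

Once `loop.close()` has been called, the loop object cannot be reopened; any further work needs a new loop.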
Symptoms
- `RuntimeError: Event loop is closed`
- `RuntimeError: cannot schedule new futures after shutdown`
- Background tasks fail silently after main task completes
- Error occurs during application shutdown or test cleanup
- Jupyter notebook cells fail after restarting the kernel
Traceback (most recent call last):
  File "app.py", line 15, in cleanup
    await session.close()
  File ".../asyncio/base_events.py", line 686, in call_soon
    self._check_closed()
  File ".../asyncio/base_events.py", line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

Common Causes
- Calling `loop.close()` while tasks are still running
- `asyncio.run()` finishes and closes the loop before background tasks complete
- Reusing a closed loop across test cases
- Jupyter notebook kernel restart closing the loop
- Signal handlers closing the loop during shutdown
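The second cause is worth seeing in action: when the coroutine passed to `asyncio.run()` returns, any task that is still pending gets cancelled before the loop is closed. The sketch below illustrates this with a hypothetical `background()` coroutine and an `events` list that exists only to record what happened.

```python
import asyncio

events = []


async def background():
    try:
        await asyncio.sleep(10)  # still pending when main() returns
        events.append("finished")
    except asyncio.CancelledError:
        events.append("cancelled")
        raise


async def main():
    asyncio.create_task(background())  # fire-and-forget: never awaited
    await asyncio.sleep(0)  # yield once so the task gets to start


asyncio.run(main())
print(events)
```

The background task never reaches `"finished"`; it is cancelled during the teardown that `asyncio.run()` performs.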
Step-by-Step Fix
1. Ensure all tasks complete before closing the loop:

```python
import asyncio

# background_worker(), monitor(), and do_main_work() stand for the
# application's own coroutines.
async def main():
    # Create background tasks
    task1 = asyncio.create_task(background_worker())
    task2 = asyncio.create_task(monitor())

    # Do main work
    await do_main_work()

    # Cancel and wait for background tasks
    for task in [task1, task2]:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

asyncio.run(main())  # Loop closes only after main() fully completes
```
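When there are more than a couple of tasks, the cancel-then-await pattern can be folded into a helper. This is one possible sketch; `cancel_and_wait()` and `worker()` are hypothetical names, not part of asyncio.

```python
import asyncio


async def cancel_and_wait(tasks):
    """Cancel every task, then wait until all of them have actually stopped."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising it
    return await asyncio.gather(*tasks, return_exceptions=True)


async def worker():
    await asyncio.sleep(3600)  # hypothetical long-running job


async def main():
    tasks = [asyncio.create_task(worker()) for _ in range(3)]
    await asyncio.sleep(0)  # let the workers start
    results = await cancel_and_wait(tasks)
    return tasks, results


tasks, results = asyncio.run(main())
```

Using `gather()` here waits for all tasks together rather than one at a time, and a task that fails with a different exception shows up in `results` instead of interrupting the cleanup.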
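2. Use `shutdown_default_executor()` for thread pools. On Python 3.9+, `asyncio.run()` already calls it before closing the loop, so the explicit call matters mainly when you manage the loop yourself:

```python
import asyncio

async def main():
    # Inside a coroutine, get_running_loop() returns the loop that is executing it
    loop = asyncio.get_running_loop()
    try:
        await do_work()  # the application's own coroutine
    finally:
        # Ensure thread pool executor is shut down
        await loop.shutdown_default_executor()

asyncio.run(main())
```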
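For code that creates its own loop, the teardown has to be done by hand. The sketch below assumes Python 3.9+ and uses a hypothetical `blocking_io()` function to put the default executor to work before shutting it down.

```python
import asyncio
import threading


def blocking_io():
    """Hypothetical blocking call; runs in the loop's default thread pool."""
    return threading.current_thread().name


async def main():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, blocking_io)


loop = asyncio.new_event_loop()
try:
    worker_thread = loop.run_until_complete(main())
finally:
    # Mirror the teardown that asyncio.run() performs before closing
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.run_until_complete(loop.shutdown_default_executor())
    loop.close()
```

The order matters: both shutdown calls are coroutines and need the loop to still be open, so `loop.close()` comes last.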
3. Handle cleanup in proper order:

```python
import asyncio
import aiohttp

async def main():
    session = aiohttp.ClientSession()
    try:
        async with session.get('https://api.example.com/data') as resp:
            data = await resp.json()
            return data
    finally:
        await session.close()  # Close session BEFORE loop closes

# asyncio.run() handles loop lifecycle automatically
result = asyncio.run(main())
```
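When several resources need async cleanup, `contextlib.AsyncExitStack` from the standard library closes them in reverse order of acquisition while the loop is still alive. A rough sketch follows; `resource()` is a hypothetical stand-in for a real client or connection, and `closed` only records the order of cleanup.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

closed = []


@asynccontextmanager
async def resource(name):
    """Hypothetical async resource that records when it is closed."""
    try:
        yield name
    finally:
        await asyncio.sleep(0)  # async cleanup still needs a live loop
        closed.append(name)


async def main():
    async with AsyncExitStack() as stack:
        await stack.enter_async_context(resource("db"))
        await stack.enter_async_context(resource("http"))
        return "work done"
    # Both resources are closed here, before main() returns


result = asyncio.run(main())
print(closed)
```

Because all cleanup happens inside `main()`, nothing is left to run after `asyncio.run()` has closed the loop.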
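4. Fix Jupyter notebook event loop issues:

```python
# In Jupyter, use nest_asyncio (a third-party package) to allow nested event loops
import nest_asyncio
nest_asyncio.apply()

import asyncio

# Now you can use asyncio.run() in notebook cells
asyncio.run(some_async_function())

# Alternative without extra packages: IPython 7+ supports top-level await,
# so `await some_async_function()` works directly in a cell.
```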
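Notebook kernels already run an event loop, which is why a plain `asyncio.run()` call there fails with `asyncio.run() cannot be called from a running event loop`. Code that has to work both in scripts and in notebooks can check for a running loop first. The helper below is a small sketch; `loop_is_running()` is a hypothetical name.

```python
import asyncio


def loop_is_running():
    """Return True when called from code already running inside an event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def check_inside():
    return loop_is_running()


outside = loop_is_running()            # plain script: no loop is running
inside = asyncio.run(check_inside())   # inside a coroutine: a loop is running
```

When the helper returns `True`, the caller should `await` the coroutine rather than start another loop.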
5. Create a new loop if the previous one was closed:

```python
import asyncio

def run_with_fresh_loop(coro):
    """Run coroutine on a fresh event loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
        asyncio.set_event_loop(None)

# Use between tests or operations
run_with_fresh_loop(async_task_1())
run_with_fresh_loop(async_task_2())  # Fresh loop, no conflict
```
Prevention
- Always use `asyncio.run()` instead of manual loop management when possible
- Register cleanup with `loop.add_signal_handler()` for graceful shutdown
- Use `async with` for context managers that need async cleanup
- In tests, create a fresh loop per test with pytest-asyncio
- Track all created tasks and ensure they complete during shutdown
- Use `atexit` handlers to verify no pending tasks remain
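Tracking created tasks can be as simple as keeping them in a set. The following is one possible sketch under that assumption; `spawn()`, `shutdown()`, `ticker()`, and `background_tasks` are hypothetical names introduced for illustration.

```python
import asyncio

background_tasks = set()


def spawn(coro):
    """Create a task and keep a strong reference until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)  # drop it when done
    return task


async def shutdown():
    """Cancel whatever is still running and wait for it to stop."""
    pending = [t for t in background_tasks if not t.done()]
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)


async def ticker():
    while True:  # hypothetical periodic job
        await asyncio.sleep(0.01)


async def main():
    spawn(ticker())
    spawn(ticker())
    await asyncio.sleep(0.03)
    return await shutdown()  # runs while the loop is still open


cancelled_count = asyncio.run(main())
```

Calling `shutdown()` as the last step of `main()` keeps all cancellation inside the loop's lifetime, so no task is left to touch a closed loop.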