Python - Context Managers (contextlib)
• Context managers automate resource cleanup using `__enter__` and `__exit__` methods, preventing resource leaks even when exceptions occur
Key Insights
• Context managers automate resource cleanup using __enter__ and __exit__ methods, preventing resource leaks even when exceptions occur
• The contextlib module provides decorators and utilities to create context managers without boilerplate class definitions
• Advanced patterns like ExitStack and AsyncExitStack enable dynamic resource management and composable cleanup logic
Understanding Context Manager Protocol
Context managers in Python implement two methods: __enter__() and __exit__(). The __enter__ method executes when entering the with block, while __exit__ handles cleanup regardless of how the block terminates.
class DatabaseConnection:
def __init__(self, connection_string):
self.connection_string = connection_string
self.connection = None
def __enter__(self):
print(f"Connecting to {self.connection_string}")
self.connection = self._establish_connection()
return self.connection
def __exit__(self, exc_type, exc_value, traceback):
print("Closing connection")
if self.connection:
self.connection.close()
# Return False to propagate exceptions
return False
def _establish_connection(self):
# Simulated connection
return {"status": "connected"}
# Usage
with DatabaseConnection("postgresql://localhost") as conn:
print(f"Using connection: {conn}")
# Connection automatically closes after this block
The __exit__ method receives three arguments: exception type, exception value, and traceback. Returning True suppresses exceptions, while False or None propagates them.
Creating Context Managers with contextlib.contextmanager
The @contextmanager decorator transforms generator functions into context managers, eliminating class boilerplate. Code before yield executes in __enter__, and code after handles cleanup.
from contextlib import contextmanager
import time
import logging
@contextmanager
def timer(operation_name):
start_time = time.time()
logging.info(f"Starting {operation_name}")
try:
yield # Control returns to with block
finally:
elapsed = time.time() - start_time
logging.info(f"{operation_name} completed in {elapsed:.2f}s")
# Usage
with timer("data processing"):
# Simulate work
time.sleep(1)
result = sum(range(1000000))
# Output:
# INFO:root:Starting data processing
# INFO:root:data processing completed in 1.00s
You can yield values to provide resources to the with block:
@contextmanager
def temporary_attribute(obj, attr_name, value):
"""Temporarily set an attribute on an object."""
original = getattr(obj, attr_name, None)
setattr(obj, attr_name, value)
try:
yield obj
finally:
if original is None:
delattr(obj, attr_name)
else:
setattr(obj, attr_name, original)
class Config:
debug = False
config = Config()
print(f"Before: {config.debug}") # False
with temporary_attribute(config, 'debug', True):
print(f"Inside: {config.debug}") # True
print(f"After: {config.debug}") # False
Exception Handling in Context Managers
Context managers execute cleanup code even when exceptions occur. You can catch and handle exceptions within the context manager:
@contextmanager
def error_handler(operation):
try:
yield
except ValueError as e:
logging.error(f"{operation} failed with ValueError: {e}")
# Exception is suppressed - won't propagate
except Exception as e:
logging.error(f"{operation} failed unexpectedly: {e}")
raise # Re-raise other exceptions
with error_handler("parsing"):
value = int("not_a_number") # Logs error, continues execution
print("Execution continues") # This runs
Managing Multiple Resources with ExitStack
ExitStack dynamically manages multiple context managers, useful when the number of resources isn’t known upfront:
from contextlib import ExitStack
import tempfile
def process_files(filenames):
with ExitStack() as stack:
# Open all files and register them for cleanup
files = [stack.enter_context(open(fname)) for fname in filenames]
# Process all files
for f in files:
content = f.read()
print(f"Processing {f.name}: {len(content)} bytes")
# All files automatically closed when exiting
# Usage
temp_files = []
for i in range(3):
tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
tf.write(f"Content {i}" * 100)
tf.close()
temp_files.append(tf.name)
process_files(temp_files)
ExitStack supports conditional resource management:
@contextmanager
def optional_transaction(connection, use_transaction=True):
with ExitStack() as stack:
if use_transaction:
stack.enter_context(connection.begin())
yield connection
# Transaction only starts if use_transaction=True
with optional_transaction(conn, use_transaction=False) as c:
c.execute("SELECT * FROM users")
Callback Registration with ExitStack
Register arbitrary cleanup callbacks using ExitStack.callback():
from contextlib import ExitStack
def setup_test_environment():
stack = ExitStack()
# Register cleanup functions
stack.callback(lambda: print("Cleanup: Removing temp files"))
stack.callback(lambda: print("Cleanup: Closing connections"))
# Setup code
print("Setup: Creating resources")
return stack
with setup_test_environment():
print("Running tests")
# Output:
# Setup: Creating resources
# Running tests
# Cleanup: Closing connections
# Cleanup: Removing temp files (LIFO order)
Callbacks execute in LIFO order, matching the natural cleanup sequence:
def deploy_application(config):
with ExitStack() as stack:
# Start services in order
database = start_database(config)
stack.callback(database.stop)
cache = start_cache(config)
stack.callback(cache.stop)
app = start_app(config)
stack.callback(app.stop)
# Services stop in reverse order: app, cache, database
yield app
Suppressing Exceptions with suppress
The suppress context manager catches and silences specific exceptions:
from contextlib import suppress
import os
# Remove file if it exists, ignore if it doesn't
with suppress(FileNotFoundError):
os.remove('nonexistent_file.txt')
# Equivalent to:
try:
os.remove('nonexistent_file.txt')
except FileNotFoundError:
pass
Suppress multiple exception types:
def safe_parse(data):
with suppress(ValueError, TypeError, KeyError):
return int(data['value'])
return None # Return default if parsing fails
result = safe_parse({'value': 'invalid'}) # Returns None
Redirecting Output Streams
contextlib provides utilities for temporarily redirecting stdout and stderr:
from contextlib import redirect_stdout, redirect_stderr
from io import StringIO
def capture_output(func):
stdout_capture = StringIO()
stderr_capture = StringIO()
with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
func()
return stdout_capture.getvalue(), stderr_capture.getvalue()
def noisy_function():
print("This goes to stdout")
import sys
print("This goes to stderr", file=sys.stderr)
stdout, stderr = capture_output(noisy_function)
print(f"Captured stdout: {stdout.strip()}")
print(f"Captured stderr: {stderr.strip()}")
Async Context Managers
For asynchronous code, use @asynccontextmanager:
from contextlib import asynccontextmanager
import asyncio
import aiohttp
@asynccontextmanager
async def async_timer(operation):
start = asyncio.get_event_loop().time()
try:
yield
finally:
elapsed = asyncio.get_event_loop().time() - start
print(f"{operation} took {elapsed:.2f}s")
@asynccontextmanager
async def http_session():
session = aiohttp.ClientSession()
try:
yield session
finally:
await session.close()
async def fetch_data():
async with async_timer("HTTP request"):
async with http_session() as session:
async with session.get('https://api.example.com') as response:
return await response.text()
# asyncio.run(fetch_data())
AsyncExitStack provides async equivalents of ExitStack functionality:
from contextlib import AsyncExitStack
async def process_async_resources(urls):
async with AsyncExitStack() as stack:
# Enter multiple async context managers
sessions = []
for url in urls:
session = await stack.enter_async_context(http_session())
sessions.append(session)
# Use all sessions
tasks = [session.get(url) for session, url in zip(sessions, urls)]
responses = await asyncio.gather(*tasks)
# All sessions automatically closed
Practical Patterns
Combine context managers for complex resource management:
@contextmanager
def atomic_write(filepath):
"""Write to temporary file, rename on success."""
temp_path = f"{filepath}.tmp"
try:
with open(temp_path, 'w') as f:
yield f
# Success - rename temp file to target
os.replace(temp_path, filepath)
except Exception:
# Failure - remove temp file
with suppress(FileNotFoundError):
os.remove(temp_path)
raise
with atomic_write('config.json') as f:
f.write('{"setting": "value"}')
# File only updated if no exceptions occurred
Context managers ensure resources are properly managed, making code more reliable and maintainable. The contextlib module eliminates boilerplate while providing powerful composition patterns for complex resource scenarios.