Python - Context Managers (contextlib)

Key Insights

• Context managers automate resource cleanup using __enter__ and __exit__ methods, preventing resource leaks even when exceptions occur
• The contextlib module provides decorators and utilities to create context managers without boilerplate class definitions
• Advanced patterns like ExitStack and AsyncExitStack enable dynamic resource management and composable cleanup logic

Understanding Context Manager Protocol

Context managers in Python implement two methods: __enter__() and __exit__(). The __enter__ method executes when entering the with block, while __exit__ handles cleanup regardless of how the block terminates.

class DatabaseConnection:
    def __init__(self, connection_string):
        self.connection_string = connection_string
        self.connection = None
    
    def __enter__(self):
        print(f"Connecting to {self.connection_string}")
        self.connection = self._establish_connection()
        return self.connection
    
    def __exit__(self, exc_type, exc_value, traceback):
        print("Closing connection")
        if self.connection:
            self.connection.close()
        # Return False to propagate exceptions
        return False
    
    def _establish_connection(self):
        # Simulated connection
        return {"status": "connected"}

# Usage
with DatabaseConnection("postgresql://localhost") as conn:
    print(f"Using connection: {conn}")
    # Connection automatically closes after this block

The __exit__ method receives three arguments: exception type, exception value, and traceback. Returning True suppresses exceptions, while False or None propagates them.
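
To make the three arguments concrete, here is a minimal sketch of a class-based manager that records what __exit__ receives and suppresses only KeyError. The class name SuppressKeyError is hypothetical and exists purely for illustration.

```python
class SuppressKeyError:
    """Hypothetical manager: swallows KeyError, propagates everything else."""

    def __enter__(self):
        self.seen = None  # Will hold the exception type passed to __exit__
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.seen = exc_type  # None when the block finished without raising
        # A truthy return value tells Python to suppress the exception
        return exc_type is not None and issubclass(exc_type, KeyError)


with SuppressKeyError() as clean:
    pass  # No exception: all three arguments are None

with SuppressKeyError() as swallowed:
    {}["missing"]  # KeyError is raised, then suppressed by __exit__

propagated = False
try:
    with SuppressKeyError():
        int("not_a_number")  # ValueError is not suppressed
except ValueError:
    propagated = True

print(clean.seen, swallowed.seen, propagated)
```

Suppressing only a narrow, named exception type keeps unrelated failures visible to the caller.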

Creating Context Managers with contextlib.contextmanager

The @contextmanager decorator transforms generator functions into context managers, eliminating class boilerplate. Code before yield executes in __enter__, and code after handles cleanup.

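from contextlib import contextmanager
import time
import logging

# Without this, INFO messages are dropped by the default WARNING level
logging.basicConfig(level=logging.INFO)

@contextmanager
def timer(operation_name):
    start_time = time.perf_counter()  # Monotonic clock suited to timing
    logging.info(f"Starting {operation_name}")

    try:
        yield  # Control returns to with block
    finally:
        elapsed = time.perf_counter() - start_time
        logging.info(f"{operation_name} completed in {elapsed:.2f}s")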

# Usage
with timer("data processing"):
    # Simulate work
    time.sleep(1)
    result = sum(range(1000000))

# Output:
# INFO:root:Starting data processing
# INFO:root:data processing completed in 1.00s

You can yield values to provide resources to the with block:

_MISSING = object()  # Sentinel: distinguishes "no attribute" from a None value

@contextmanager
def temporary_attribute(obj, attr_name, value):
    """Temporarily set an attribute on an object."""
    original = getattr(obj, attr_name, _MISSING)
    setattr(obj, attr_name, value)

    try:
        yield obj
    finally:
        if original is _MISSING:
            # The attribute did not exist before, so remove it again
            delattr(obj, attr_name)
        else:
            setattr(obj, attr_name, original)

class Config:
    debug = False

config = Config()
print(f"Before: {config.debug}")  # False

with temporary_attribute(config, 'debug', True):
    print(f"Inside: {config.debug}")  # True

print(f"After: {config.debug}")  # False

Exception Handling in Context Managers

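In a generator-based context manager, an exception raised in the with block is re-raised at the yield, so cleanup code runs only if the yield is wrapped in try/finally. The same mechanism lets you catch and handle exceptions within the context manager: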

@contextmanager
def error_handler(operation):
    try:
        yield
    except ValueError as e:
        logging.error(f"{operation} failed with ValueError: {e}")
        # Exception is suppressed - won't propagate
    except Exception as e:
        logging.error(f"{operation} failed unexpectedly: {e}")
        raise  # Re-raise other exceptions

with error_handler("parsing"):
    value = int("not_a_number")  # ValueError is logged and suppressed; the block ends here

print("Execution continues")  # This runs
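
Because the exception resurfaces at the yield, cleanup written after a bare yield is skipped when the block fails. The following sketch contrasts the two shapes; the names unsafe_resource and safe_resource are hypothetical.

```python
from contextlib import contextmanager

events = []

@contextmanager
def unsafe_resource():
    events.append("unsafe: acquire")
    yield
    # Skipped if the with block raises, because the exception
    # is re-raised at the yield above
    events.append("unsafe: release")

@contextmanager
def safe_resource():
    events.append("safe: acquire")
    try:
        yield
    finally:
        # Runs whether or not the with block raised
        events.append("safe: release")

for manager in (unsafe_resource, safe_resource):
    try:
        with manager():
            raise RuntimeError("boom")
    except RuntimeError:
        pass

print(events)
# ['unsafe: acquire', 'safe: acquire', 'safe: release']
```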

Managing Multiple Resources with ExitStack

ExitStack dynamically manages multiple context managers, useful when the number of resources isn’t known upfront:

from contextlib import ExitStack
import tempfile

def process_files(filenames):
    with ExitStack() as stack:
        # Open all files and register them for cleanup
        files = [stack.enter_context(open(fname)) for fname in filenames]
        
        # Process all files
        for f in files:
            content = f.read()
            print(f"Processing {f.name}: {len(content)} bytes")
        
        # All files automatically closed when exiting

# Usage
temp_files = []
for i in range(3):
    tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
    tf.write(f"Content {i}" * 100)
    tf.close()
    temp_files.append(tf.name)

process_files(temp_files)

ExitStack supports conditional resource management:

@contextmanager
def optional_transaction(connection, use_transaction=True):
    with ExitStack() as stack:
        if use_transaction:
            stack.enter_context(connection.begin())
        
        yield connection

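# Transaction only starts if use_transaction=True.
# `conn` is assumed to be an existing connection object whose begin()
# returns a context manager.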
with optional_transaction(conn, use_transaction=False) as c:
    c.execute("SELECT * FROM users")
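
To try the conditional pattern without a real database, the sketch below swaps in a hypothetical FakeConnection whose begin() is itself a context manager. The class and its log attribute are assumptions made for illustration, not part of any database driver.

```python
from contextlib import ExitStack, contextmanager

class FakeConnection:
    """Hypothetical stand-in for a database connection."""

    def __init__(self):
        self.log = []

    @contextmanager
    def begin(self):
        self.log.append("BEGIN")
        try:
            yield
        except Exception:
            self.log.append("ROLLBACK")
            raise
        else:
            self.log.append("COMMIT")

    def execute(self, sql):
        self.log.append(sql)

@contextmanager
def optional_transaction(connection, use_transaction=True):
    with ExitStack() as stack:
        if use_transaction:
            # Only registered when requested; otherwise the stack stays empty
            stack.enter_context(connection.begin())
        yield connection

plain = FakeConnection()
with optional_transaction(plain, use_transaction=False) as c:
    c.execute("SELECT 1")

wrapped = FakeConnection()
with optional_transaction(wrapped) as c:
    c.execute("SELECT 1")

print(plain.log)    # ['SELECT 1']
print(wrapped.log)  # ['BEGIN', 'SELECT 1', 'COMMIT']
```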

Callback Registration with ExitStack

Register arbitrary cleanup callbacks using ExitStack.callback():

from contextlib import ExitStack

def setup_test_environment():
    stack = ExitStack()
    
    # Register cleanup functions
    stack.callback(lambda: print("Cleanup: Removing temp files"))
    stack.callback(lambda: print("Cleanup: Closing connections"))
    
    # Setup code
    print("Setup: Creating resources")
    
    return stack

with setup_test_environment():
    print("Running tests")

# Output:
# Setup: Creating resources
# Running tests
# Cleanup: Closing connections
# Cleanup: Removing temp files (LIFO order)

Callbacks execute in LIFO order, matching the natural cleanup sequence:

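# start_database, start_cache, and start_app are placeholder helpers.
# The decorator is required: a bare generator function is not a context manager.
@contextmanager
def deploy_application(config):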
    with ExitStack() as stack:
        # Start services in order
        database = start_database(config)
        stack.callback(database.stop)
        
        cache = start_cache(config)
        stack.callback(cache.stop)
        
        app = start_app(config)
        stack.callback(app.stop)
        
        # Services stop in reverse order: app, cache, database
        yield app
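
A runnable sketch of the LIFO behaviour, with a hypothetical Service class standing in for real services. It also shows a useful side effect of registering each callback as soon as the resource exists: if a later startup step fails, the services already started are still stopped.

```python
from contextlib import ExitStack, contextmanager

stopped = []

class Service:
    """Hypothetical service that records when it is stopped."""

    def __init__(self, name):
        self.name = name

    def stop(self):
        stopped.append(self.name)

@contextmanager
def deploy(names, fail_on=None):
    with ExitStack() as stack:
        for name in names:
            if name == fail_on:
                raise RuntimeError(f"{name} failed to start")
            service = Service(name)
            stack.callback(service.stop)  # Registered right after startup
        yield

with deploy(["database", "cache", "app"]):
    pass
normal_order = list(stopped)
print(normal_order)  # ['app', 'cache', 'database']

stopped.clear()
try:
    with deploy(["database", "cache", "app"], fail_on="app"):
        pass
except RuntimeError:
    pass
failed_order = list(stopped)
print(failed_order)  # ['cache', 'database']
```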

Suppressing Exceptions with suppress

The suppress context manager catches and silences specific exceptions:

from contextlib import suppress
import os

# Remove file if it exists, ignore if it doesn't
with suppress(FileNotFoundError):
    os.remove('nonexistent_file.txt')

# Equivalent to:
try:
    os.remove('nonexistent_file.txt')
except FileNotFoundError:
    pass

Suppress multiple exception types:

def safe_parse(data):
    with suppress(ValueError, TypeError, KeyError):
        return int(data['value'])
    return None  # Return default if parsing fails

result = safe_parse({'value': 'invalid'})  # Returns None
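
One detail worth seeing in action: after a suppressed exception, execution resumes at the first statement after the with block, not at the line following the failure. A small sketch that also shows suppress matching subclasses of the listed types:

```python
from contextlib import suppress

steps = []

with suppress(ZeroDivisionError):
    steps.append("before")
    1 / 0
    steps.append("after")  # Never runs: the block is abandoned at the error

steps.append("outside")
print(steps)  # ['before', 'outside']

# KeyError is a subclass of LookupError, so it is suppressed too
found = "default"
with suppress(LookupError):
    found = {}["missing"]
print(found)  # default
```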

Redirecting Output Streams

contextlib provides utilities for temporarily redirecting stdout and stderr:

from contextlib import redirect_stdout, redirect_stderr
from io import StringIO

def capture_output(func):
    stdout_capture = StringIO()
    stderr_capture = StringIO()
    
    with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
        func()
    
    return stdout_capture.getvalue(), stderr_capture.getvalue()

def noisy_function():
    print("This goes to stdout")
    import sys
    print("This goes to stderr", file=sys.stderr)

stdout, stderr = capture_output(noisy_function)
print(f"Captured stdout: {stdout.strip()}")
print(f"Captured stderr: {stderr.strip()}")
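
These helpers work by swapping sys.stdout (or sys.stderr) for the duration of the block and restoring it afterwards. Since that is a process-wide change, it affects other threads and has no effect on output written by subprocesses. A short sketch of the swap and restore:

```python
import sys
from contextlib import redirect_stdout
from io import StringIO

original = sys.stdout
buffer = StringIO()

with redirect_stdout(buffer):
    inside = sys.stdout is buffer  # sys.stdout itself has been replaced
    print("captured")

restored = sys.stdout is original  # The previous stream is back in place
captured_text = buffer.getvalue()
```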

Async Context Managers

For asynchronous code, use @asynccontextmanager:

from contextlib import asynccontextmanager
import asyncio
import aiohttp

@asynccontextmanager
async def async_timer(operation):
    # get_running_loop() is the preferred way to reach the loop from a coroutine
    start = asyncio.get_running_loop().time()
    try:
        yield
    finally:
        elapsed = asyncio.get_running_loop().time() - start
        print(f"{operation} took {elapsed:.2f}s")

@asynccontextmanager
async def http_session():
    session = aiohttp.ClientSession()
    try:
        yield session
    finally:
        await session.close()

async def fetch_data():
    async with async_timer("HTTP request"):
        async with http_session() as session:
            async with session.get('https://api.example.com') as response:
                return await response.text()

# asyncio.run(fetch_data())
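
The example above needs the third-party aiohttp package and network access. For a version that runs anywhere, here is a sketch using only the standard library; fake_session is a hypothetical stand-in for a real session, with asyncio.sleep(0) marking where real setup and teardown would await.

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def fake_session(name):
    """Hypothetical stand-in for a network session."""
    events.append(f"open {name}")
    await asyncio.sleep(0)  # Placeholder for async setup
    try:
        yield name
    finally:
        await asyncio.sleep(0)  # Placeholder for async teardown
        events.append(f"close {name}")

async def main():
    async with fake_session("api") as session:
        events.append(f"use {session}")

asyncio.run(main())
print(events)  # ['open api', 'use api', 'close api']
```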

AsyncExitStack provides async equivalents of ExitStack functionality:

from contextlib import AsyncExitStack

async def process_async_resources(urls):
    async with AsyncExitStack() as stack:
        # Enter multiple async context managers
        sessions = []
        for url in urls:
            session = await stack.enter_async_context(http_session())
            sessions.append(session)
        
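        # Fetch through every session concurrently, reading each body
        # while the sessions are still open
        async def fetch(session, url):
            async with session.get(url) as response:
                return await response.text()

        bodies = await asyncio.gather(
            *(fetch(session, url) for session, url in zip(sessions, urls))
        )

        # All sessions are closed automatically when the block exits
        return bodies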
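
A standard-library sketch of the same idea, using a hypothetical resource manager in place of HTTP sessions. It also registers a coroutine with push_async_callback to show that async callbacks share the same LIFO unwinding.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []

@asynccontextmanager
async def resource(name):
    """Hypothetical async resource that records open and close."""
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")

async def flush():
    events.append("flush")

async def main(names):
    async with AsyncExitStack() as stack:
        opened = [await stack.enter_async_context(resource(n)) for n in names]
        stack.push_async_callback(flush)  # Registered last, so it runs first
        events.append(f"using {opened}")

asyncio.run(main(["a", "b"]))
print(events)
# ['open a', 'open b', "using ['a', 'b']", 'flush', 'close b', 'close a']
```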

Practical Patterns

Combine context managers for complex resource management:

@contextmanager
def atomic_write(filepath):
    """Write to temporary file, rename on success."""
    temp_path = f"{filepath}.tmp"
    
    try:
        with open(temp_path, 'w') as f:
            yield f
        # Success - rename temp file to target
        os.replace(temp_path, filepath)
    except Exception:
        # Failure - remove temp file
        with suppress(FileNotFoundError):
            os.remove(temp_path)
        raise

with atomic_write('config.json') as f:
    f.write('{"setting": "value"}')
# File only updated if no exceptions occurred
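
To check that claim, the sketch below repeats the atomic_write definition so it runs on its own, writes a file once, then simulates a crash during a second write inside a temporary directory. The file names and the simulated RuntimeError are illustrative assumptions.

```python
import os
import tempfile
from contextlib import contextmanager, suppress

@contextmanager
def atomic_write(filepath):
    """Write to a temporary file, rename on success."""
    temp_path = f"{filepath}.tmp"
    try:
        with open(temp_path, "w") as f:
            yield f
        os.replace(temp_path, filepath)
    except Exception:
        with suppress(FileNotFoundError):
            os.remove(temp_path)
        raise

with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "config.json")

    with atomic_write(target) as f:
        f.write('{"setting": "old"}')

    try:
        with atomic_write(target) as f:
            f.write('{"setting": "new"')  # Incomplete JSON
            raise RuntimeError("simulated crash mid-write")
    except RuntimeError:
        pass

    with open(target) as f:
        surviving = f.read()
    leftover = os.path.exists(f"{target}.tmp")

print(surviving)  # {"setting": "old"}
print(leftover)   # False
```

The target keeps its previous contents and the partial temporary file is removed, which is the property the rename-on-success design is after.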

Context managers ensure resources are properly managed, making code more reliable and maintainable. The contextlib module eliminates boilerplate while providing powerful composition patterns for complex resource scenarios.
