Logging: Structured Logging Best Practices
Key Insights
- Structured logging transforms logs from text files into queryable data, enabling you to answer questions like “show me all failed payments for user X in the last hour” in seconds instead of hours.
- Every log entry needs five core fields: timestamp, level, message, correlation ID, and service name—without these, you’re just creating expensive noise.
- The biggest structured logging mistake isn’t technical—it’s logging everything at ERROR level and sensitive data at INFO level, which creates alert fatigue and compliance nightmares.
Why Structured Logging Matters
At 3 AM, when your pager goes off and you’re staring at a wall of text logs, the difference between structured and unstructured logging becomes painfully clear. With plain text logs, you’re running increasingly creative grep commands, piping through awk, and praying you can find the needle in the haystack before your SLA breach timer runs out.
Structured logging treats log entries as data, not strings. Each log is a set of key-value pairs that machines can parse, index, and query. This isn’t about making logs prettier—it’s about making them useful.
The benefits compound across your organization: developers debug faster, operations teams build meaningful dashboards, and security can actually audit what happened during an incident. If you’re running more than a handful of services, structured logging isn’t optional—it’s infrastructure.
Anatomy of a Structured Log Entry
Every structured log entry should contain five essential fields:
- timestamp: ISO 8601 format with timezone (preferably UTC)
- level: The severity of the event
- message: A human-readable description
- correlation_id: A unique identifier that ties related logs together
- service: The name of the service emitting the log
Here’s what the same event looks like in both formats:
```
# Unstructured
2024-01-15 14:23:45 ERROR PaymentService - Payment failed for order 12345, user john@example.com, amount $99.99, reason: card_declined

# Structured (JSON)
{
  "timestamp": "2024-01-15T14:23:45.123Z",
  "level": "ERROR",
  "message": "Payment failed",
  "service": "payment-service",
  "correlation_id": "req-abc123",
  "order_id": "12345",
  "user_id": "usr-789",
  "amount_cents": 9999,
  "currency": "USD",
  "failure_reason": "card_declined",
  "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736"
}
```
The structured version is longer, but now you can query for all card_declined failures in the last week, aggregate payment failures by reason, or find every log related to order_id: 12345 across all services.
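To make the "queryable data" claim concrete, here is a sketch of filtering JSON log lines with a few lines of Python; the sample records below are invented for illustration:

```python
import json

# Hypothetical log file contents: one JSON object per line
log_lines = [
    '{"level": "ERROR", "message": "Payment failed", "order_id": "12345", "failure_reason": "card_declined"}',
    '{"level": "INFO", "message": "Payment succeeded", "order_id": "12346"}',
    '{"level": "ERROR", "message": "Payment failed", "order_id": "12347", "failure_reason": "insufficient_funds"}',
]

# Every card_declined failure, no grep gymnastics required
declined = [
    entry for entry in map(json.loads, log_lines)
    if entry.get("failure_reason") == "card_declined"
]
print([e["order_id"] for e in declined])  # ['12345']
```

In practice your log aggregator runs this kind of query for you, but the principle is the same: fields, not substrings.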
JSON is the most common format because every log aggregator understands it. Logfmt (key=value key2=value2) is more human-readable and works well if you’re frequently reading logs directly, but JSON’s ubiquity usually wins.
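For comparison, here is a minimal hand-rolled logfmt renderer, just to show the shape of the format (recent structlog versions also ship a `LogfmtRenderer` processor, so you would rarely write this yourself):

```python
def to_logfmt(event: dict) -> str:
    """Render a flat event dict as logfmt, quoting values that contain spaces or '='."""
    parts = []
    for key, value in event.items():
        value = str(value)
        if ' ' in value or '=' in value:
            value = f'"{value}"'
        parts.append(f"{key}={value}")
    return " ".join(parts)

print(to_logfmt({
    "level": "ERROR",
    "message": "Payment failed",
    "service": "payment-service",
    "failure_reason": "card_declined",
}))
# level=ERROR message="Payment failed" service=payment-service failure_reason=card_declined
```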
Schema consistency matters more than format choice. If one service logs userId and another logs user_id, you’ve just made cross-service queries painful. Define a schema, document it, and enforce it through shared libraries.
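One way a shared library can enforce this is a small normalization step that maps known aliases onto the canonical field names; the alias table below is illustrative:

```python
# Canonical schema enforcement: map common aliases onto the agreed field names
FIELD_ALIASES = {
    "userId": "user_id",
    "userid": "user_id",
    "correlationId": "correlation_id",
    "svc": "service",
}

def normalize_fields(event: dict) -> dict:
    """Rewrite aliased keys so every service emits the same schema."""
    return {FIELD_ALIASES.get(key, key): value for key, value in event.items()}

normalize_fields({"userId": "usr-789", "service": "payment-service"})
# {'user_id': 'usr-789', 'service': 'payment-service'}
```

Dropping this into your logging library's processor chain means the fix happens once, centrally, instead of in every service's code review.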
Choosing the Right Log Levels
Log levels exist to help you filter signal from noise. Here’s how to use them:
- DEBUG: Detailed diagnostic information useful during development. Never enable in production by default.
- INFO: Normal operational events—service started, request completed, job finished.
- WARN: Something unexpected happened, but the system handled it. Worth investigating but not urgent.
- ERROR: Something failed and needs attention. A specific operation couldn’t complete.
- FATAL: The service is going down. Use sparingly—this should precede a crash.
The most common mistake is logging everything as ERROR. When everything is an error, nothing is. Your alerts become meaningless, and on-call engineers develop notification blindness.
Here’s a practical logging wrapper that enforces good practices:
```python
from enum import Enum

class LogDecision(Enum):
    DEBUG = "debug"
    INFO = "info"
    WARN = "warn"
    ERROR = "error"

def decide_log_level(
    operation_succeeded: bool,
    is_retryable: bool,
    affects_user: bool,
    is_expected: bool,
) -> LogDecision:
    """
    Decision tree for choosing an appropriate log level.
    Use this to maintain consistency across your codebase.
    """
    if operation_succeeded:
        return LogDecision.INFO
    if is_expected:
        # Expected failures (e.g., validation errors) are not ERRORs
        return LogDecision.INFO if affects_user else LogDecision.DEBUG
    if is_retryable:
        # We'll try again, so it's a warning for now
        return LogDecision.WARN
    # Unexpected, non-retryable failure
    return LogDecision.ERROR

# Usage
level = decide_log_level(
    operation_succeeded=False,
    is_retryable=True,
    affects_user=True,
    is_expected=False,
)
# Returns LogDecision.WARN - we'll retry, so don't page anyone yet
```
Context Enrichment and Correlation
A single user request might touch ten services. Without correlation IDs, you’re manually piecing together logs by timestamp and hoping nothing ran concurrently.
The pattern is simple: generate a correlation ID at the edge, propagate it through all service calls, and include it in every log entry. Here’s middleware that handles this automatically:
```python
import uuid
from contextvars import ContextVar

import structlog
from flask import Flask, request

# Context variables for request-scoped data
correlation_id_var: ContextVar[str] = ContextVar('correlation_id', default='')
user_id_var: ContextVar[str] = ContextVar('user_id', default='')

def get_context_processors():
    """Processors that inject context into every log entry.
    Register these in structlog.configure()'s processor chain."""
    def add_correlation_context(logger, method_name, event_dict):
        event_dict['correlation_id'] = correlation_id_var.get() or 'no-correlation-id'
        if user_id := user_id_var.get():
            event_dict['user_id'] = user_id
        return event_dict
    return [add_correlation_context]

# Flask middleware example
app = Flask(__name__)

@app.before_request
def inject_correlation_id():
    # Use the incoming header or generate a new ID
    corr_id = request.headers.get('X-Correlation-ID', str(uuid.uuid4()))
    correlation_id_var.set(corr_id)
    # Extract user from the auth token if present
    # (extract_user_from_token is your application's auth helper)
    if user_id := extract_user_from_token(request):
        user_id_var.set(user_id)

@app.after_request
def add_correlation_header(response):
    response.headers['X-Correlation-ID'] = correlation_id_var.get()
    return response

# Now every log automatically includes correlation_id
logger = structlog.get_logger()
logger.info("Processing payment", amount=9999)
# Output includes correlation_id without explicit passing
```
For async code, use contextvars (Python), AsyncLocalStorage (Node.js), or similar mechanisms that properly propagate across await boundaries.
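A small sketch of why contextvars is the right tool here: each task sees its own correlation ID even when tasks interleave across await points (the IDs below are made up):

```python
import asyncio
from contextvars import ContextVar

correlation_id_var: ContextVar[str] = ContextVar('correlation_id', default='')

async def handle_request(corr_id: str) -> str:
    correlation_id_var.set(corr_id)
    await asyncio.sleep(0)  # yield to other tasks; the context still follows us
    return correlation_id_var.get()

async def main() -> list[str]:
    # Two concurrent "requests"; each task gets its own copy of the context
    return await asyncio.gather(handle_request("req-aaa"), handle_request("req-bbb"))

print(asyncio.run(main()))  # ['req-aaa', 'req-bbb']
```

A thread-local or module-level global would be clobbered by whichever task wrote last; contextvars gives each task an isolated copy automatically.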
What to Log (and What Not To)
Log at service boundaries: incoming requests, outgoing calls, and their results. This gives you visibility without drowning in noise.
Never log:
- Passwords or API keys
- Full credit card numbers
- Social Security numbers or government IDs
- Session tokens or JWTs
- PII without explicit business need
Build sanitization into your logging infrastructure:
```python
import re
from typing import Any

# Value patterns worth scanning for; password/API-key style secrets are
# caught by the key-name check below instead
SENSITIVE_PATTERNS = {
    'credit_card': re.compile(r'\d{13,19}'),
    'ssn': re.compile(r'\d{3}-?\d{2}-?\d{4}'),
}

SENSITIVE_KEYS = {'password', 'passwd', 'secret', 'api_key', 'apikey',
                  'token', 'credit_card', 'cc_number', 'ssn', 'authorization'}

def sanitize_log_data(data: dict[str, Any], redact_emails: bool = False) -> dict[str, Any]:
    """
    Recursively sanitize sensitive fields from log data.
    Call this before logging any user-provided or external data.
    """
    sanitized = {}
    for key, value in data.items():
        key_lower = key.lower()
        # Redact when the key itself indicates sensitive data
        if any(sensitive in key_lower for sensitive in SENSITIVE_KEYS):
            sanitized[key] = '[REDACTED]'
            continue
        # Recursively handle nested dicts
        if isinstance(value, dict):
            sanitized[key] = sanitize_log_data(value, redact_emails)
            continue
        # Check value patterns
        if isinstance(value, str):
            if key_lower == 'email' and redact_emails:
                sanitized[key] = redact_email(value)
            elif SENSITIVE_PATTERNS['credit_card'].fullmatch(value):
                sanitized[key] = mask_card_number(value)
            elif SENSITIVE_PATTERNS['ssn'].fullmatch(value):
                sanitized[key] = '[REDACTED]'
            else:
                sanitized[key] = value
        else:
            sanitized[key] = value
    return sanitized

def mask_card_number(card: str) -> str:
    digits = re.sub(r'\D', '', card)
    if len(digits) >= 13:
        return f"****{digits[-4:]}"
    return '[REDACTED]'

def redact_email(email: str) -> str:
    if '@' in email:
        local, domain = email.rsplit('@', 1)
        return f"{local[0]}***@{domain}"
    return '[REDACTED]'

# Usage
sanitize_log_data({'user': 'john', 'password': 'hunter2', 'card': '4111111111111111'})
# {'user': 'john', 'password': '[REDACTED]', 'card': '****1111'}
```
Performance Considerations
Logging is I/O, and I/O is slow. In hot paths, naive logging can become your bottleneck.
Use async logging with buffering. Most logging libraries support this—configure a buffer size and flush interval rather than writing synchronously on every log call.
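With Python's standard library, for example, this can be sketched with a QueueHandler on the hot path and a QueueListener doing the actual I/O on a background thread (the handler choice and queue size below are illustrative):

```python
import logging
import logging.handlers
import queue

# Bounded queue between the hot path and the writer thread
log_queue: queue.Queue = queue.Queue(maxsize=10_000)

# Hot path: the handler only enqueues the record - no I/O
logger = logging.getLogger("payment-service")
logger.setLevel(logging.INFO)
logger.addHandler(logging.handlers.QueueHandler(log_queue))

# Background thread drains the queue and performs the slow write
target_handler = logging.StreamHandler()  # swap in a real file or network handler
listener = logging.handlers.QueueListener(log_queue, target_handler)
listener.start()

logger.info("Request completed")  # returns immediately; the write happens off-thread

listener.stop()  # drains and flushes remaining records at shutdown
```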
Avoid string formatting for logs that won’t be emitted:
```python
import logging

import structlog

logger = structlog.get_logger()

# BAD: This builds the string even if DEBUG is disabled
logger.debug(f"Processing items: {expensive_serialize(items)}")

# GOOD: Lazy evaluation - only compute if the level is enabled
# (isEnabledFor requires a stdlib-backed logger, e.g. structlog.stdlib.BoundLogger)
def log_debug_lazy(logger, message: str, **kwargs):
    """Only evaluate kwargs if debug logging is enabled."""
    if logger.isEnabledFor(logging.DEBUG):
        # Evaluate any callables in kwargs
        evaluated = {
            k: v() if callable(v) else v
            for k, v in kwargs.items()
        }
        logger.debug(message, **evaluated)

# Usage
log_debug_lazy(
    logger,
    "Processing items",
    item_summary=lambda: expensive_serialize(items),
)
```
For high-volume events, implement sampling:
```python
import random
from functools import wraps

def sampled_log(sample_rate: float = 0.01):
    """Decorator to sample high-volume logs."""
    def decorator(log_func):
        @wraps(log_func)
        def wrapper(*args, **kwargs):
            if random.random() < sample_rate:
                # Record that this entry was sampled and at what rate
                kwargs['_sampled'] = True
                kwargs['_sample_rate'] = sample_rate
                return log_func(*args, **kwargs)
        return wrapper
    return decorator

@sampled_log(sample_rate=0.01)  # Log 1% of events
def log_cache_hit(key: str, **sample_info):
    # Accept the sampling metadata injected by the decorator and log it
    logger.debug("Cache hit", cache_key=key, **sample_info)
```
Integrating with Observability Tools
Logs are one pillar of observability, alongside metrics and traces. Connect them.
Here’s a logger configuration that outputs JSON with OpenTelemetry trace context:
```python
import structlog
from opentelemetry import trace

def add_trace_context(logger, method_name, event_dict):
    """Inject OpenTelemetry trace context into logs."""
    span = trace.get_current_span()
    if span.is_recording():
        ctx = span.get_span_context()
        event_dict['trace_id'] = format(ctx.trace_id, '032x')
        event_dict['span_id'] = format(ctx.span_id, '016x')
    return event_dict

structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        add_trace_context,
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
)

# Every log now includes trace_id and span_id
# Your observability platform can link logs to traces automatically
```
# Every log now includes trace_id and span_id
# Your observability platform can link logs to traces automatically
Configure your log shipper (Fluentd, Vector, or Filebeat) to parse JSON and forward to your aggregator. Set up alerts on patterns: error rate spikes, specific error messages, or unusual log volumes.
The goal is a single pane of glass where you can jump from an alert to relevant logs to the distributed trace, all connected by correlation and trace IDs.
Structured logging is foundational infrastructure. Get it right early, enforce consistency through shared libraries, and your future self—debugging at 3 AM—will thank you.