Data Pipelines: Stream and Batch Processing
Key Insights
- Batch processing optimizes for throughput and cost efficiency on bounded datasets, while stream processing optimizes for latency on unbounded data—choose based on your actual business latency requirements, not perceived coolness factor.
- The Kappa architecture (stream-only) is increasingly viable for most use cases, but Lambda (batch + stream) still makes sense when you need complex historical reprocessing or cost-sensitive batch workloads.
- Data quality challenges multiply in streaming systems; invest heavily in schema registries, dead-letter queues, and idempotent processing from day one.
The Two Paradigms
Every data pipeline ultimately answers one question: how quickly does your business need to act on new information? If your fraud detection system can wait 24 hours to flag suspicious transactions, you have a very different problem than if you need sub-second alerts.
Batch processing treats data as finite, bounded datasets. You collect data over a period, process it all at once, and produce results. Stream processing treats data as infinite, unbounded sequences of events. You process each event (or small groups of events) as they arrive.
Neither is universally superior. I’ve watched teams waste months building real-time pipelines for reports that executives check once a week. I’ve also seen companies lose millions because their batch-only fraud detection couldn’t catch attackers fast enough.
Understanding both paradigms—and when to apply each—is foundational knowledge for any data engineer.
Batch Processing Deep Dive
Batch processing shines when you have bounded datasets and can tolerate latency measured in minutes to hours. The classic use cases remain compelling: nightly ETL jobs, weekly business reports, machine learning model training, and historical data backfills.
The economics favor batch processing when throughput matters more than latency. You can provision resources for a specific window, process aggressively, then shut everything down. Spot instances and preemptible VMs become viable. Your cost per record processed drops significantly.
Batch also simplifies reasoning about data. When you know the complete dataset upfront, operations like global sorting, exact distinct counts, and complex joins become straightforward. You don’t need to worry about late-arriving data or out-of-order events.
Here’s a typical Spark batch job processing daily log files:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, window

spark = SparkSession.builder \
    .appName("DailyLogAnalytics") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Read yesterday's log files from S3
logs_df = spark.read.json("s3://data-lake/logs/dt=2024-01-15/")

# Business logic: calculate hourly metrics per endpoint
hourly_metrics = logs_df \
    .filter(col("status_code").between(200, 299)) \
    .groupBy(
        window(col("timestamp"), "1 hour"),
        col("endpoint")
    ) \
    .agg(
        count("*").alias("request_count"),
        avg("response_time_ms").alias("avg_latency"),
        avg("bytes_sent").alias("avg_response_size")
    ) \
    .withColumn("window_start", col("window.start")) \
    .drop("window") \
    .orderBy("window_start", "endpoint")

# Write to the analytics warehouse; partition on a flat timestamp column,
# since Parquet cannot partition by the window struct directly
hourly_metrics.write \
    .mode("overwrite") \
    .partitionBy("window_start") \
    .parquet("s3://analytics-warehouse/hourly_endpoint_metrics/")

spark.stop()
This job runs once daily, processes all of yesterday’s data, and overwrites the output partition. Simple, predictable, and easy to debug when things go wrong.
Stream Processing Deep Dive
Stream processing becomes necessary when business value decays rapidly with latency. Real-time fraud detection, live dashboards, IoT sensor monitoring, and personalization engines all demand sub-second or sub-minute responses.
The mental model shifts from “process this dataset” to “react to this event.” You’re building event-driven systems where each message triggers computation. This introduces complexity around windowing (how do you aggregate over time when time never stops?), watermarks (how do you know when you’ve seen all events for a window?), and state management (where do you store intermediate results?).
Here’s a Flink pipeline consuming clickstream events from Kafka:
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ClickstreamAnalytics {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000); // Checkpoint every minute

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092");
        kafkaProps.setProperty("group.id", "clickstream-analytics");

        FlinkKafkaConsumer<ClickEvent> consumer = new FlinkKafkaConsumer<>(
            "clickstream-events",
            new ClickEventDeserializer(),
            kafkaProps
        );
        consumer.assignTimestampsAndWatermarks(
            WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                .withTimestampAssigner((event, timestamp) -> event.getEventTime())
        );

        DataStream<ClickEvent> clickStream = env.addSource(consumer);

        // Real-time session analysis with 30-minute session gaps
        clickStream
            .keyBy(ClickEvent::getUserId)
            .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
            .aggregate(new SessionAggregator())
            .addSink(new SessionMetricsSink());

        // Sliding window for trending pages (5-min windows, sliding every 1 min)
        clickStream
            .keyBy(ClickEvent::getPageId)
            .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
            .aggregate(new PageViewCounter())
            .addSink(new TrendingPagesSink());

        env.execute("Clickstream Real-time Analytics");
    }
}
Notice the watermark strategy handling events up to 30 seconds late. This is the fundamental tension in stream processing: wait longer for late data and increase latency, or process faster and potentially miss events.
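To make that tradeoff concrete, here is a small, self-contained Python sketch of the bookkeeping behind a bounded-out-of-orderness watermark (not the Flink API, just the idea): the watermark trails the maximum event time seen so far by the allowed lateness, and events that arrive behind it are exactly the ones a lower-latency configuration would drop.

```python
from datetime import datetime, timedelta

def simulate_watermark(events, max_out_of_orderness=timedelta(seconds=30)):
    """Track a bounded-out-of-orderness watermark over an event stream.

    Returns (accepted, late): an event is 'late' if its timestamp is
    already behind the current watermark when it arrives.
    """
    watermark = datetime.min
    accepted, late = [], []
    for ts, payload in events:
        if ts < watermark:
            late.append((ts, payload))   # its window may already be closed
        else:
            accepted.append((ts, payload))
        # Watermark = max event time seen so far, minus the allowed lateness
        watermark = max(watermark, ts - max_out_of_orderness)
    return accepted, late

t0 = datetime(2024, 1, 15, 12, 0, 0)
events = [
    (t0, "a"),
    (t0 + timedelta(seconds=45), "b"),  # advances watermark to t0 + 15s
    (t0 + timedelta(seconds=10), "c"),  # 35s behind the max seen -> late
]
accepted, late = simulate_watermark(events)
```

Shrinking `max_out_of_orderness` closes windows sooner (lower latency) but reclassifies more events as late; growing it does the reverse.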
Architectural Patterns
The Lambda architecture, proposed by Nathan Marz, runs batch and stream processing in parallel. The batch layer provides accurate, complete results with higher latency. The speed layer provides approximate, fast results. A serving layer merges both views.
              ┌─────────────────┐
              │   Data Source   │
              └────────┬────────┘
                       │
        ┌──────────────┴──────────────┐
        ▼                             ▼
┌─────────────────┐          ┌─────────────────┐
│   Batch Layer   │          │   Speed Layer   │
│ (Spark/Hadoop)  │          │  (Flink/Kafka)  │
└────────┬────────┘          └────────┬────────┘
         │                            │
         ▼                            ▼
┌─────────────────┐          ┌─────────────────┐
│   Batch Views   │          │ Real-time Views │
│ (Complete/Slow) │          │ (Partial/Fast)  │
└────────┬────────┘          └────────┬────────┘
         │                            │
         └──────────────┬─────────────┘
                        ▼
              ┌─────────────────┐
              │  Serving Layer  │
              │  (Merge Views)  │
              └─────────────────┘
The Kappa architecture simplifies this by using only stream processing. Historical reprocessing happens by replaying events from the log (Kafka with long retention). You maintain one codebase, one processing paradigm.
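The replay idea can be sketched without Kafka at all. In this toy version, `RetainedLog` stands in for a long-retention topic; the point is that live processing and historical reprocessing share a single codepath, so "reprocess" just means "read the log again from an earlier offset":

```python
class RetainedLog:
    """In-memory stand-in for a Kafka topic with long retention."""
    def __init__(self):
        self._events = []

    def append(self, event):
        self._events.append(event)

    def replay(self, from_offset=0):
        """Kappa reprocessing = re-reading the log from an old offset."""
        return iter(self._events[from_offset:])

def count_by_page(events):
    """The single processing codepath; live and replayed events both use it."""
    counts = {}
    for e in events:
        counts[e["page_id"]] = counts.get(e["page_id"], 0) + 1
    return counts

log = RetainedLog()
for page in ["home", "home", "pricing"]:
    log.append({"page_id": page})

live = count_by_page(log.replay())         # first pass over the stream
reprocessed = count_by_page(log.replay())  # after a bug fix: replay the log
```

In a real Kappa deployment, replay means resetting consumer offsets (or starting a fresh consumer group from the earliest offset) against the retained topic, then running the same job.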
from dataclasses import dataclass
from typing import List

@dataclass
class SessionMetrics:
    duration_seconds: float
    page_views: int
    unique_pages: int
    bounce: bool

# Shared business logic for both batch and stream
class MetricsCalculator:
    @staticmethod
    def calculate_session_metrics(events: List[ClickEvent]) -> SessionMetrics:
        """Core logic used by both batch and stream processing."""
        total_duration = events[-1].timestamp - events[0].timestamp
        page_views = len(events)
        unique_pages = len(set(e.page_id for e in events))
        return SessionMetrics(
            duration_seconds=total_duration.total_seconds(),
            page_views=page_views,
            unique_pages=unique_pages,
            bounce=(page_views == 1),
        )

# Stream processing uses this directly on windows
# Batch processing uses this on grouped/sorted data
# Same logic, same results, one codebase
My recommendation: start with Kappa unless you have specific requirements that demand Lambda. Those requirements typically include complex historical reprocessing that’s impractical to replay, or significant cost savings from batch-only processing of historical data.
Data Quality and Reliability
Streaming systems amplify data quality challenges. Late-arriving data, duplicate events, schema changes, and processing failures all become harder to handle when data flows continuously.
Implement idempotent writes from day one. Every sink should handle duplicate deliveries gracefully:
import os
from datetime import datetime

import psycopg2

class IdempotentPostgresSink:
    def __init__(self, connection_string: str):
        self.conn = psycopg2.connect(connection_string)

    def write_metrics(self, metrics: SessionMetrics):
        """Upsert pattern ensures idempotency."""
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO session_metrics
                    (session_id, user_id, duration_seconds, page_views, processed_at)
                VALUES (%s, %s, %s, %s, %s)
                ON CONFLICT (session_id) DO UPDATE SET
                    duration_seconds = EXCLUDED.duration_seconds,
                    page_views = EXCLUDED.page_views,
                    processed_at = EXCLUDED.processed_at
                WHERE session_metrics.processed_at < EXCLUDED.processed_at
            """, (
                metrics.session_id,
                metrics.user_id,
                metrics.duration_seconds,
                metrics.page_views,
                datetime.utcnow(),
            ))
        self.conn.commit()

class DeadLetterQueue:
    def __init__(self, kafka_producer, dlq_topic: str):
        self.producer = kafka_producer
        self.dlq_topic = dlq_topic

    def send_to_dlq(self, failed_event: dict, error: Exception):
        """Route unprocessable events for later analysis."""
        dlq_message = {
            "original_event": failed_event,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "failed_at": datetime.utcnow().isoformat(),
            "pipeline_version": os.environ.get("PIPELINE_VERSION", "unknown"),
        }
        self.producer.send(self.dlq_topic, value=dlq_message)
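In use, the consumer loop wraps processing in a try/except and routes failures instead of crashing the pipeline. Here is a minimal sketch of that pattern; `process_event` and the stub producer are hypothetical stand-ins so the loop can run without a broker:

```python
import datetime as dt

class StubProducer:
    """Stand-in for a KafkaProducer so the pattern can run locally."""
    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))

def process_event(event: dict) -> dict:
    # Hypothetical business logic: fails on events missing a user_id
    return {"user_id": event["user_id"], "ok": True}

def consume(events, producer, dlq_topic="clickstream-dlq"):
    results = []
    for event in events:
        try:
            results.append(process_event(event))
        except Exception as err:  # route poison messages, keep consuming
            producer.send(dlq_topic, value={
                "original_event": event,
                "error_type": type(err).__name__,
                "error_message": str(err),
                "failed_at": dt.datetime.utcnow().isoformat(),
            })
    return results

producer = StubProducer()
good = consume([{"user_id": "u1"}, {"broken": True}], producer)
# One event processed; the malformed one lands on the DLQ topic
```

The key property: a single poison message never halts the stream, and every failure is preserved with enough context to reprocess it later.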
Use a schema registry (Confluent Schema Registry, AWS Glue Schema Registry) to enforce compatibility. Reject events that don’t match expected schemas before they poison your pipeline.
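A registry client is the right tool in production, but the gatekeeping idea itself is simple. This pure-Python sketch (the field names are illustrative; a real deployment would fetch the authoritative schema from the registry rather than hard-code it) checks required fields and types before an event enters the topology:

```python
# Illustrative required fields for a click event
REQUIRED_FIELDS = {"user_id": str, "page_id": str, "event_time": str}

def is_valid_event(event: dict) -> bool:
    """True only if every required field is present with the right type."""
    return all(
        isinstance(event.get(field), expected)
        for field, expected in REQUIRED_FIELDS.items()
    )

ok = is_valid_event({
    "user_id": "u1",
    "page_id": "home",
    "event_time": "2024-01-15T12:00:00Z",
})
bad = is_valid_event({"user_id": "u1"})  # missing fields -> reject up front
```

Invalid events should go straight to the dead-letter queue; they never reach the aggregation logic.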
Technology Landscape
The modern data stack offers excellent options across the spectrum:
Apache Spark: The workhorse for batch processing. Structured Streaming brings unified batch/stream APIs. Best for teams already invested in the Spark ecosystem.
Apache Flink: Purpose-built for stream processing with excellent exactly-once semantics. Superior for complex event processing and stateful streaming.
Apache Kafka Streams: Lightweight stream processing as a library. No separate cluster needed. Ideal for simpler transformations within Kafka-centric architectures.
Apache Beam: Write-once, run-anywhere abstraction. Useful if you need portability across runners (Dataflow, Flink, Spark).
Here’s the same aggregation logic in Spark Structured Streaming versus batch mode:
from pyspark.sql.functions import col, count, window

# Unified API - same code, different execution modes
def build_hourly_aggregation(df):
    return df \
        .withWatermark("event_time", "10 minutes") \
        .groupBy(
            window(col("event_time"), "1 hour"),
            col("endpoint")
        ) \
        .agg(count("*").alias("requests"))

# Batch execution
batch_df = spark.read.json("s3://logs/dt=2024-01-15/")
result = build_hourly_aggregation(batch_df)
result.write.parquet("s3://output/batch/")

# Streaming execution
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "log-events") \
    .load()

result = build_hourly_aggregation(stream_df)
result.writeStream \
    .format("parquet") \
    .option("path", "s3://output/streaming/") \
    .option("checkpointLocation", "s3://checkpoints/") \
    .start()
Practical Guidelines
Use this decision framework:
Choose batch when:
- Latency requirements exceed 15 minutes
- You need complex global operations (full sorts, exact distinct counts)
- Cost optimization is critical and workloads are predictable
- Data sources are naturally batched (daily file drops, weekly exports)
Choose streaming when:
- Latency requirements are under 5 minutes
- Events trigger immediate business actions
- Data arrives continuously with no natural boundaries
- You need real-time dashboards or alerting
Consider hybrid when:
- Different consumers have different latency needs
- Historical reprocessing requirements are complex
- You’re migrating incrementally from batch to stream
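As a sanity check, the rules of thumb above can be written down as a tiny, deliberately oversimplified function. The thresholds mirror the lists, and the 5-15 minute middle ground is left open on purpose:

```python
def choose_paradigm(latency_minutes: float,
                    continuous_source: bool,
                    mixed_consumer_latencies: bool) -> str:
    """Rough encoding of the decision framework; real choices also weigh
    cost, team expertise, and existing infrastructure."""
    if mixed_consumer_latencies:
        return "hybrid"
    if latency_minutes < 5 or continuous_source:
        return "streaming"
    if latency_minutes > 15:
        return "batch"
    return "either"  # 5-15 min: micro-batch or streaming both work

# A daily report, a fraud alert, and a mixed-consumer platform:
print(choose_paradigm(60 * 24, False, False))  # batch
print(choose_paradigm(0.1, True, False))       # streaming
print(choose_paradigm(30, False, True))        # hybrid
```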
Start with the simplest architecture that meets requirements. You can always add complexity later; removing it is much harder.