Spark Streaming - Join Streaming with Static Data

Key Insights

• Joining streaming data with static reference data is essential for enrichment scenarios like adding customer details, product catalogs, or configuration lookups to real-time events

• Static DataFrames in Spark Structured Streaming are broadcast to executors, enabling efficient lookups without shuffle operations when the reference data fits in memory

• Managing stale reference data requires implementing reload strategies using foreachBatch or external triggers to refresh the static DataFrame periodically

Understanding Stream-Static Joins

Stream-static joins combine real-time event streams with slowly-changing reference data. Unlike stream-stream joins that require watermarking and state management, stream-static joins are stateless operations from Spark’s perspective. The streaming DataFrame maintains its incremental processing model while the static DataFrame acts as a lookup table.

The most common use case involves enriching transaction events with dimension tables. Consider an e-commerce system where purchase events need customer demographic data, or IoT sensors requiring device metadata. The static data typically comes from databases, files, or data warehouses and changes infrequently compared to the streaming rate.
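
Before the Spark version, it helps to see the shape of the operation in plain Python: enrichment is just a keyed left-outer lookup. The customers and events below are made-up sample data for illustration only:

```python
# Toy illustration (plain Python, not Spark): enrichment as a keyed lookup.
# Sample reference table and events are fabricated for this sketch.
customers = {
    "c1": {"customer_name": "Ada", "customer_tier": "gold"},
    "c2": {"customer_name": "Bob", "customer_tier": "silver"},
}

events = [
    {"transaction_id": "t1", "customer_id": "c1", "amount": 42.0},
    {"transaction_id": "t2", "customer_id": "c9", "amount": 7.5},  # no match
]

def enrich(event, lookup):
    """Left-join one event against the reference table."""
    ref = lookup.get(event["customer_id"], {})
    return {**event,
            "customer_name": ref.get("customer_name"),
            "customer_tier": ref.get("customer_tier")}

enriched = [enrich(e, customers) for e in events]
# Unmatched keys yield None fields, mirroring a left outer join.
```

A stream-static join does exactly this at scale, with the static DataFrame playing the role of the lookup dict.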

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder \
    .appName("StreamStaticJoin") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

# Define schema for streaming purchases
purchase_schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("product_id", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("timestamp", StringType(), False)
])

# Read streaming data from Kafka
streaming_purchases = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "purchases") \
    .load() \
    .selectExpr("CAST(value AS STRING) as json_data") \
    .select(from_json(col("json_data"), purchase_schema).alias("data")) \
    .select("data.*")

Loading and Joining Static Reference Data

Static DataFrames are loaded using standard Spark read operations. The join itself looks identical to a batch join; for a streaming query, Spark simply re-executes it against each incoming micro-batch.

# Load customer dimension table (static data)
customer_dim = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/warehouse") \
    .option("dbtable", "customers") \
    .option("user", "spark_user") \
    .option("password", "password") \
    .load()

# Load product catalog (static data from Parquet)
product_catalog = spark.read \
    .parquet("s3://data-lake/reference/products/")

# Perform stream-static joins
enriched_stream = streaming_purchases \
    .join(customer_dim, "customer_id", "left") \
    .join(product_catalog, "product_id", "left") \
    .select(
        col("transaction_id"),
        col("customer_id"),
        col("customer_name"),
        col("customer_tier"),
        col("product_id"),
        col("product_name"),
        col("category"),
        col("amount"),
        col("timestamp")
    )

# Write enriched stream to output
query = enriched_stream \
    .writeStream \
    .format("parquet") \
    .option("path", "s3://output/enriched-purchases/") \
    .option("checkpointLocation", "s3://checkpoints/enriched-purchases/") \
    .outputMode("append") \
    .start()

Handling Static Data Refreshes

The primary challenge with stream-static joins is keeping reference data current. The static side is planned when the streaming query starts; for file-based sources the file listing is resolved at that point, so later changes to the underlying data are not picked up automatically. For reference data that changes hourly or daily, you need a refresh mechanism.

The foreachBatch approach provides the most control, allowing you to reload static data for each micro-batch:

def process_batch_with_refresh(batch_df, batch_id):
    """Process each micro-batch with fresh reference data"""
    
    # Reload static data for this batch
    current_customers = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/warehouse") \
        .option("dbtable", "customers") \
        .option("user", "spark_user") \
        .option("password", "password") \
        .load()
    
    current_products = spark.read \
        .parquet("s3://data-lake/reference/products/")
    
    # Join and write
    enriched = batch_df \
        .join(current_customers, "customer_id", "left") \
        .join(current_products, "product_id", "left")
    
    enriched.write \
        .format("parquet") \
        .mode("append") \
        .save("s3://output/enriched-purchases/")

# Apply foreachBatch
query = streaming_purchases \
    .writeStream \
    .foreachBatch(process_batch_with_refresh) \
    .option("checkpointLocation", "s3://checkpoints/enriched-purchases/") \
    .start()

For less frequent updates, implement a scheduled reload using broadcast variables:

from pyspark.sql.functions import broadcast
import time
from threading import Thread

class StaticDataManager:
    def __init__(self, spark):
        self.spark = spark
        self.customer_data = None
        self.product_data = None
        self.load_static_data()
    
    def load_static_data(self):
        """Load or reload static reference data"""
        # Release previously cached copies so stale snapshots
        # don't accumulate in executor memory across reloads
        if self.customer_data is not None:
            self.customer_data.unpersist()
        if self.product_data is not None:
            self.product_data.unpersist()
        
        self.customer_data = self.spark.read \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://localhost:5432/warehouse") \
            .option("dbtable", "customers") \
            .option("user", "spark_user") \
            .option("password", "password") \
            .load() \
            .cache()
        
        self.product_data = self.spark.read \
            .parquet("s3://data-lake/reference/products/") \
            .cache()
        
        print(f"Static data loaded at {time.time()}")
    
    def refresh_loop(self, interval_seconds=3600):
        """Background thread to refresh data periodically"""
        while True:
            time.sleep(interval_seconds)
            self.load_static_data()

# Initialize manager
data_manager = StaticDataManager(spark)

# Start background refresh (every hour)
refresh_thread = Thread(target=data_manager.refresh_loop, args=(3600,))
refresh_thread.daemon = True
refresh_thread.start()

# Use in streaming query
def enrich_batch(batch_df, batch_id):
    enriched = batch_df \
        .join(broadcast(data_manager.customer_data), "customer_id", "left") \
        .join(broadcast(data_manager.product_data), "product_id", "left")
    
    enriched.write.format("parquet").mode("append") \
        .save("s3://output/enriched-purchases/")

query = streaming_purchases \
    .writeStream \
    .foreachBatch(enrich_batch) \
    .option("checkpointLocation", "s3://checkpoints/enriched-purchases/") \
    .start()

Performance Optimization Strategies

Broadcast joins are critical for performance when the static dataset fits in driver and executor memory. Spark automatically broadcasts DataFrames below 10MB by default (configurable via spark.sql.autoBroadcastJoinThreshold), but an explicit broadcast() hint guarantees the behavior even when Spark’s size estimates are off; you can confirm with explain(), whose physical plan should show a BroadcastHashJoin:

from pyspark.sql.functions import broadcast

# Explicit broadcast for small dimension tables
enriched_stream = streaming_purchases \
    .join(broadcast(customer_dim), "customer_id", "left") \
    .join(broadcast(product_catalog), "product_id", "left")

For larger reference datasets that don’t fit in memory, co-locate both sides on the join key so the shuffle stays manageable. Be wary of partitionBy on a high-cardinality column like customer_id, which creates one directory per distinct value; partition on a coarser column, or bucket the table on the join key, instead:

# Partition static data by a coarse column (one directory per
# distinct customer_id would make the join key a poor choice)
customer_dim.write \
    .partitionBy("customer_tier") \
    .parquet("s3://reference/customers-partitioned/")

# Read partitioned static data
partitioned_customers = spark.read \
    .parquet("s3://reference/customers-partitioned/")

# Configure shuffle partitions based on data volume
spark.conf.set("spark.sql.shuffle.partitions", "200")

enriched_stream = streaming_purchases \
    .join(partitioned_customers, "customer_id", "left")

Monitoring and Troubleshooting

Track join effectiveness and performance through Spark’s streaming metrics:

# Access streaming query metrics
query.lastProgress  # Latest micro-batch statistics
query.status        # Current query state
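
lastProgress returns the latest report as a plain dict (Spark’s StreamingQueryProgress in JSON form). A small sketch of extracting throughput figures from it; the sample values below are fabricated, but the field names (batchId, numInputRows, durationMs.triggerExecution) match Spark’s progress reports:

```python
# Extract throughput stats from a StreamingQueryProgress dict.
# Field names match Spark's JSON progress reports; the sample
# values are made up for illustration.
sample_progress = {
    "batchId": 17,
    "numInputRows": 1200,
    "processedRowsPerSecond": 400.0,
    "durationMs": {"triggerExecution": 3000},
}

def summarize_progress(progress):
    """Return (batch id, input rows, seconds per trigger) or None."""
    if progress is None:  # lastProgress is None before the first batch
        return None
    secs = progress.get("durationMs", {}).get("triggerExecution", 0) / 1000.0
    return (progress["batchId"], progress["numInputRows"], secs)

summary = summarize_progress(sample_progress)
# → (17, 1200, 3.0)
```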

# Monitor null rates after joins (indicates missing reference data)
enriched_stream \
    .writeStream \
    .foreachBatch(lambda df, id: 
        print(f"Batch {id}: Null customer names: {df.filter(col('customer_name').isNull()).count()}")
    ) \
    .start()

Common issues include memory pressure from large static DataFrames and stale reference data causing incorrect enrichment. Use caching judiciously and implement alerting on null rates that exceed expected thresholds. For production systems, consider Delta Lake or Iceberg for static data sources, enabling time-travel queries and ACID guarantees during reference data updates.
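
The alerting check itself is simple enough to unit-test outside Spark. A minimal sketch of the decision logic you might call from foreachBatch; the 5% threshold is an illustrative assumption, not a Spark default:

```python
# Decide whether a batch's post-join null rate warrants an alert.
# The 5% threshold is an illustrative assumption.
def should_alert(null_count, total_rows, threshold=0.05):
    """True when the share of unmatched (null) rows exceeds the threshold."""
    if total_rows == 0:  # empty micro-batch: nothing to alert on
        return False
    return null_count / total_rows > threshold

# Inside foreachBatch you would compute the counts from the batch DataFrame:
#   nulls = df.filter(col("customer_name").isNull()).count()
#   if should_alert(nulls, df.count()): ...
checks = [should_alert(2, 100), should_alert(10, 100), should_alert(0, 0)]
# → [False, True, False]
```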
