Apache Spark - Broadcast Variables Best Practices
Key Insights
- Broadcast variables eliminate redundant data transfer by shipping read-only data once per executor instead of once per task, making them essential for joins against small lookup tables and configuration data under 1GB.
- The most common mistake is capturing data through closures instead of explicit broadcasts, which silently ships data with every task and can tank performance by orders of magnitude.
- Always call unpersist() when you're done with a broadcast variable in long-running applications—Spark won't automatically clean them up, and you'll eventually run out of executor memory.
Introduction to Broadcast Variables
Every Spark job faces the same fundamental challenge: how do you get reference data to the workers that need it? By default, Spark serializes any variables your tasks reference and ships them along with each task. For a job with 10,000 tasks referencing a 100MB lookup table, that’s potentially 1TB of network transfer.
Broadcast variables solve this by shipping the data once to each executor, where it’s cached and reused across all tasks running on that node. Instead of 10,000 copies, you send maybe 100 (one per executor). The data must be read-only—Spark makes no attempt to synchronize changes across the cluster.
This isn’t just an optimization. For many workloads, the difference between broadcasting and not broadcasting is the difference between a job completing in minutes versus hours, or completing at all versus running out of memory.
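The arithmetic behind that claim is worth making concrete. A quick back-of-envelope sketch, using the figures from the scenario above (the 100-executor count is an assumption):

```python
# Back-of-envelope transfer cost: closure capture vs. broadcast.
# Figures match the scenario above: a 100 MB lookup table, 10,000 tasks,
# and an assumed 100 executors.
lookup_mb = 100
num_tasks = 10_000
num_executors = 100

closure_gb = lookup_mb * num_tasks / 1024        # one copy shipped per task
broadcast_gb = lookup_mb * num_executors / 1024  # one copy shipped per executor

print(f"Closure capture: ~{closure_gb:,.0f} GB transferred")
print(f"Broadcast:       ~{broadcast_gb:,.0f} GB transferred")
```

Roughly a terabyte versus about ten gigabytes: a 100x reduction just from changing how the data is shipped.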
When to Use Broadcast Variables
Broadcast variables shine in specific scenarios. Here’s when to reach for them:
Good candidates:
- Lookup tables for enrichment (country codes, product catalogs, user segments)
- Machine learning model weights for inference
- Configuration maps and feature flags
- Regex patterns or parsing rules
- Small dimension tables for star schema joins
Size guidelines:
- Under 100MB: Always broadcast, no questions asked
- 100MB - 1GB: Broadcast with monitoring; watch driver memory
- Over 1GB: Reconsider your approach; this is anti-pattern territory
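These thresholds can be encoded as a simple guard to run before broadcasting (a sketch; the function name and return labels are illustrative):

```python
def broadcast_strategy(size_mb: float) -> str:
    """Map an estimated broadcast size (in MB) to the guideline above."""
    if size_mb < 100:
        return "broadcast"
    if size_mb <= 1024:
        return "broadcast-with-monitoring"
    return "rethink: partition or shuffle join"

print(broadcast_strategy(50))    # broadcast
print(broadcast_strategy(512))   # broadcast-with-monitoring
print(broadcast_strategy(2048))  # rethink: partition or shuffle join
```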
Anti-patterns to avoid:
- Broadcasting data that changes during job execution
- Broadcasting large fact tables (use partitioning instead)
- Broadcasting data you only use in a single task
Here’s the performance difference in practice:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("BroadcastDemo").getOrCreate()
# Large fact table: 100 million rows
sales = spark.read.parquet("s3://data/sales/")
# Small dimension table: 10,000 rows
products = spark.read.parquet("s3://data/products/")
# Without broadcast hint - Spark may choose shuffle join
# This shuffles both datasets across the network
slow_result = sales.join(products, "product_id")
# With broadcast hint - forces broadcast hash join
# Only the small table crosses the network
fast_result = sales.join(broadcast(products), "product_id")
# Check the execution plan
fast_result.explain()
# == Physical Plan ==
# *(2) BroadcastHashJoin [product_id], [product_id], Inner, BuildRight
# :- *(2) ... (sales scan)
# +- BroadcastExchange HashedRelationBroadcastMode(List(product_id))
# +- *(1) ... (products scan)
The BroadcastHashJoin in the plan confirms Spark will ship the products table to all executors rather than shuffling both tables.
Creating and Using Broadcast Variables
The mechanics differ slightly between the DataFrame API (where Spark handles broadcasts implicitly) and the RDD API (where you manage them explicitly).
Explicit broadcast with RDDs and UDFs:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Create the lookup data on the driver
country_codes = {
    "US": "United States",
    "GB": "United Kingdom",
    "DE": "Germany",
    "JP": "Japan"
}
# Broadcast it to all executors
broadcast_countries = sc.broadcast(country_codes)
# Access via .value in your transformations
def enrich_country(code):
    return broadcast_countries.value.get(code, "Unknown")
# Use in RDD operations
rdd = sc.parallelize([("US", 100), ("GB", 200), ("XX", 50)])
enriched = rdd.map(lambda x: (enrich_country(x[0]), x[1]))
print(enriched.collect())
# [('United States', 100), ('United Kingdom', 200), ('Unknown', 50)]
Scala equivalent:
val countryCodes = Map(
  "US" -> "United States",
  "GB" -> "United Kingdom",
  "DE" -> "Germany"
)
val broadcastCountries = spark.sparkContext.broadcast(countryCodes)
val rdd = spark.sparkContext.parallelize(Seq(("US", 100), ("GB", 200)))
val enriched = rdd.map { case (code, value) =>
  (broadcastCountries.value.getOrElse(code, "Unknown"), value)
}
For streaming applications:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Classification rules expressed as lambdas
segment_rules = {
    "high_value": lambda x: x > 1000,
    "medium_value": lambda x: 100 < x <= 1000,
    "low_value": lambda x: x <= 100
}
# For streaming, broadcast a plain-data (reliably serializable) version
# of the rules rather than the lambdas themselves
segment_thresholds = sc.broadcast({"high": 1000, "medium": 100})
@udf(StringType())
def classify_value(amount):
    thresholds = segment_thresholds.value
    if amount > thresholds["high"]:
        return "high_value"
    elif amount > thresholds["medium"]:
        return "medium_value"
    return "low_value"
# Apply to streaming DataFrame (Kafka connection options omitted for brevity)
streaming_df = spark.readStream.format("kafka").load()
enriched_stream = streaming_df.withColumn("segment", classify_value("amount"))
Memory Management and Sizing
Broadcast variables consume memory in three places: the driver (which holds the original), the network (during transfer), and each executor (cached copy). Misjudging size leads to OOM errors that can be tricky to diagnose.
Check size before broadcasting:
import pickle

def get_broadcast_size(data):
    """Estimate serialized size of data for broadcasting."""
    # Pickle gives a reasonable estimate of serialized size
    serialized = pickle.dumps(data)
    size_mb = len(serialized) / (1024 * 1024)
    return size_mb
# Before broadcasting
lookup_table = load_lookup_data()
size = get_broadcast_size(lookup_table)
print(f"Broadcast size: {size:.2f} MB")
if size > 500:
    print("WARNING: Large broadcast may cause memory pressure")
if size > 1024:
    raise ValueError("Broadcast too large - refactor to use join or partition")
# Proceed with broadcast
broadcast_lookup = sc.broadcast(lookup_table)
Monitor in Spark UI:
Navigate to the “Storage” tab in Spark UI to see broadcast variables. Look for entries prefixed with “broadcast_” showing size and memory usage per executor.
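The UI shows per-executor usage; for a rough cluster-wide figure you can multiply the serialized size by the number of cached copies. A sketch (the executor count is an assumption you supply, and since executors hold deserialized objects that are often larger than the pickled bytes, treat the result as a lower bound):

```python
import pickle

def cluster_footprint_mb(data, num_executors):
    """Estimate total cluster memory held by one broadcast:
    one serialized copy on the driver plus one cached copy per executor."""
    size_mb = len(pickle.dumps(data)) / (1024 * 1024)
    return size_mb * (num_executors + 1)

# Illustrative lookup table; in practice, pass your real broadcast payload
lookup = {i: f"product_{i}" for i in range(10_000)}
print(f"~{cluster_footprint_mb(lookup, num_executors=100):.1f} MB cluster-wide")
```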
Driver memory configuration:
# Ensure driver has headroom for broadcasts
spark-submit \
  --driver-memory 8g \
  --conf spark.driver.maxResultSize=4g \
  your_job.py
Common Pitfalls and Debugging
The most insidious broadcast problem is accidental closure capture. Your code looks fine, runs without errors, but performs terribly:
# WRONG: Closure capture - ships data with every task
lookup_dict = {"a": 1, "b": 2, "c": 3}  # This gets serialized per task

def bad_enrichment(row):
    return lookup_dict.get(row.key, 0)  # Captures lookup_dict in closure

rdd.map(bad_enrichment)  # Slow! Serializes lookup_dict 10,000 times

# RIGHT: Explicit broadcast - ships data once per executor
broadcast_lookup = sc.broadcast({"a": 1, "b": 2, "c": 3})

def good_enrichment(row):
    return broadcast_lookup.value.get(row.key, 0)

rdd.map(good_enrichment)  # Fast! Uses cached broadcast
Other common mistakes:
# WRONG: Modifying broadcast data (undefined behavior)
broadcast_data = sc.broadcast({"count": 0})

def bad_counter(x):
    broadcast_data.value["count"] += 1  # Don't do this! Changes stay local to one executor
    return x
# WRONG: Broadcasting non-serializable objects
import threading
lock = threading.Lock()
sc.broadcast(lock) # Fails: can't serialize Lock
# WRONG: Forgetting .value
broadcast_map = sc.broadcast({"key": "value"})
result = broadcast_map.get("key") # AttributeError!
result = broadcast_map.value.get("key") # Correct
Performance Optimization Techniques
Enable Kryo serialization to speed up JVM-side broadcasts (note that PySpark still pickles Python objects; Kryo mainly benefits DataFrame broadcast joins and Scala/Java data):

spark = SparkSession.builder \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrationRequired", "false") \
    .getOrCreate()
Force broadcast joins in SQL:
# Using SQL hint
spark.sql("""
    SELECT /*+ BROADCAST(products) */
        s.*, p.product_name
    FROM sales s
    JOIN products p ON s.product_id = p.product_id
""")
# Configure auto-broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024) # 100MB
Combining with accumulators for metrics:
broadcast_rules = sc.broadcast(validation_rules)
invalid_count = sc.accumulator(0)
def validate_and_count(row):
    rules = broadcast_rules.value
    if not rules.validate(row):
        invalid_count.add(1)
        return None
    return row
results = rdd.map(validate_and_count).filter(lambda x: x is not None)
print(f"Invalid records: {invalid_count.value}")
Cleanup and Resource Management
Broadcast variables stick around until you explicitly remove them. In long-running applications like streaming jobs, this causes memory leaks:
# Manual cleanup
broadcast_data = sc.broadcast(large_dict)
# ... use it ...
# Remove cached copies from executors (asynchronous by default;
# pass blocking=True to wait for completion)
broadcast_data.unpersist()
# Remove from driver and block manager (immediate, can't use after)
broadcast_data.destroy()
Automated cleanup pattern for streaming:
class BroadcastManager:
    def __init__(self, sc):
        self.sc = sc
        self.current_broadcast = None

    def update(self, new_data):
        """Replace broadcast with new data, cleaning up old."""
        old_broadcast = self.current_broadcast
        self.current_broadcast = self.sc.broadcast(new_data)
        if old_broadcast is not None:
            old_broadcast.unpersist()
        return self.current_broadcast
# Usage in streaming
manager = BroadcastManager(sc)
def refresh_lookup():
    new_data = fetch_latest_from_database()
    manager.update(new_data)

# Refresh periodically ("scheduler" is a placeholder for your
# application's scheduling mechanism)
scheduler.schedule(refresh_lookup, interval=timedelta(hours=1))
Broadcast variables are one of Spark’s most powerful optimization tools, but they require deliberate management. Size them appropriately, broadcast explicitly rather than through closures, and clean up after yourself. Get these basics right, and you’ll avoid the most common performance traps in distributed data processing.