How to Use Broadcast Joins in PySpark

Key Insights

  • Broadcast joins eliminate expensive shuffle operations by replicating small tables to all worker nodes, often delivering 10x or greater performance improvements for joins between large and small datasets.
  • Use broadcast joins when your smaller table fits comfortably in executor memory (default threshold is 10MB, but you can safely increase this to 100MB-1GB depending on your cluster configuration).
  • Always verify your broadcast join is actually executing by checking the query plan with .explain()—Spark’s optimizer sometimes makes unexpected decisions.

The Cost of Joins in Distributed Systems

Joins are among the most expensive operations in distributed data processing. When you join two large DataFrames in PySpark, Spark must shuffle data across the network so that matching keys end up on the same partition. This shuffle operation involves serializing data, writing to disk, transferring across the network, and deserializing on the receiving end.

For a standard sort-merge join between two tables with millions of rows each, you’re looking at shuffling both datasets entirely. The network I/O alone can dominate your job’s runtime, and if you’re paying for cloud compute, it’s dominating your bill too.
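
A back-of-envelope calculation makes the cost concrete. All the numbers below are illustrative assumptions, not measurements:

```python
# Rough data-movement comparison for a join between a large fact table
# and a small lookup table. All figures are illustrative assumptions.
large_rows = 100_000_000      # fact table rows
small_rows = 1_000            # lookup table rows
row_bytes = 100               # assumed average serialized row size
num_executors = 20

# Sort-merge join: both sides are shuffled across the network.
shuffle_bytes = (large_rows + small_rows) * row_bytes

# Broadcast join: only the small table moves, once per executor.
broadcast_bytes = small_rows * row_bytes * num_executors

print(f"Shuffle moves   ~{shuffle_bytes / 1e9:.1f} GB")
print(f"Broadcast moves ~{broadcast_bytes / 1e6:.1f} MB")
```

Even with generous assumptions on the broadcast side, the shuffle moves several orders of magnitude more data, and that gap is where the runtime savings come from.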

Broadcast joins solve this problem for a specific but common scenario: when one of your tables is small enough to fit in memory on each executor.

What is a Broadcast Join?

A broadcast join works by copying the entire smaller DataFrame to every worker node in your cluster. Once the small table exists locally on each executor, Spark can join it with partitions of the large table without any shuffle.

Here’s the mental model: instead of moving mountain-sized data around, you photocopy the small lookup table and hand a copy to every worker. Each worker then joins their local partition of the large table against their local copy of the small table.

Let’s see the difference in execution plans:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("BroadcastJoinDemo").getOrCreate()

# Create sample DataFrames
large_df = spark.range(10000000).withColumn("category_id", (F.col("id") % 100).cast("int"))
small_df = spark.range(100).withColumnRenamed("id", "category_id").withColumn("category_name", F.concat(F.lit("Category_"), F.col("category_id")))

# Standard join (will use SortMergeJoin for large tables)
standard_join = large_df.join(small_df, "category_id")
print("=== Standard Join Plan ===")
standard_join.explain()

# Broadcast join
broadcast_join = large_df.join(F.broadcast(small_df), "category_id")
print("=== Broadcast Join Plan ===")
broadcast_join.explain()

The standard join plan shows SortMergeJoin with Exchange nodes (shuffles) on both inputs. The broadcast join plan shows BroadcastHashJoin with BroadcastExchange only on the small table—no shuffle on the large table.

When to Use Broadcast Joins

The decision to broadcast comes down to three factors: table size, available memory, and join patterns.

Size thresholds: Spark’s default autoBroadcastJoinThreshold is 10MB. This is conservative. On modern clusters with executors running 4-16GB of memory, you can comfortably broadcast tables up to 100MB-500MB. I’ve seen production workloads broadcast 1GB tables successfully, though you need to account for serialization overhead (broadcast data typically expands 2-4x in memory).
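
A simple way to sanity-check a candidate table against these numbers is a helper like the one below. The 4x expansion factor and the 20% memory budget are assumptions drawn from the guidance above, not Spark constants, so tune them for your cluster:

```python
def safe_to_broadcast(table_bytes: int,
                      executor_memory_bytes: int,
                      expansion_factor: float = 4.0,
                      memory_fraction: float = 0.2) -> bool:
    """Return True if the table, after the assumed in-memory expansion,
    fits within a conservative slice of executor memory."""
    expanded = table_bytes * expansion_factor
    budget = executor_memory_bytes * memory_fraction
    return expanded <= budget

# A 100MB table on an 8GB executor: 400MB expanded vs a 1.6GB budget
print(safe_to_broadcast(100 * 1024**2, 8 * 1024**3))   # True

# A 1GB table on a 4GB executor: 4GB expanded vs an 800MB budget
print(safe_to_broadcast(1 * 1024**3, 4 * 1024**3))     # False
```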

Ideal use cases:

  • Dimension tables in star schemas (products, customers, dates)
  • Lookup tables for code-to-description mappings
  • Configuration or reference data
  • Filtering joins where you’re semi-joining against a whitelist

When to avoid broadcast joins:

  • When the “small” table is actually large relative to executor memory (hundreds of MB or more)
  • When the small table will grow over time and eventually exceed memory limits
  • When you’re joining the same large table multiple times with different broadcast tables (memory pressure compounds)
  • When data is heavily skewed in the broadcast table and you’d benefit from other optimizations

Implementing Broadcast Joins

You have three ways to trigger broadcast joins in PySpark.

Method 1: Explicit broadcast() Hint

This is the most explicit and my preferred approach for production code:

from pyspark.sql import functions as F

# Explicit broadcast hint
result = orders_df.join(
    F.broadcast(products_df),
    orders_df.product_id == products_df.id,
    "left"
)

The F.broadcast() function wraps your DataFrame and tells Spark’s optimizer to broadcast it regardless of size. This overrides the automatic threshold check.

Method 2: SQL Hint Syntax

If you’re using Spark SQL, use the broadcast hint in your query:

# Register DataFrames as temp views
large_df.createOrReplaceTempView("orders")
small_df.createOrReplaceTempView("products")

# SQL with broadcast hint
result = spark.sql("""
    SELECT /*+ BROADCAST(products) */ 
        o.*, 
        p.product_name, 
        p.category
    FROM orders o
    LEFT JOIN products p ON o.product_id = p.id
""")

You can also use BROADCASTJOIN or MAPJOIN as aliases—they’re all equivalent.

Method 3: Automatic Broadcasting via Threshold

Configure the threshold to let Spark decide automatically:

# Set threshold to 100MB (in bytes)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)

# Now any table under 100MB will be automatically broadcast
result = large_df.join(small_df, "key_column")

# Disable automatic broadcasting entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

I recommend using explicit hints in production code rather than relying on automatic broadcasting. The threshold check uses table statistics, which can be stale or unavailable, leading to inconsistent behavior.

Monitoring and Verification

Never assume your broadcast join is working—verify it. The execution plan tells you exactly what Spark will do.

# Detailed execution plan
result = large_df.join(F.broadcast(small_df), "category_id")
result.explain(True)

Here’s what to look for in the output:

== Physical Plan ==
*(2) Project [id#0L, category_id#1, category_name#5]
+- *(2) BroadcastHashJoin [category_id#1], [category_id#3], Inner, BuildRight, false
   :- *(2) Project [id#0L, category_id#1]
   :  +- *(2) Range (0, 10000000, step=1, splits=8)
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#42]
      +- *(1) Range (0, 100, step=1, splits=8)

The key indicators are:

  • BroadcastHashJoin instead of SortMergeJoin
  • BroadcastExchange on the small table
  • No Exchange (shuffle) on the large table
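
If you want this check in an automated test rather than eyeballing explain output, one lightweight option is to scan the plan text. The helper name is mine, and matching on plan strings is brittle across Spark versions, so treat this as a sketch:

```python
def plan_uses_broadcast(plan_text: str) -> bool:
    """Heuristic check that a physical plan string describes a broadcast
    join: it contains a BroadcastHashJoin and no sort-merge join."""
    return ("BroadcastHashJoin" in plan_text
            and "SortMergeJoin" not in plan_text)

sample_plan = """
== Physical Plan ==
*(2) Project [id#0L, category_id#1, category_name#5]
+- *(2) BroadcastHashJoin [category_id#1], [category_id#3], Inner, BuildRight, false
   +- BroadcastExchange HashedRelationBroadcastMode(...), [id=#42]
"""
print(plan_uses_broadcast(sample_plan))  # True
```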

If you see SortMergeJoin when you expected broadcast, check these common causes:

  1. Table exceeds threshold: Your table statistics show it’s larger than the broadcast threshold
  2. Missing statistics: Run ANALYZE TABLE to compute statistics
  3. Join type incompatibility: Broadcasting is restricted for some join types: a full outer join can’t use a broadcast hash join, and in a left outer join only the right-hand table can be broadcast
  4. Hint ignored: Spark may ignore hints in complex query plans

To rule out the first two causes, compute statistics and confirm the threshold:

# Force statistics computation
spark.sql("ANALYZE TABLE products COMPUTE STATISTICS")

# Check current threshold
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

Performance Benchmarks and Best Practices

Let’s quantify the performance difference:

import time

def benchmark_join(join_df, name, iterations=3):
    times = []
    for _ in range(iterations):
        start = time.time()
        join_df.count()  # Force evaluation
        times.append(time.time() - start)
    avg_time = sum(times) / len(times)
    print(f"{name}: {avg_time:.2f}s average over {iterations} runs")
    return avg_time

# Create test data
large_df = spark.range(50000000).withColumn("key", (F.col("id") % 1000).cast("int"))
small_df = spark.range(1000).withColumnRenamed("id", "key").withColumn("value", F.rand())

# Cache the small DataFrame for fair comparison
small_df.cache().count()

# Benchmark both approaches
standard_time = benchmark_join(
    large_df.join(small_df, "key"),
    "Sort-Merge Join"
)

broadcast_time = benchmark_join(
    large_df.join(F.broadcast(small_df), "key"),
    "Broadcast Join"
)

print(f"Speedup: {standard_time / broadcast_time:.1f}x")

On a typical cluster, you’ll see 5-20x speedup depending on network bandwidth and cluster size.

Best practices to maximize broadcast join performance:

  1. Cache broadcast tables: If you’re joining the same small table multiple times, cache it first so each broadcast builds from the cached data instead of recomputing it.

lookup_df.cache().count()  # Materialize the cache
result1 = table1.join(F.broadcast(lookup_df), "key")
result2 = table2.join(F.broadcast(lookup_df), "key")

  2. Filter before broadcasting: Reduce the broadcast table size by dropping unneeded rows and columns first.

# Bad: broadcasting every column of the table
result = orders.join(F.broadcast(products), "product_id")

# Good: broadcasting only the needed columns
products_slim = products.select("product_id", "name", "price")
result = orders.join(F.broadcast(products_slim), "product_id")

  3. Monitor executor memory: Watch for OOM errors in the Spark UI. If executors are dying during broadcast, reduce the broadcast threshold or increase executor memory.

  4. Set explicit timeouts: Large broadcasts can time out. Adjust the limit if needed:

spark.conf.set("spark.sql.broadcastTimeout", 600)  # 10 minutes

Conclusion

Broadcast joins are one of the highest-impact optimizations in your PySpark toolkit. The pattern is straightforward: identify small tables in your joins, verify they fit in executor memory, and apply the broadcast hint.

Start by auditing your existing jobs with .explain(). Look for SortMergeJoin operations where one side is a dimension or lookup table. Add explicit F.broadcast() hints and measure the improvement. For most star-schema workloads, you’ll find several joins that benefit immediately.

The explicit hint approach keeps your optimization intentions visible in code, making it easier for future maintainers to understand why certain joins perform well—and to adjust when data volumes change.
