How to Join DataFrames in PySpark

Key Insights

  • PySpark supports seven join types, but most real-world scenarios use just three: inner, left outer, and left anti joins
  • Broadcast joins can speed up operations by 10-100x when one DataFrame is small enough to fit in executor memory (typically under 10MB)
  • Always handle duplicate column names explicitly before joining—debugging ambiguous column references in production is painful and avoidable

Introduction

Joining DataFrames is fundamental to any data pipeline. Whether you’re enriching transaction records with customer details, combining log data with reference tables, or building feature sets for machine learning, you’ll write joins constantly.

PySpark joins work differently from their pandas counterparts. In a distributed environment, a join typically triggers a data shuffle across the cluster, moving rows between executors so that matching keys end up in the same partition. Understanding this mechanism has real performance implications: a poorly written join can turn a 5-minute job into a 2-hour nightmare.

This guide covers the practical join operations you’ll use daily, with code you can adapt directly for your pipelines.

Setting Up Sample DataFrames

Let’s create two DataFrames that represent a common scenario: employees and their departments.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("JoinExamples").getOrCreate()

# Employee data
employees_data = [
    (1, "Alice", 101),
    (2, "Bob", 102),
    (3, "Charlie", 101),
    (4, "Diana", 103),
    (5, "Eve", None)  # No department assigned
]

employees = spark.createDataFrame(
    employees_data, 
    ["emp_id", "name", "dept_id"]
)

# Department data
departments_data = [
    (101, "Engineering", "Building A"),
    (102, "Marketing", "Building B"),
    (104, "Sales", "Building C")  # No employees in this dept
]

departments = spark.createDataFrame(
    departments_data,
    ["dept_id", "dept_name", "location"]
)

Notice the intentional mismatches: Eve has no department (null), Diana’s department 103 doesn’t exist in the departments table, and Sales (104) has no employees. These edge cases reveal how different join types behave.

Inner Join

Inner joins return only rows where the key exists in both DataFrames. This is PySpark’s default join behavior.

# Simple syntax when column names match
inner_result = employees.join(departments, on="dept_id")
inner_result.show()

Output:

+-------+------+-------+-----------+----------+
|dept_id|emp_id|   name|  dept_name|  location|
+-------+------+-------+-----------+----------+
|    101|     1|  Alice|Engineering|Building A|
|    101|     3|Charlie|Engineering|Building A|
|    102|     2|    Bob|  Marketing|Building B|
+-------+------+-------+-----------+----------+

Eve (null dept_id) and Diana (dept_id 103) are excluded because their keys don’t match any department. Sales (104) doesn’t appear because no employee has that dept_id.

When column names differ or you need more complex conditions, use explicit expressions:

# Explicit join condition
result = employees.join(
    departments, 
    employees.dept_id == departments.dept_id
)

# Multiple conditions
result = employees.join(
    departments,
    (employees.dept_id == departments.dept_id) & 
    (employees.name != "Bob")
)

Outer Joins (Left, Right, Full)

Outer joins preserve rows even when no match exists, filling missing values with nulls.

Left outer join keeps all rows from the left DataFrame:

left_result = employees.join(departments, on="dept_id", how="left")
left_result.show()

Output:

+-------+------+-------+-----------+----------+
|dept_id|emp_id|   name|  dept_name|  location|
+-------+------+-------+-----------+----------+
|    101|     1|  Alice|Engineering|Building A|
|    102|     2|    Bob|  Marketing|Building B|
|    101|     3|Charlie|Engineering|Building A|
|    103|     4|  Diana|       null|      null|
|   null|     5|    Eve|       null|      null|
+-------+------+-------+-----------+----------+

Diana and Eve now appear with null department information. This is useful when you need all employees regardless of whether their department exists.

Right outer join keeps all rows from the right DataFrame:

right_result = employees.join(departments, on="dept_id", how="right")
right_result.show()

Output:

+-------+------+-------+-----------+----------+
|dept_id|emp_id|   name|  dept_name|  location|
+-------+------+-------+-----------+----------+
|    101|     1|  Alice|Engineering|Building A|
|    101|     3|Charlie|Engineering|Building A|
|    102|     2|    Bob|  Marketing|Building B|
|    104|  null|   null|      Sales|Building C|
+-------+------+-------+-----------+----------+

Sales appears with null employee data. Use this to find departments without employees.

Full outer join preserves all rows from both sides:

full_result = employees.join(departments, on="dept_id", how="outer")
full_result.show()

This returns every employee and every department, with nulls where matches don’t exist. Full outer joins are less common but useful for data reconciliation tasks.

Semi and Anti Joins

These filtering joins are underused but incredibly practical. They filter the left DataFrame based on key existence in the right DataFrame, without returning any columns from the right side.

Left semi join returns left rows where the key exists in the right DataFrame:

# Employees who have a valid department
semi_result = employees.join(departments, on="dept_id", how="left_semi")
semi_result.show()

Output:

+------+-------+-------+
|emp_id|   name|dept_id|
+------+-------+-------+
|     1|  Alice|    101|
|     2|    Bob|    102|
|     3|Charlie|    101|
+------+-------+-------+

Notice no department columns appear—just filtered employees. This is equivalent to WHERE EXISTS in SQL and often faster than an inner join followed by dropping columns.

Left anti join returns left rows where the key does NOT exist in the right DataFrame:

# Employees without a valid department
anti_result = employees.join(departments, on="dept_id", how="left_anti")
anti_result.show()

Output:

+------+-----+-------+
|emp_id| name|dept_id|
+------+-----+-------+
|     4|Diana|    103|
|     5|  Eve|   null|
+------+-----+-------+

This finds orphaned records—employees assigned to non-existent departments. Anti joins are essential for data quality checks and finding missing references.

Handling Common Pitfalls

Duplicate Column Names

When join columns have the same name and you use explicit conditions, both columns appear in the result:

# This creates duplicate dept_id columns
bad_result = employees.join(
    departments,
    employees.dept_id == departments.dept_id
)
# Subsequent operations on dept_id will fail with ambiguous reference

Fix this by using the on parameter (which automatically deduplicates) or by renaming columns before joining:

# Option 1: Use 'on' parameter
good_result = employees.join(departments, on="dept_id")

# Option 2: Rename before joining
departments_renamed = departments.withColumnRenamed("dept_id", "d_dept_id")
result = employees.join(
    departments_renamed,
    employees.dept_id == departments_renamed.d_dept_id
).drop("d_dept_id")

# Option 3: Drop duplicate after join
result = employees.join(
    departments,
    employees.dept_id == departments.dept_id
).drop(departments.dept_id)

Broadcast Joins for Small Tables

When one DataFrame is small (rule of thumb: under 10MB), broadcast it to avoid shuffles:

from pyspark.sql.functions import broadcast

# Force broadcast of the smaller DataFrame
result = employees.join(broadcast(departments), on="dept_id")

Broadcasting copies the small DataFrame to every executor, eliminating the need to shuffle the large DataFrame. This can dramatically improve performance.

# Check if auto-broadcast is happening
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")  # Default: 10MB
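
The threshold is also configurable. If your lookup tables are a bit larger than the default, you can raise it, or set it to -1 to disable automatic broadcasting and rely solely on explicit broadcast() hints. A sketch; tune the value to your executor memory:

```python
# Value is in bytes: raise the auto-broadcast cutoff to 50MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# Or disable auto-broadcast entirely and use explicit broadcast() hints
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```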

Performance Considerations

Partition alignment matters. If both DataFrames are partitioned by the join key, Spark can perform the join without shuffling data. Consider repartitioning before multiple joins on the same key:

# Repartition once, join multiple times efficiently
employees_partitioned = employees.repartition(200, "dept_id")

Filter early. Apply filters before joins to reduce data volume:

# Good: filter first
active_employees = employees.filter(employees.status == "active")
result = active_employees.join(departments, on="dept_id")

# Bad: join everything, then filter
result = employees.join(departments, on="dept_id").filter(...)

Watch for skewed keys. If one key value appears millions of times while others appear rarely, that partition becomes a bottleneck. Consider salting techniques or adaptive query execution (AQE) in Spark 3.0+.

Check your explain plans. When performance matters, inspect what Spark actually does:

result.explain(mode="formatted")

Look for BroadcastHashJoin (fast) versus SortMergeJoin (requires shuffle). If you expected a broadcast but see a shuffle, your DataFrame might be larger than you thought.

Joins are where PySpark pipelines often succeed or fail performance-wise. Master these patterns, understand when each join type applies, and always consider the data volumes involved. Your future self debugging a 3 AM pipeline failure will thank you.
