PySpark - Join Two DataFrames (Inner, Left, Right, Full)
Key Insights
- PySpark supports four primary join types (inner, left, right, full), each serving distinct use cases—inner for matching records only, left/right for preserving one side’s data, and full for complete datasets with nulls where matches don’t exist
- Join performance dramatically improves when broadcasting small DataFrames (under 10MB) to all executor nodes, avoiding expensive shuffle operations that can bottleneck distributed processing
- Always specify explicit join conditions and column names to avoid ambiguous column references, especially when joining on multiple columns or dealing with DataFrames that share column names beyond the join key
Introduction & Setup
Joins are fundamental operations in PySpark for combining data from multiple sources. Whether you’re enriching customer data with transaction history, combining dimension tables with fact tables, or merging datasets from different systems, understanding join mechanics is critical for building efficient data pipelines.
The choice of join type directly impacts your results. Use inner joins when you only want matching records, left joins to preserve your primary dataset while enriching it, right joins when the secondary dataset is your source of truth, and full outer joins when you need complete visibility across both datasets regardless of matches.
Let’s create two sample DataFrames that we’ll use throughout this article:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("PySpark Joins") \
    .getOrCreate()

# Create employees DataFrame
employees_data = [
    (1, "John Doe", 101),
    (2, "Jane Smith", 102),
    (3, "Mike Johnson", 101),
    (4, "Sarah Williams", 103),
    (5, "Tom Brown", None)  # Employee without department
]

employees_schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("emp_name", StringType(), True),
    StructField("dept_id", IntegerType(), True)
])

employees_df = spark.createDataFrame(employees_data, employees_schema)

# Create departments DataFrame
departments_data = [
    (101, "Engineering"),
    (102, "Sales"),
    (103, "Marketing"),
    (104, "HR")  # Department without employees
]

departments_schema = StructType([
    StructField("dept_id", IntegerType(), True),
    StructField("dept_name", StringType(), True)
])

departments_df = spark.createDataFrame(departments_data, departments_schema)

employees_df.show()
departments_df.show()
Inner Join
Inner joins return only the rows where the join condition matches in both DataFrames. This is the most restrictive join type and the default in PySpark when you don’t specify a join type explicitly.
Use inner joins when you need data that exists in both datasets. For example, finding employees who are assigned to valid departments, or matching orders with customer records where both exist.
# Perform inner join
inner_join_df = employees_df.join(
    departments_df,
    employees_df.dept_id == departments_df.dept_id,
    "inner"
)

inner_join_df.show()
# Output:
# +------+--------------+-------+-------+-----------+
# |emp_id| emp_name|dept_id|dept_id| dept_name|
# +------+--------------+-------+-------+-----------+
# | 1| John Doe| 101| 101|Engineering|
# | 3| Mike Johnson| 101| 101|Engineering|
# | 2| Jane Smith| 102| 102| Sales|
# | 4|Sarah Williams| 103| 103| Marketing|
# +------+--------------+-------+-------+-----------+
# Notice Tom Brown (no department) and HR department (no employees) are excluded
Notice that the result contains duplicate dept_id columns. We’ll address this later, but for now, understand that inner joins exclude Tom Brown (employee without a department) and the HR department (department without employees).
Left (Left Outer) Join
Left joins return all records from the left DataFrame and matching records from the right DataFrame. Where no match exists, the right DataFrame’s columns contain null values.
This is your go-to join when the left DataFrame is your primary dataset and you want to enrich it with additional information. Even if enrichment data is missing, you retain all original records.
# Perform left join
left_join_df = employees_df.join(
    departments_df,
    employees_df.dept_id == departments_df.dept_id,
    "left"
)

left_join_df.show()
# Output:
# +------+--------------+-------+-------+-----------+
# |emp_id| emp_name|dept_id|dept_id| dept_name|
# +------+--------------+-------+-------+-----------+
# | 1| John Doe| 101| 101|Engineering|
# | 2| Jane Smith| 102| 102| Sales|
# | 3| Mike Johnson| 101| 101|Engineering|
# | 4|Sarah Williams| 103| 103| Marketing|
# | 5| Tom Brown| null| null| null|
# +------+--------------+-------+-------+-----------+
# Tom Brown appears with null department information
# HR department is still excluded
# Handle nulls explicitly
from pyspark.sql.functions import coalesce, lit

left_join_df.select(
    "emp_id",
    "emp_name",
    coalesce(departments_df.dept_name, lit("Unassigned")).alias("department")
).show()
Tom Brown now appears in the results with null values for department columns. This preserves your employee roster while showing which employees lack department assignments.
Right (Right Outer) Join
Right joins are the mirror image of left joins—they return all records from the right DataFrame and matching records from the left DataFrame. Null values appear in left DataFrame columns where no match exists.
Use right joins when your secondary dataset is the authoritative source. For instance, showing all departments including those without assigned employees.
# Perform right join
right_join_df = employees_df.join(
    departments_df,
    employees_df.dept_id == departments_df.dept_id,
    "right"
)

right_join_df.show()
# Output:
# +------+--------------+-------+-------+-----------+
# |emp_id| emp_name|dept_id|dept_id| dept_name|
# +------+--------------+-------+-------+-----------+
# | 1| John Doe| 101| 101|Engineering|
# | 3| Mike Johnson| 101| 101|Engineering|
# | 2| Jane Smith| 102| 102| Sales|
# | 4|Sarah Williams| 103| 103| Marketing|
# | null| null| null| 104| HR|
# +------+--------------+-------+-------+-----------+
# HR department appears with null employee information
# Tom Brown is excluded
# Count employees per department including empty departments
right_join_df.groupBy(departments_df.dept_name).count().show()
The HR department now appears with null employee values. This is useful for reports requiring all departments, highlighting which ones need staffing.
Full (Full Outer) Join
Full outer joins return all records from both DataFrames. Where matches don’t exist, you get null values on the respective side. This gives you complete visibility across both datasets.
Use full outer joins when you need to reconcile two datasets and identify mismatches on both sides—finding both unassigned employees and unstaffed departments.
# Perform full outer join
full_join_df = employees_df.join(
    departments_df,
    employees_df.dept_id == departments_df.dept_id,
    "full"
)

full_join_df.show()
# Output:
# +------+--------------+-------+-------+-----------+
# |emp_id| emp_name|dept_id|dept_id| dept_name|
# +------+--------------+-------+-------+-----------+
# | 1| John Doe| 101| 101|Engineering|
# | 2| Jane Smith| 102| 102| Sales|
# | 3| Mike Johnson| 101| 101|Engineering|
# | 4|Sarah Williams| 103| 103| Marketing|
# | 5| Tom Brown| null| null| null|
# | null| null| null| 104| HR|
# +------+--------------+-------+-------+-----------+
# Both Tom Brown and HR department appear
# Identify data quality issues
full_join_df.filter(
    (employees_df.emp_id.isNull()) | (departments_df.dept_id.isNull())
).show()
Full outer joins are powerful for data quality audits, showing exactly where your datasets diverge.
Advanced Join Techniques
Real-world scenarios often require more sophisticated join operations. Here are techniques that will improve your PySpark join implementations.
Joining on Multiple Columns
# Create DataFrames with composite keys
sales_data = [
    (1, "2024-01", 1000),
    (1, "2024-02", 1500),
    (2, "2024-01", 2000)
]

sales_df = spark.createDataFrame(
    sales_data,
    ["emp_id", "month", "amount"]
)

targets_data = [
    (1, "2024-01", 900),
    (1, "2024-02", 1200),
    (2, "2024-01", 1800)
]

targets_df = spark.createDataFrame(
    targets_data,
    ["emp_id", "month", "target"]
)

# Join on multiple columns
multi_col_join = sales_df.join(
    targets_df,
    (sales_df.emp_id == targets_df.emp_id) &
    (sales_df.month == targets_df.month),
    "inner"
)

multi_col_join.show()
Handling Column Name Conflicts
# Join with explicit column selection to avoid duplicates
clean_join = employees_df.join(
    departments_df,
    employees_df.dept_id == departments_df.dept_id,
    "inner"
).select(
    employees_df.emp_id,
    employees_df.emp_name,
    employees_df.dept_id,
    departments_df.dept_name
)

clean_join.show()

# Alternative: use aliases
from pyspark.sql.functions import col

emp_alias = employees_df.alias("emp")
dept_alias = departments_df.alias("dept")

aliased_join = emp_alias.join(
    dept_alias,
    col("emp.dept_id") == col("dept.dept_id"),
    "inner"
).select("emp.*", "dept.dept_name")

aliased_join.show()
Broadcast Joins for Performance
When joining a large DataFrame with a small one (typically under 10MB), broadcast the small DataFrame to all executors to avoid expensive shuffle operations.
from pyspark.sql.functions import broadcast

# Broadcast the small departments DataFrame
broadcast_join = employees_df.join(
    broadcast(departments_df),
    employees_df.dept_id == departments_df.dept_id,
    "inner"
)

# This prevents shuffling the larger employees DataFrame
broadcast_join.explain()
Conclusion & Best Practices
Choose your join type based on data requirements: inner joins for strict matching, left joins to preserve your primary dataset, right joins when the secondary dataset is authoritative, and full outer joins for complete reconciliation.
Performance matters in distributed computing. Always broadcast small DataFrames, partition large DataFrames on join keys, and avoid full outer joins on massive datasets unless absolutely necessary. Monitor shuffle operations—they’re often the bottleneck.
Common pitfalls include forgetting to handle duplicate column names, not accounting for null values in outer joins, and using full outer joins when a left or inner join would suffice. Always specify explicit join conditions and test with representative data volumes.
Use the explain() method to understand query plans and identify optimization opportunities. PySpark’s Catalyst optimizer is powerful, but explicit hints like broadcast joins help it make better decisions for your specific use case.