How to Left Join in PySpark

Key Insights

  • Left joins in PySpark preserve all rows from the left DataFrame while matching rows from the right, with nulls filling unmatched columns—master the join() method’s how="left" parameter for this fundamental operation.
  • Column name conflicts are the most common source of bugs in PySpark joins; use aliasing and explicit column selection to avoid ambiguous references that will crash your jobs.
  • Broadcast joins can speed up left joins by 10-100x when your right DataFrame fits in memory—always consider this optimization for dimension tables under 100MB.

Introduction

Left joins are the workhorse of data engineering. When you need to enrich a primary dataset with optional attributes from a secondary source, left joins preserve your complete dataset while pulling in matching records where they exist.

In PySpark’s distributed context, a left join returns all rows from the left DataFrame and the matched rows from the right DataFrame. When no match exists, the right side’s columns contain null values. This behavior makes left joins essential for scenarios like enriching transaction data with customer details, adding product metadata to order records, or merging event logs with user profiles.

Understanding left joins in PySpark requires knowing both the syntax variations and the distributed computing implications. A poorly constructed join can shuffle terabytes of data across your cluster unnecessarily. Let’s get into the practical details.

Basic Left Join Syntax

The join() method is your primary tool. The how parameter accepts "left", "left_outer", or "leftouter"—all equivalent.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LeftJoinExample").getOrCreate()

# Create sample DataFrames
orders_data = [
    (1, 101, 250.00),
    (2, 102, 175.50),
    (3, 103, 320.00),
    (4, 999, 89.99)  # customer_id 999 doesn't exist
]
orders = spark.createDataFrame(orders_data, ["order_id", "customer_id", "amount"])

customers_data = [
    (101, "Alice", "alice@email.com"),
    (102, "Bob", "bob@email.com"),
    (103, "Charlie", "charlie@email.com")
]
customers = spark.createDataFrame(customers_data, ["customer_id", "name", "email"])

# Basic left join
result = orders.join(customers, on="customer_id", how="left")
result.show()

Output:

+-----------+--------+------+-------+-----------------+
|customer_id|order_id|amount|   name|            email|
+-----------+--------+------+-------+-----------------+
|        101|       1| 250.0|  Alice|  alice@email.com|
|        102|       2| 175.5|    Bob|    bob@email.com|
|        103|       3| 320.0|Charlie|charlie@email.com|
|        999|       4| 89.99|   null|             null|
+-----------+--------+------+-------+-----------------+

Notice order 4 with customer_id=999 remains in the result with null values for name and email. This is the defining characteristic of left joins.

Joining on Multiple Columns

Real-world data often requires composite keys. Pass a list of column names to join on multiple columns simultaneously.

# Sales data with composite key
sales_data = [
    (2023, "P001", "North", 1500),
    (2023, "P001", "South", 2200),
    (2023, "P002", "North", 800),
    (2024, "P001", "North", 1800)
]
sales = spark.createDataFrame(sales_data, ["year", "product_id", "region", "revenue"])

# Product targets with same composite key
targets_data = [
    (2023, "P001", 3000),
    (2023, "P002", 1000),
    (2024, "P001", 4000)
]
targets = spark.createDataFrame(targets_data, ["year", "product_id", "target"])

# Join on multiple columns
result = sales.join(targets, on=["year", "product_id"], how="left")
result.show()

Output:

+----+----------+------+-------+------+
|year|product_id|region|revenue|target|
+----+----------+------+-------+------+
|2023|      P001| North|   1500|  3000|
|2023|      P001| South|   2200|  3000|
|2023|      P002| North|    800|  1000|
|2024|      P001| North|   1800|  4000|
+----+----------+------+-------+------+

The join matches on both year and product_id, so the 2023 P001 target of 3000 applies to both North and South regional sales.

Handling Column Name Conflicts

When both DataFrames have columns with the same name (beyond the join key), PySpark keeps both, creating ambiguity. This breaks downstream operations.

from pyspark.sql.functions import col

# Both DataFrames have a 'status' column
orders_data = [
    (1, 101, "shipped"),
    (2, 102, "pending")
]
orders = spark.createDataFrame(orders_data, ["order_id", "customer_id", "status"])

customers_data = [
    (101, "Alice", "active"),
    (102, "Bob", "inactive")
]
customers = spark.createDataFrame(customers_data, ["customer_id", "name", "status"])

# Use aliases to disambiguate
orders_aliased = orders.alias("o")
customers_aliased = customers.alias("c")

result = orders_aliased.join(
    customers_aliased,
    col("o.customer_id") == col("c.customer_id"),
    "left"
).select(
    col("o.order_id"),
    col("o.customer_id"),
    col("o.status").alias("order_status"),
    col("c.name"),
    col("c.status").alias("customer_status")
)

result.show()

Output:

+--------+-----------+------------+-----+---------------+
|order_id|customer_id|order_status| name|customer_status|
+--------+-----------+------------+-----+---------------+
|       1|        101|     shipped|Alice|         active|
|       2|        102|     pending|  Bob|       inactive|
+--------+-----------+------------+-----+---------------+

The alias approach gives you explicit control. An alternative is dropping duplicate columns immediately after the join:

result = orders.join(customers, on="customer_id", how="left").drop(customers.status)

Left Join with Different Column Names

Frequently, the join keys have different names across DataFrames. A simple column name (or list of names) won't work for the on parameter here—you need an explicit join condition.

from pyspark.sql.functions import col

# Different column names for the same concept
transactions_data = [
    (1, "U001", 50.00),
    (2, "U002", 75.00),
    (3, "U003", 120.00)
]
transactions = spark.createDataFrame(transactions_data, ["txn_id", "user_id", "amount"])

profiles_data = [
    ("U001", "Premium", 2020),
    ("U002", "Basic", 2022)
]
profiles = spark.createDataFrame(profiles_data, ["member_id", "tier", "join_year"])

# Join with explicit condition
result = transactions.join(
    profiles,
    transactions.user_id == profiles.member_id,
    "left"
).select(
    transactions.txn_id,
    transactions.user_id,
    transactions.amount,
    profiles.tier,
    profiles.join_year
)

result.show()

Output:

+------+-------+------+-------+---------+
|txn_id|user_id|amount|   tier|join_year|
+------+-------+------+-------+---------+
|     1|   U001|  50.0|Premium|     2020|
|     2|   U002|  75.0|  Basic|     2022|
|     3|   U003| 120.0|   null|     null|
+------+-------+------+-------+---------+

When using explicit conditions, both join key columns remain in the result. Select the columns you need explicitly to avoid redundancy.

Handling Null Values

Left joins produce nulls for unmatched rows. Often you need default values instead.

from pyspark.sql.functions import coalesce, lit

# Using coalesce for default values
result = transactions.join(
    profiles,
    transactions.user_id == profiles.member_id,
    "left"
).select(
    transactions.txn_id,
    transactions.user_id,
    transactions.amount,
    coalesce(profiles.tier, lit("Unknown")).alias("tier"),
    coalesce(profiles.join_year, lit(0)).alias("join_year")
)

result.show()

Output:

+------+-------+------+-------+---------+
|txn_id|user_id|amount|   tier|join_year|
+------+-------+------+-------+---------+
|     1|   U001|  50.0|Premium|     2020|
|     2|   U002|  75.0|  Basic|     2022|
|     3|   U003| 120.0|Unknown|        0|
+------+-------+------+-------+---------+

For bulk null replacement, fillna() works on the entire DataFrame:

result = transactions.join(
    profiles,
    transactions.user_id == profiles.member_id,
    "left"
).fillna({"tier": "Unknown", "join_year": 0})

Use coalesce() when you need column-specific logic; use fillna() for simple default values across multiple columns.

Performance Considerations

Left joins in PySpark trigger shuffles—expensive data movement across the cluster. Here’s how to minimize the pain.

Broadcast joins eliminate shuffles when one DataFrame is small enough to fit in executor memory. PySpark sends the small DataFrame to all executors, enabling local joins.

from pyspark.sql.functions import broadcast

# Small dimension table (imagine ~10,000 rows) - ideal for broadcasting
product_dim = spark.createDataFrame([
    ("P001", "Widget", "Electronics"),
    ("P002", "Gadget", "Electronics"),
    ("P003", "Gizmo", "Hardware")
], ["product_id", "product_name", "category"])

# Fact table (imagine millions of rows)
sales_fact = spark.createDataFrame([
    (1, "P001", 100),
    (2, "P001", 150),
    (3, "P002", 200),
    (4, "P004", 50)  # Product doesn't exist in dimension
], ["sale_id", "product_id", "quantity"])

# Broadcast the small dimension table
result = sales_fact.join(
    broadcast(product_dim),
    on="product_id",
    how="left"
)

result.explain()  # Check the physical plan shows BroadcastHashJoin
result.show()

The broadcast() hint tells Spark to distribute the small DataFrame to all nodes. This avoids shuffling the large fact table entirely.

Partitioning strategy matters for repeated joins. If you frequently join on customer_id, repartition both DataFrames on that column. The repartition itself costs one shuffle, but subsequent joins on the same key can reuse the co-partitioned layout:

orders_partitioned = orders.repartition(200, "customer_id")
customers_partitioned = customers.repartition(200, "customer_id")

# Subsequent joins on customer_id can reuse this partitioning
result = orders_partitioned.join(customers_partitioned, on="customer_id", how="left")

Filter before joining whenever possible. Reducing row counts before the join dramatically decreases shuffle data:

# Bad: join then filter
result = orders.join(customers, on="customer_id", how="left").filter(col("amount") > 100)

# Good: filter then join
result = orders.filter(col("amount") > 100).join(customers, on="customer_id", how="left")

The default broadcast threshold is 10MB (spark.sql.autoBroadcastJoinThreshold). Increase it for larger dimension tables that still fit in memory:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100MB
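Conversely, setting the threshold to -1 disables automatic broadcasting entirely, which is occasionally useful when Spark's size estimates are off and an automatic broadcast exhausts executor memory. Explicit broadcast() hints still take effect:

```python
# Disable size-based automatic broadcast joins; explicit broadcast() hints still apply
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```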

Left joins are fundamental to data engineering pipelines. Master the syntax variations, handle column conflicts explicitly, and always consider broadcast opportunities for small tables. Your cluster—and your colleagues—will thank you.
