PySpark - Left Anti Join with Examples
Key Insights
- Left anti joins return rows from the left DataFrame that have no matches in the right DataFrame, making them ideal for finding missing relationships and orphaned records in data pipelines
- PySpark’s left anti join typically outperforms equivalent operations using filter() with ~isin() for large datasets, especially when combined with broadcast hints for small lookup tables
- Null values in join keys will never match, which can lead to unexpected results—always validate your data or explicitly filter nulls before performing anti joins
Introduction to Left Anti Join
A left anti join is the inverse of an inner join. While an inner join returns rows where keys match in both DataFrames, a left anti join returns rows from the left DataFrame where there is no matching key in the right DataFrame. Think of it as a “what’s missing” operation.
This join type is invaluable for data quality checks, identifying gaps in your data, and finding orphaned records. Unlike a left outer join that returns all left rows and adds nulls where there’s no match, a left anti join simply excludes the matched rows entirely—giving you a clean dataset of unmatched records.
Here’s a simple example to visualize the concept:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("LeftAntiJoin").getOrCreate()
# Left DataFrame: All customers
customers = spark.createDataFrame([
(1, "Alice"),
(2, "Bob"),
(3, "Charlie"),
(4, "Diana")
], ["customer_id", "name"])
# Right DataFrame: Customers who made purchases
purchases = spark.createDataFrame([
(1, "Widget"),
(3, "Gadget")
], ["customer_id", "product"])
# Left anti join: Find customers with NO purchases
no_purchases = customers.join(purchases, "customer_id", "left_anti")
no_purchases.show()
# Output:
# +-----------+------+
# |customer_id|name |
# +-----------+------+
# |2 |Bob |
# |4 |Diana |
# +-----------+------+
Only Bob and Diana appear in the result because they have no corresponding records in the purchases DataFrame.
Syntax and Basic Usage
PySpark provides two syntax variations for left anti joins: 'left_anti' and 'leftanti' (without underscore). Both are functionally identical.
# Method 1: Using 'left_anti'
result = left_df.join(right_df, join_condition, "left_anti")
# Method 2: Using 'leftanti' (alternative)
result = left_df.join(right_df, join_condition, "leftanti")
Let’s expand on our customer example with a more realistic scenario:
# Create customers DataFrame
customers = spark.createDataFrame([
(101, "Alice Johnson", "alice@email.com"),
(102, "Bob Smith", "bob@email.com"),
(103, "Charlie Brown", "charlie@email.com"),
(104, "Diana Prince", "diana@email.com"),
(105, "Eve Wilson", "eve@email.com")
], ["customer_id", "name", "email"])
# Create orders DataFrame
orders = spark.createDataFrame([
(1, 101, 150.00),
(2, 101, 200.00),
(3, 103, 75.50),
(4, 105, 300.00)
], ["order_id", "customer_id", "amount"])
# Find customers who have never placed an order
never_ordered = customers.join(
orders,
customers.customer_id == orders.customer_id,
"left_anti"
)
never_ordered.select("customer_id", "name", "email").show()
# Output:
# +-----------+------------+---------------+
# |customer_id|name |email |
# +-----------+------------+---------------+
# |102 |Bob Smith |bob@email.com |
# |104 |Diana Prince|diana@email.com|
# +-----------+------------+---------------+
Notice that when column names differ between DataFrames, you need to explicitly specify the join condition using the equality operator.
Real-World Use Cases
Left anti joins shine in production data pipelines for several scenarios:
Finding Employees Not Assigned to Projects:
employees = spark.createDataFrame([
(1, "John Doe", "Engineering"),
(2, "Jane Smith", "Marketing"),
(3, "Mike Johnson", "Engineering"),
(4, "Sarah Williams", "Sales")
], ["emp_id", "name", "department"])
project_assignments = spark.createDataFrame([
(1, "Project Alpha"),
(3, "Project Beta")
], ["emp_id", "project_name"])
unassigned_employees = employees.join(
project_assignments,
"emp_id",
"left_anti"
)
unassigned_employees.show()
# Shows Jane Smith and Sarah Williams who need project assignments
Identifying Products with No Sales:
all_products = spark.createDataFrame([
("SKU001", "Laptop", 999.99),
("SKU002", "Mouse", 29.99),
("SKU003", "Keyboard", 79.99),
("SKU004", "Monitor", 299.99)
], ["sku", "product_name", "price"])
sold_products = spark.createDataFrame([
("SKU001",),
("SKU003",)
], ["sku"])
unsold_inventory = all_products.join(sold_products, "sku", "left_anti")
unsold_inventory.show()
# Identifies Mouse and Monitor as products needing marketing attention
Detecting Inactive Users:
from datetime import datetime
all_users = spark.createDataFrame([
(1, "user1@example.com"),
(2, "user2@example.com"),
(3, "user3@example.com"),
(4, "user4@example.com")
], ["user_id", "email"])
recent_logins = spark.createDataFrame([
(1, datetime(2024, 1, 15)),
(3, datetime(2024, 1, 14))
], ["user_id", "last_login"])
inactive_users = all_users.join(recent_logins, "user_id", "left_anti")
inactive_users.show()
# Identifies users 2 and 4 for re-engagement campaigns
Left Anti Join vs. NOT IN/NOT EXISTS
You might be tempted to use filter() with ~isin() instead of a left anti join. While functionally similar, there are important differences:
# Using left anti join
result_anti = customers.join(orders, "customer_id", "left_anti")
# Using filter with NOT IN pattern
order_ids = [row.customer_id for row in orders.select("customer_id").distinct().collect()]
result_filter = customers.filter(~col("customer_id").isin(order_ids))
# Compare execution plans
result_anti.explain()
result_filter.explain()
The left anti join approach is generally more efficient because:
- No data collection to driver: the isin() approach requires collecting distinct values to the driver, which fails with large datasets
- Distributed execution: anti joins execute entirely in distributed fashion across the cluster
- Catalyst optimization: Spark’s optimizer can better optimize join operations
For small lookup tables (thousands of rows), the performance difference is negligible. For millions of rows, left anti join is the clear winner.
Handling Multiple Join Conditions
Real-world scenarios often require joining on multiple columns:
# Customer data by region
customer_regions = spark.createDataFrame([
(1, "Alice", "US-WEST"),
(2, "Bob", "US-EAST"),
(3, "Charlie", "EU-NORTH"),
(4, "Diana", "US-WEST"),
(5, "Eve", "EU-SOUTH")
], ["customer_id", "name", "region"])
# Regional sales data
regional_sales = spark.createDataFrame([
(1, "US-WEST", 500.00),
(3, "EU-NORTH", 300.00),
(2, "US-WEST", 200.00) # Note: Different region for customer 2
], ["customer_id", "region", "amount"])
# Find customers with no sales in their region
no_regional_sales = customer_regions.join(
regional_sales,
(customer_regions.customer_id == regional_sales.customer_id) &
(customer_regions.region == regional_sales.region),
"left_anti"
)
no_regional_sales.show()
# Shows Bob (wrong region), Diana, and Eve
This pattern is crucial for data validation where relationships must match on multiple dimensions.
Performance Optimization Tips
Using Broadcast Joins for Small Lookup Tables:
When the right DataFrame is small (typically under 100MB), use broadcast hints to avoid shuffling:
from pyspark.sql.functions import broadcast
# Small blacklist of fraudulent accounts
fraud_accounts = spark.createDataFrame([
(42,), (137,), (891,)
], ["customer_id"])
# Large transactions dataset
transactions = spark.read.parquet("s3://bucket/transactions/")
# Broadcast the small fraud list
clean_transactions = transactions.join(
broadcast(fraud_accounts),
"customer_id",
"left_anti"
)
# This avoids shuffling the large transactions dataset
Partitioning Strategy:
# Repartition before anti join for better parallelism
large_df = large_df.repartition(200, "customer_id")
lookup_df = lookup_df.repartition(200, "customer_id")
result = large_df.join(lookup_df, "customer_id", "left_anti")
Caching Intermediate Results:
# If reusing the lookup DataFrame multiple times
lookup_df.cache()
result1 = df1.join(lookup_df, "id", "left_anti")
result2 = df2.join(lookup_df, "id", "left_anti")
result3 = df3.join(lookup_df, "id", "left_anti")
Common Pitfalls and Troubleshooting
Null Handling:
Nulls in join keys will never match, which can produce unexpected results:
customers_with_nulls = spark.createDataFrame([
(1, "Alice"),
(2, "Bob"),
(None, "Charlie"), # Null customer_id
(4, "Diana")
], ["customer_id", "name"])
orders = spark.createDataFrame([
(1, 100.00),
(None, 50.00) # Null customer_id in orders too
], ["customer_id", "amount"])
# Anti join with nulls
result = customers_with_nulls.join(orders, "customer_id", "left_anti")
result.show()
# Charlie appears in results even though there's a null in orders!
# Nulls never match, even with other nulls
Solution: Filter nulls explicitly:
# Proper null handling
clean_result = customers_with_nulls.filter(
col("customer_id").isNotNull()
).join(
orders.filter(col("customer_id").isNotNull()),
"customer_id",
"left_anti"
)
Column Name Conflicts:
Unlike inner and outer joins, a left anti join returns only the left DataFrame's columns, so duplicate column names don't collide in the output. Ambiguity only arises when an expression-based join condition references a shared column name:
# Both DataFrames have 'name' column
df1 = spark.createDataFrame([(1, "Alice")], ["id", "name"])
df2 = spark.createDataFrame([(1, "Product A")], ["id", "name"])
# Safe: the result contains only df1's columns, so the duplicate
# 'name' never surfaces in the output
result = df1.join(df2, "id", "left_anti")
# For expression-based conditions, rename or qualify shared columns
# so every reference is unambiguous
df2_renamed = df2.withColumnRenamed("name", "product_name")
result = df1.join(df2_renamed, df1.id == df2_renamed.id, "left_anti")
Left anti joins are a powerful tool for data quality, gap analysis, and identifying missing relationships. Master them, and you’ll write cleaner, more efficient data pipelines.