PySpark - Subtract (Except) Two DataFrames

Key Insights

  • PySpark’s subtract() removes duplicates and returns distinct rows from the first DataFrame not present in the second, while exceptAll() preserves duplicate counts for more precise data differencing
  • Schema alignment is critical—column order and data types must match exactly, requiring explicit select() statements to reorder columns before subtraction operations
  • For large-scale production workloads, left anti joins often outperform subtract() by avoiding the implicit distinct operation and providing more control over join conditions

Introduction to DataFrame Subtraction in PySpark

DataFrame subtraction in PySpark answers a deceptively simple question: which rows exist in DataFrame A but not in DataFrame B? This operation, also called set difference or “except,” is fundamental to data engineering workflows. You’ll encounter it when reconciling data between systems, identifying new records since the last batch, detecting deleted entries, or validating ETL pipelines.

Unlike SQL joins that combine data, subtraction filters it. The result contains only rows from the first DataFrame that have no exact match in the second. This makes it invaluable for data quality checks, incremental processing, and auditing. Understanding the nuances between PySpark’s subtraction methods and their performance characteristics will save you from costly mistakes in production.

The subtract() Method

The subtract() method performs a set difference operation, returning rows in the first DataFrame that don’t exist in the second. Crucially, it returns distinct rows—duplicates are automatically removed from the result, even if they existed in the source DataFrame.

Here’s the basic syntax and behavior:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("SubtractExample").getOrCreate()

# Define schema
schema = StructType([
    StructField("employee_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("department", StringType(), True)
])

# Current employee records
current_data = [
    (1, "Alice Johnson", "Engineering"),
    (2, "Bob Smith", "Marketing"),
    (3, "Carol White", "Engineering"),
    (4, "David Brown", "Sales")
]

# Previous snapshot
previous_data = [
    (1, "Alice Johnson", "Engineering"),
    (2, "Bob Smith", "Marketing"),
    (5, "Eve Davis", "HR")
]

df_current = spark.createDataFrame(current_data, schema)
df_previous = spark.createDataFrame(previous_data, schema)

# Find new employees (in current but not in previous)
new_employees = df_current.subtract(df_previous)
new_employees.show()

Output:

+-----------+-----------+-----------+
|employee_id|       name| department|
+-----------+-----------+-----------+
|          3|Carol White|Engineering|
|          4|David Brown|      Sales|
+-----------+-----------+-----------+

The subtract() method compares entire rows. All columns must match exactly for a row to be considered identical. This whole-row comparison is both a strength and a limitation—it’s simple to reason about but inflexible when you need partial matching.

The exceptAll() Method

While subtract() returns distinct results, exceptAll() preserves duplicate counts. This distinction matters when duplicate rows carry meaning—for instance, when each row represents an individual transaction rather than a unique entity.

# Data with duplicates
data1 = [
    (1, "Product A"),
    (1, "Product A"),  # Duplicate
    (2, "Product B"),
    (2, "Product B"),  # Duplicate
    (3, "Product C")
]

data2 = [
    (1, "Product A"),  # One instance
    (2, "Product B"),
    (2, "Product B")   # Two instances
]

df1 = spark.createDataFrame(data1, ["id", "product"])
df2 = spark.createDataFrame(data2, ["id", "product"])

# Using subtract() - returns distinct rows
print("subtract() result:")
df1.subtract(df2).show()

# Using exceptAll() - preserves duplicate counts
print("exceptAll() result:")
df1.exceptAll(df2).show()

Output:

subtract() result:
+---+---------+
| id|  product|
+---+---------+
|  3|Product C|
+---+---------+

exceptAll() result:
+---+---------+
| id|  product|
+---+---------+
|  1|Product A|
|  3|Product C|
+---+---------+

With subtract(), the duplicate (1, "Product A") rows are eliminated entirely from the result since at least one match exists in df2. With exceptAll(), one instance remains because df1 has two occurrences while df2 has only one. This bag semantics (multiset difference) is essential for accurate transaction-level reconciliation.

Handling Schema Differences

Schema misalignment is the most common cause of subtraction failures. subtract() compares rows positionally: both DataFrames must have the same number of columns, with compatible types in the same order. Column names are not used for matching (the result takes its names from the first DataFrame), so if the columns are ordered differently you will either get an AnalysisException on a type mismatch or silently wrong results when the types happen to line up.

# DataFrame with different column order
data_ordered1 = [(1, "Alice", "Engineering")]
data_ordered2 = [("Engineering", "Alice", 1)]

df_order1 = spark.createDataFrame(data_ordered1, ["id", "name", "dept"])
df_order2 = spark.createDataFrame(data_ordered2, ["dept", "name", "id"])

# This will NOT work as expected - column order differs, so Spark either
# raises an AnalysisException (incompatible types) or compares the wrong columns
# df_order1.subtract(df_order2).show()

# Correct approach: align columns explicitly
df_order2_aligned = df_order2.select("id", "name", "dept")
result = df_order1.subtract(df_order2_aligned)
result.show()

When DataFrames have extra columns, you must select only the common columns:

# DataFrames with different column sets
df_extra1 = spark.createDataFrame(
    [(1, "Alice", "Engineering", "2023-01-01")],
    ["id", "name", "dept", "hire_date"]
)

df_extra2 = spark.createDataFrame(
    [(1, "Alice", "Engineering")],
    ["id", "name", "dept"]
)

# Select common columns before subtraction
common_cols = ["id", "name", "dept"]
result = df_extra1.select(common_cols).subtract(df_extra2.select(common_cols))
result.show()

Always validate schemas before subtraction in production code:

def safe_subtract(df1, df2, columns=None):
    """Perform subtract with automatic column alignment."""
    # Default to the first DataFrame's columns as the reference
    columns = columns or df1.columns

    # Fail fast instead of letting select() raise a cryptic error
    missing = [c for c in columns if c not in df2.columns]
    if missing:
        raise ValueError(f"Columns missing from second DataFrame: {missing}")

    return df1.select(columns).subtract(df2.select(columns))
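
Matching names alone is not enough; the types must line up too. A small pre-flight check (the helper name assert_compatible_types is mine, not a PySpark API) that compares the `dtypes` of both DataFrames before subtracting:

```python
def assert_compatible_types(df1, df2, columns):
    """Fail fast if the two DataFrames disagree on the chosen columns' types.

    Works on anything exposing Spark's `dtypes` list of (name, type) pairs.
    """
    types1 = dict(df1.dtypes)
    types2 = dict(df2.dtypes)
    mismatched = [c for c in columns if types1.get(c) != types2.get(c)]
    if mismatched:
        raise TypeError(f"Incompatible types for columns: {mismatched}")
```

Calling this at the top of a helper like safe_subtract turns a silent positional mismatch into an explicit error.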

Performance Considerations and Alternatives

The subtract() operation triggers a shuffle and an implicit distinct(), making it expensive on large datasets. For multi-terabyte DataFrames, consider alternatives that give you more control.

A left anti join often performs better because it avoids the distinct operation:

from pyspark.sql.functions import col

# Sample data
large_df1 = spark.range(0, 1000000).withColumn("value", col("id") * 2)
large_df2 = spark.range(0, 999000).withColumn("value", col("id") * 2)

# Method 1: subtract() - includes implicit distinct
result_subtract = large_df1.subtract(large_df2)

# Method 2: left anti join - more performant, preserves duplicates
result_join = large_df1.join(
    large_df2,
    on=["id", "value"],
    how="left_anti"
)

# For distinct results matching subtract() behavior
result_join_distinct = large_df1.join(
    large_df2,
    on=["id", "value"],
    how="left_anti"
).distinct()

The left anti join approach provides several advantages:

  • Explicit control: You specify exactly which columns to compare
  • Better performance: No forced distinct operation unless you need it
  • Flexibility: You can add additional filter conditions
  • Observability: Easier to tune with broadcast hints or repartitioning

For simple existence checks on a single column, isin() with negation can be even faster:

# When you only need to check one column
ids_to_exclude = [row["id"] for row in df2.select("id").distinct().collect()]
result = df1.filter(~col("id").isin(ids_to_exclude))

However, isin() only works well when the exclusion list fits in driver memory.

Practical Use Case: Data Reconciliation

Here’s a complete example demonstrating daily data reconciliation between a production database and a backup system:

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, lit

spark = SparkSession.builder.appName("DataReconciliation").getOrCreate()

# Simulate production database records
prod_records = [
    (1001, "Order A", "2024-01-15", 250.00),
    (1002, "Order B", "2024-01-15", 175.50),
    (1003, "Order C", "2024-01-16", 430.25),
    (1004, "Order D", "2024-01-16", 89.99),
    (1005, "Order E", "2024-01-16", 320.00)
]

# Simulate backup database records (missing recent orders)
backup_records = [
    (1001, "Order A", "2024-01-15", 250.00),
    (1002, "Order B", "2024-01-15", 175.50),
    (1003, "Order C", "2024-01-16", 430.25)
]

schema = ["order_id", "order_name", "order_date", "amount"]
df_prod = spark.createDataFrame(prod_records, schema)
df_backup = spark.createDataFrame(backup_records, schema)

# Find missing records in backup; cache because count() is called
# again below and show() re-reads the same result
missing_in_backup = df_prod.subtract(df_backup).cache()

print(f"Missing records in backup: {missing_in_backup.count()}")
missing_in_backup.show()

# Find records in backup but not in production (data loss scenario)
orphaned_in_backup = df_backup.subtract(df_prod)

print(f"Orphaned records in backup: {orphaned_in_backup.count()}")
orphaned_in_backup.show()

# Generate reconciliation report
if missing_in_backup.count() > 0:
    reconciliation_report = missing_in_backup.withColumn(
        "issue_type", lit("MISSING_IN_BACKUP")
    ).withColumn(
        "detected_at", current_timestamp()
    )
    
    # In production, write to alerting system or logging table
    reconciliation_report.show(truncate=False)

This pattern extends to comparing data across environments, validating incremental loads, or auditing data migrations.

Common Pitfalls and Best Practices

Pitfall 1: Ignoring null handling. subtract() and exceptAll() use null-safe equality: two null values are considered equal, which may not match your business logic. A left anti join with ordinary equality conditions treats nulls as unmatched, so the two approaches can diverge on nullable columns.

Pitfall 2: Schema drift. Production schemas evolve. Always validate column compatibility before subtraction, especially in automated pipelines.

Pitfall 3: Memory pressure. subtract() can cause memory issues on skewed data. Monitor partition sizes and consider repartitioning before the operation.

Best Practices:

  1. Explicitly select and order columns before subtraction to prevent schema mismatches
  2. Use exceptAll() for transaction-level data where duplicates matter
  3. Consider left anti joins for large datasets or when you need partial column matching
  4. Cache intermediate results if you’re performing multiple subtraction operations on the same DataFrames
  5. Add data quality checks to verify row counts and null percentages before and after subtraction
  6. Partition strategically on high-cardinality columns to distribute the workload evenly

Subtraction operations are powerful tools for data validation and reconciliation. Choose the right method based on your duplicate handling needs, optimize for your data scale, and always validate schemas to avoid silent failures in production pipelines.
