PySpark - SQL UNION and UNION ALL

Key Insights

  • PySpark’s union() method implements SQL’s UNION ALL semantics: it keeps duplicates. To get true SQL UNION behavior, you must call distinct() separately, a critical difference from traditional SQL that catches many developers off guard.
  • unionByName() is the superior choice for production systems because it matches columns by name rather than position, preventing silent data corruption when DataFrame schemas have different column orders.
  • union() itself is a narrow transformation that never shuffles data, since it preserves all rows. Calling distinct() afterward to remove duplicates introduces an expensive shuffle that can severely impact performance on large datasets.

Understanding UNION vs UNION ALL in Distributed Systems

In traditional SQL databases, UNION and UNION ALL serve distinct purposes: UNION removes duplicates while UNION ALL preserves every row. This distinction becomes crucial in distributed computing environments like PySpark, where operations that require comparing all rows across partitions can be prohibitively expensive.

PySpark handles these operations differently than you might expect. The framework prioritizes performance in distributed contexts, which has led to some counterintuitive API decisions. Understanding these nuances will save you from data quality issues and performance bottlenecks in production pipelines.

You’ll need these operations when combining data from multiple sources—merging regional sales data, consolidating logs from different services, or appending incremental loads to historical datasets. The choice between keeping or removing duplicates isn’t just about correctness; it directly impacts cluster resource consumption and job execution time.

The Critical Difference: PySpark’s Approach

Here’s where PySpark diverges from SQL conventions: the union() method performs what SQL calls UNION ALL. It keeps all rows, including duplicates. To achieve traditional SQL UNION behavior (distinct rows only), you must explicitly call distinct() after the union operation.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("UnionExample").getOrCreate()

# Create sample DataFrames with overlapping data
df1 = spark.createDataFrame([
    (1, "Alice", "Engineering"),
    (2, "Bob", "Sales"),
    (3, "Charlie", "Engineering")
], ["id", "name", "department"])

df2 = spark.createDataFrame([
    (2, "Bob", "Sales"),  # Duplicate
    (4, "Diana", "Marketing"),
    (5, "Eve", "Engineering")
], ["id", "name", "department"])

# UNION ALL behavior (default in PySpark)
union_all_result = df1.union(df2)
print(f"UNION ALL count: {union_all_result.count()}")  # Returns 6 rows

# Traditional SQL UNION behavior (distinct rows)
union_result = df1.union(df2).distinct()
print(f"UNION count: {union_result.count()}")  # Returns 5 rows

union_all_result.show()
# +---+-------+-----------+
# | id|   name| department|
# +---+-------+-----------+
# |  1|  Alice|Engineering|
# |  2|    Bob|      Sales|
# |  3|Charlie|Engineering|
# |  2|    Bob|      Sales|  # Duplicate preserved
# |  4|  Diana|  Marketing|
# |  5|    Eve|Engineering|
# +---+-------+-----------+

The performance implications are significant. The union() operation is a narrow transformation that doesn’t require shuffling data across partitions. Adding distinct() converts it to a wide transformation requiring a full shuffle—essentially comparing every row against every other row across your entire cluster.

Basic Syntax and Schema Requirements

PySpark’s union operations require schema compatibility. The DataFrames must have the same number of columns with matching data types in the same positions. Column names don’t need to match when using basic union(), but this creates a maintenance nightmare.

# Schema-compatible union
sales_q1 = spark.createDataFrame([
    ("2024-01", "ProductA", 1000),
    ("2024-02", "ProductB", 1500),
], ["month", "product", "revenue"])

sales_q2 = spark.createDataFrame([
    ("2024-04", "ProductA", 1200),
    ("2024-05", "ProductC", 1800),
], ["month", "product", "revenue"])

combined_sales = sales_q1.union(sales_q2)
combined_sales.show()

# Schema mismatch - this will fail
incompatible_df = spark.createDataFrame([
    ("2024-06", "ProductD", 2000, "North"),  # Extra column
], ["month", "product", "revenue", "region"])

# Raises an AnalysisException: the DataFrames have different numbers of columns
# combined_sales.union(incompatible_df)

For production systems, always use unionByName() instead. This method matches columns by name rather than position, providing safety against schema evolution and refactoring:

# Different column order - dangerous with union()
df_reordered = spark.createDataFrame([
    (3000, "2024-07", "ProductE"),  # Wrong order!
], ["revenue", "month", "product"])

# Basic union() silently creates corrupt data
wrong_result = sales_q1.union(df_reordered)
wrong_result.show()
# month becomes revenue, revenue becomes product - disaster!

# unionByName() handles this correctly
correct_result = sales_q1.unionByName(df_reordered)
correct_result.show()
# Columns matched by name, data integrity preserved

Advanced Patterns for Real-World Applications

Combining multiple DataFrames is common when processing partitioned data. Instead of chaining union calls, use Python’s reduce() function for cleaner code:

from functools import reduce
from pyspark.sql import DataFrame

# Simulate quarterly data files
q1_data = spark.createDataFrame([(1, 10000)], ["quarter", "sales"])
q2_data = spark.createDataFrame([(2, 12000)], ["quarter", "sales"])
q3_data = spark.createDataFrame([(3, 11500)], ["quarter", "sales"])
q4_data = spark.createDataFrame([(4, 13000)], ["quarter", "sales"])

quarterly_dfs = [q1_data, q2_data, q3_data, q4_data]

# Elegant multi-DataFrame union
annual_data = reduce(DataFrame.union, quarterly_dfs)
annual_data.show()

When dealing with schema evolution, unionByName() accepts an allowMissingColumns parameter to handle DataFrames with different column sets:

# Historical data with fewer columns
historical = spark.createDataFrame([
    (1, "Alice", "Engineering"),
], ["id", "name", "department"])

# New data with additional tracking column
current = spark.createDataFrame([
    (2, "Bob", "Sales", "2024-01-15"),
], ["id", "name", "department", "hire_date"])

# Fill missing columns with null
combined = historical.unionByName(current, allowMissingColumns=True)
combined.show()
# +---+-----+-----------+----------+
# | id| name| department| hire_date|
# +---+-----+-----------+----------+
# |  1|Alice|Engineering|      null|
# |  2|  Bob|      Sales|2024-01-15|
# +---+-----+-----------+----------+

Performance Optimization Strategies

Understanding execution plans reveals the true cost of union operations. Use explain() to see what Spark actually does:

# UNION ALL - no shuffle
union_all_df = df1.union(df2)
print("UNION ALL execution plan:")
union_all_df.explain()
# Shows simple Union operation, no Exchange (shuffle) stages

# UNION with distinct - expensive shuffle
union_distinct_df = df1.union(df2).distinct()
print("\nUNION with distinct execution plan:")
union_distinct_df.explain()
# Shows Union followed by HashAggregate with Exchange (shuffle)

The execution plan for union() shows a straightforward concatenation of partitions. Adding distinct() introduces HashAggregate operations with Exchange stages—Spark’s internal representation of shuffles. On a 100GB dataset, this difference might mean 2 minutes versus 20 minutes.

Choose UNION ALL (union() without distinct()) when:

  • You know the source DataFrames contain no overlapping data
  • Duplicates are acceptable or will be filtered in downstream operations
  • Processing incremental data loads where uniqueness is guaranteed by design
  • Performance is critical and you can handle deduplication at the application level

Choose UNION with distinct when:

  • Data quality requires guaranteed uniqueness
  • The dataset is small enough that shuffle costs are negligible
  • You’re prototyping and correctness trumps performance
  • Downstream systems cannot handle duplicates

Common Pitfalls and Solutions

Schema mismatches are the most frequent issue. Always validate schemas before union operations:

def safe_union(df1, df2):
    """Union with schema validation"""
    if df1.schema != df2.schema:
        raise ValueError(
            f"Schema mismatch:\nDF1: {df1.schema}\nDF2: {df2.schema}"
        )
    return df1.union(df2)

# Data type incompatibility
df_int = spark.createDataFrame([(1, 100)], ["id", "value"])
df_string = spark.createDataFrame([(2, "200")], ["id", "value"])

# This fails with type mismatch
# df_int.union(df_string)

# Solution: explicit casting
from pyspark.sql.types import StringType
df_int_casted = df_int.withColumn("value", col("value").cast(StringType()))
result = df_int_casted.union(df_string)

The deprecated unionAll() method still exists in PySpark for backward compatibility, but it’s identical to union(). Don’t be confused by the name—it doesn’t behave differently:

# These are identical operations
result1 = df1.union(df2)
result2 = df1.unionAll(df2)  # Deprecated, avoid in new code

Null handling is straightforward: union() preserves null values exactly where they appear in either input:

df_with_nulls = spark.createDataFrame([
    (1, None, "Engineering"),
    (2, "Bob", None),
], ["id", "name", "department"])

df_complete = spark.createDataFrame([
    (3, "Charlie", "Sales"),
], ["id", "name", "department"])

combined = df_with_nulls.union(df_complete)
combined.show()
# Nulls are preserved exactly as they appear

Quick Reference and Decision Matrix

When to use union() (UNION ALL):

  • Merging partitioned data files from the same source
  • Appending incremental loads to existing datasets
  • Combining regional data known to be disjoint
  • Maximum performance is required

When to use union().distinct() (UNION):

  • Merging overlapping datasets where duplicates must be eliminated
  • Combining data from multiple sources with potential overlap
  • Data quality requirements mandate uniqueness
  • Dataset size makes shuffle operations acceptable

Syntax cheat sheet:

# Basic union (keeps duplicates)
df1.union(df2)

# Union with deduplication
df1.union(df2).distinct()

# Union by column name (recommended)
df1.unionByName(df2)

# Union with schema flexibility
df1.unionByName(df2, allowMissingColumns=True)

# Multiple DataFrames
reduce(DataFrame.union, [df1, df2, df3, df4])

The key to effective union operations in PySpark is understanding that the framework optimizes for distributed performance over SQL convention compliance. Always prefer unionByName() for maintainable code, avoid unnecessary distinct() calls on large datasets, and validate schemas before combining DataFrames in production pipelines.
