PySpark - Union DataFrames with Different Columns

When working with PySpark, you'll frequently need to combine DataFrames from different sources. The challenge arises when these DataFrames don't share identical schemas. Unlike pandas, which handles...

Key Insights

  • PySpark’s standard union() fails when DataFrames have different columns because it matches by position, not name—use unionByName(allowMissingColumns=True) in Spark 3.1+ to automatically handle schema mismatches with null filling
  • For older Spark versions or more control, manually align schemas by adding missing columns with lit(None) before union, ensuring type consistency across DataFrames
  • Always validate data types when unioning by name, as matching column names with incompatible types will cause runtime errors—cast columns explicitly to a common type before union operations

Understanding the Problem

When working with PySpark, you’ll frequently need to combine DataFrames from different sources. The challenge arises when these DataFrames don’t share identical schemas. Unlike pandas, which handles this gracefully, PySpark’s standard union() method strictly requires DataFrames to have the same number of columns in the same order.

Let’s see what happens when you attempt a basic union with mismatched schemas:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("UnionExample").getOrCreate()

# DataFrame with columns: id, name, age
df1 = spark.createDataFrame([
    (1, "Alice", 30),
    (2, "Bob", 25)
], ["id", "name", "age"])

# DataFrame with columns: id, name, city
df2 = spark.createDataFrame([
    (3, "Charlie", "NYC"),
    (4, "Diana", "LA")
], ["id", "name", "city"])

# This will raise an error
try:
    result = df1.union(df2)
    result.show()
except Exception as e:
    print(f"Error: {e}")
# Error: Union can only be performed on tables with the same number of columns

The error is clear: union() expects identical column counts and matches columns by position, not by name. This positional matching means even if you have the same columns in different orders, you’ll get unexpected results.

Union vs UnionByName

PySpark provides two primary methods for combining DataFrames vertically:

union() matches columns by position. The first column of df1 aligns with the first column of df2, regardless of column names. This is fast but dangerous when schemas differ.

unionByName() matches columns by name, which is more intuitive and safer when working with DataFrames that might have columns in different orders.

Here’s how they differ with identical schemas:

# Create two DataFrames with same columns but different order
df_a = spark.createDataFrame([
    (1, "Alice", 30)
], ["id", "name", "age"])

df_b = spark.createDataFrame([
    (25, 2, "Bob")
], ["age", "id", "name"])

# Using union() - matches by position (WRONG!)
union_result = df_a.union(df_b)
print("union() result:")
union_result.show()
# +---+-----+---+
# | id| name|age|
# +---+-----+---+
# |  1|Alice| 30|
# | 25|    2|Bob|  <- Wrong! Age in id column
# +---+-----+---+

# Using unionByName() - matches by name (CORRECT)
union_by_name_result = df_a.unionByName(df_b)
print("unionByName() result:")
union_by_name_result.show()
# +---+-----+---+
# | id| name|age|
# +---+-----+---+
# |  1|Alice| 30|
# |  2|  Bob| 25|  <- Correct alignment
# +---+-----+---+

The Modern Solution: unionByName with allowMissingColumns

Spark 3.1.0 introduced the allowMissingColumns parameter to unionByName(), which elegantly solves the mismatched schema problem. When set to True, it fills missing columns with null values.

from pyspark.sql.functions import lit

# DataFrame with columns: id, name, age
df1 = spark.createDataFrame([
    (1, "Alice", 30),
    (2, "Bob", 25)
], ["id", "name", "age"])

# DataFrame with columns: id, name, city
df2 = spark.createDataFrame([
    (3, "Charlie", "NYC"),
    (4, "Diana", "LA")
], ["id", "name", "city"])

# Union with allowMissingColumns=True
result = df1.unionByName(df2, allowMissingColumns=True)
result.show()

# +---+-------+----+----+
# | id|   name| age|city|
# +---+-------+----+----+
# |  1|  Alice|  30|null|
# |  2|    Bob|  25|null|
# |  3|Charlie|null| NYC|
# |  4|  Diana|null|  LA|
# +---+-------+----+----+

This is the cleanest approach and should be your default choice if you’re on Spark 3.1 or later. The resulting DataFrame contains all columns from both DataFrames, with nulls filling gaps where data doesn’t exist.

Manual Column Alignment for Older Spark Versions

If you’re working with Spark versions before 3.1.0, you’ll need to manually align schemas before performing the union. The strategy is to add missing columns to each DataFrame with null values.

from pyspark.sql.functions import lit

def add_missing_columns(df, all_columns):
    """Add missing columns to DataFrame with null values"""
    existing_columns = df.columns
    for col_name in all_columns:
        if col_name not in existing_columns:
            df = df.withColumn(col_name, lit(None))
    return df

def union_dataframes(df1, df2):
    """Union two DataFrames with different columns"""
    # Get all unique column names
    all_columns = list(set(df1.columns + df2.columns))
    
    # Add missing columns to both DataFrames
    df1_aligned = add_missing_columns(df1, all_columns)
    df2_aligned = add_missing_columns(df2, all_columns)
    
    # Select columns in same order
    df1_aligned = df1_aligned.select(all_columns)
    df2_aligned = df2_aligned.select(all_columns)
    
    # Now union works
    return df1_aligned.union(df2_aligned)

# Using our custom function
result = union_dataframes(df1, df2)
result.show()

This approach gives you full control over the process and works with any Spark version. It’s also useful when you need to add default values other than null.

Handling Data Type Mismatches

A common pitfall occurs when column names match but data types differ. unionByName() will fail in this scenario because Spark can’t automatically reconcile type conflicts.

# DataFrame with age as Integer
df1 = spark.createDataFrame([
    (1, "Alice", 30)
], ["id", "name", "age"])

# DataFrame with age as String
df2 = spark.createDataFrame([
    (2, "Bob", "25")
], ["id", "name", "age"])

# This will fail
try:
    result = df1.unionByName(df2)
    result.show()
except Exception as e:
    print(f"Error: {e}")

The solution is to explicitly cast columns to a common type:

from pyspark.sql.functions import col

# Cast age to string in df1 to match df2
df1_casted = df1.withColumn("age", col("age").cast(StringType()))

# Now union works
result = df1_casted.unionByName(df2)
result.show()

# +---+-----+---+
# | id| name|age|
# +---+-----+---+
# |  1|Alice| 30|
# |  2|  Bob| 25|
# +---+-----+---+

When dealing with multiple DataFrames, determine a target schema first and cast all DataFrames to match it before union operations.

Best Practices and Performance Considerations

Choose the Right Method: Use unionByName(allowMissingColumns=True) for Spark 3.1+. It’s concise and handles edge cases well. For older versions, use manual alignment with proper error handling.

Schema Validation: Always validate schemas before union operations in production code. Check both column names and data types:

def safe_union_by_name(dataframes):
    """
    Union multiple DataFrames with schema validation and alignment.
    Works with Spark 3.1+ and handles type mismatches.
    """
    if not dataframes:
        raise ValueError("No DataFrames provided")
    
    if len(dataframes) == 1:
        return dataframes[0]
    
    # Collect all unique columns with their types
    schema_map = {}
    for df in dataframes:
        for field in df.schema.fields:
            if field.name in schema_map:
                # Check type consistency
                if schema_map[field.name] != field.dataType:
                    print(f"Warning: Column '{field.name}' has inconsistent types")
                    # Use string as common type for conflicts
                    schema_map[field.name] = StringType()
            else:
                schema_map[field.name] = field.dataType
    
    # Cast all DataFrames to consistent schema
    aligned_dfs = []
    for df in dataframes:
        for col_name, target_type in schema_map.items():
            if col_name in df.columns:
                current_type = df.schema[col_name].dataType
                if current_type != target_type:
                    df = df.withColumn(col_name, col(col_name).cast(target_type))
            else:
                df = df.withColumn(col_name, lit(None).cast(target_type))
        aligned_dfs.append(df)
    
    # Union all DataFrames
    result = aligned_dfs[0]
    for df in aligned_dfs[1:]:
        result = result.unionByName(df)
    
    return result

# Example usage
df1 = spark.createDataFrame([(1, "Alice", 30)], ["id", "name", "age"])
df2 = spark.createDataFrame([(2, "Bob", "NYC")], ["id", "name", "city"])
df3 = spark.createDataFrame([(3, "Charlie", "25", "LA")], ["id", "name", "age", "city"])

result = safe_union_by_name([df1, df2, df3])
result.show()

Performance Implications: Adding many null columns doesn’t significantly impact performance in most cases, as Spark optimizes null storage. However, if you’re adding hundreds of columns, consider whether you actually need all of them in the final result. Use select() to keep only necessary columns after union.

Null Handling: Remember that missing columns become null. Plan your downstream operations accordingly. Use fillna() or coalesce() to provide defaults where appropriate.

The ability to union DataFrames with different schemas is essential for real-world data engineering where sources evolve independently. With these techniques, you can handle schema mismatches confidently and build robust data pipelines.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.