PySpark - Union and UnionAll DataFrames
Key Insights
- PySpark's union() method combines DataFrames by column position, not name: column order matters, and mismatched schemas will cause runtime errors or incorrect data alignment.
- The unionAll() method was deprecated in PySpark 2.0 because union() already preserves duplicates (unlike SQL UNION), making two separate methods redundant.
- Use unionByName() for safer schema-flexible unions when combining DataFrames from different sources where column order may vary but names are consistent.
Introduction to DataFrame Combining Operations
Combining DataFrames is a fundamental operation in distributed data processing. Whether you’re merging incremental data loads, consolidating multi-source datasets, or appending historical records, you need reliable methods to stack DataFrames vertically. PySpark provides union operations specifically for this purpose.
The confusion around union() and unionAll() stems from SQL semantics. In standard SQL, UNION removes duplicates while UNION ALL preserves them. However, PySpark’s union() method has always behaved like SQL’s UNION ALL—it keeps all rows, including duplicates. The unionAll() method existed in early PySpark versions but was deprecated in 2.0 because it was functionally identical to union(), creating unnecessary API bloat.
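The difference is easy to state in miniature: SQL's UNION applies set semantics, while PySpark's union() behaves like plain concatenation. A pure-Python sketch of the two behaviors, with tuples standing in for rows:

```python
# SQL UNION vs UNION ALL semantics, sketched with plain Python rows
rows_a = [("Alice", 1200), ("Bob", 800)]
rows_b = [("Alice", 1200), ("Charlie", 1500)]  # contains one exact duplicate of rows_a

# SQL UNION: set semantics, duplicates removed
sql_union = set(rows_a) | set(rows_b)
print(len(sql_union))  # 3 distinct rows

# PySpark union() / SQL UNION ALL: plain concatenation, duplicates kept
union_all = rows_a + rows_b
print(len(union_all))  # 4 rows
```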
Let’s look at a simple scenario where we need to combine two DataFrames:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.appName("UnionExample").getOrCreate()
# January sales data
jan_data = [
("Alice", "Electronics", 1200),
("Bob", "Clothing", 800)
]
# February sales data
feb_data = [
("Charlie", "Electronics", 1500),
("Diana", "Books", 600)
]
schema = StructType([
StructField("employee", StringType(), True),
StructField("department", StringType(), True),
StructField("sales", IntegerType(), True)
])
df_jan = spark.createDataFrame(jan_data, schema)
df_feb = spark.createDataFrame(feb_data, schema)
We need to combine these monthly snapshots into a single DataFrame for quarterly analysis.
Union() Method - Syntax and Basic Usage
The union() method is straightforward—it appends rows from one DataFrame to another. The syntax is simple:
result_df = df1.union(df2)
Here’s the critical detail: union() combines DataFrames based on column position, not column names. The first column of df2 is appended to the first column of df1, the second to the second, and so on.
Let’s combine our sales data:
# Combine January and February sales
quarterly_sales = df_jan.union(df_feb)
quarterly_sales.show()
Output:
+--------+-----------+-----+
|employee| department|sales|
+--------+-----------+-----+
|   Alice|Electronics| 1200|
|     Bob|   Clothing|  800|
| Charlie|Electronics| 1500|
|   Diana|      Books|  600|
+--------+-----------+-----+
The operation is efficient because Spark doesn’t shuffle data—it simply creates a logical plan to read from both DataFrames sequentially. You can chain multiple unions:
# Combine three months
df_mar = spark.createDataFrame([("Eve", "Electronics", 1100)], schema)
q1_sales = df_jan.union(df_feb).union(df_mar)
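Chaining .union() calls by hand scales poorly once many DataFrames are involved. A common idiom is to fold a list of DataFrames with functools.reduce, as in reduce(DataFrame.union, [df_jan, df_feb, df_mar]). The sketch below demonstrates the fold itself using plain Python lists as stand-ins for DataFrames, since list concatenation mirrors union()'s append-and-keep-duplicates behavior:

```python
from functools import reduce

# Stand-ins for monthly DataFrames; with real DataFrames you would write:
#   q1_sales = reduce(DataFrame.union, [df_jan, df_feb, df_mar])
jan = [("Alice", "Electronics", 1200), ("Bob", "Clothing", 800)]
feb = [("Charlie", "Electronics", 1500), ("Diana", "Books", 600)]
mar = [("Eve", "Electronics", 1100)]

# Fold the whole list into one dataset in a single expression
q1 = reduce(lambda a, b: a + b, [jan, feb, mar])

print(len(q1))  # 5 rows for the quarter
```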
Schema Requirements and Column Matching
Schema compatibility is where developers encounter the most issues. Since union() operates by position, your DataFrames must have the same number of columns. The column names don’t need to match, but the data types should be compatible.
Here’s where things go wrong:
# DataFrame with different column order
wrong_order_data = [
("Electronics", "Frank", 1300), # department and employee swapped
("Books", "Grace", 700)
]
wrong_schema = StructType([
StructField("department", StringType(), True),
StructField("employee", StringType(), True),
StructField("sales", IntegerType(), True)
])
df_wrong = spark.createDataFrame(wrong_order_data, wrong_schema)
# This executes without error but produces incorrect results!
bad_union = df_jan.union(df_wrong)
bad_union.show()
Output:
+-----------+-----------+-----+
|   employee| department|sales|
+-----------+-----------+-----+
|      Alice|Electronics| 1200|
|        Bob|   Clothing|  800|
|Electronics|      Frank| 1300| # Wrong! Department in employee column
|      Books|      Grace|  700| # Wrong! Department in employee column
+-----------+-----------+-----+
The solution is unionByName(), which matches columns by their names:
# Safe union by column name
correct_union = df_jan.unionByName(df_wrong)
correct_union.show()
Output:
+--------+-----------+-----+
|employee| department|sales|
+--------+-----------+-----+
|   Alice|Electronics| 1200|
|     Bob|   Clothing|  800|
|   Frank|Electronics| 1300| # Correct!
|   Grace|      Books|  700| # Correct!
+--------+-----------+-----+
For DataFrames with different column sets, use unionByName(allowMissingColumns=True):
# DataFrame with additional column
extended_data = [("Henry", "Electronics", 1400, "Q1")]
extended_schema = StructType([
StructField("employee", StringType(), True),
StructField("department", StringType(), True),
StructField("sales", IntegerType(), True),
StructField("quarter", StringType(), True)
])
df_extended = spark.createDataFrame(extended_data, extended_schema)
# This fills missing columns with null
flexible_union = df_jan.unionByName(df_extended, allowMissingColumns=True)
flexible_union.show()
Handling Duplicates and Data Quality
Unlike SQL’s UNION, PySpark’s union() preserves all rows, including exact duplicates. This is intentional—in big data scenarios, you often want to preserve duplicate records for counting or auditing purposes.
# Create duplicate records
duplicate_data = [
("Alice", "Electronics", 1200), # Exact duplicate from January
("Bob", "Clothing", 800) # Exact duplicate from January
]
df_duplicates = spark.createDataFrame(duplicate_data, schema)
# Union preserves duplicates
with_duplicates = df_jan.union(df_duplicates)
print(f"Row count with duplicates: {with_duplicates.count()}") # 4 rows
with_duplicates.show()
To remove duplicates, use distinct():
# Remove exact duplicate rows
deduplicated = with_duplicates.distinct()
print(f"Row count after deduplication: {deduplicated.count()}") # 2 rows
For more granular control, use dropDuplicates() with specific columns. Be aware that when several rows share the same key columns, dropDuplicates() keeps an arbitrary one of them, so apply an ordering strategy first if you need a specific survivor:
# Remove duplicates based only on employee name
unique_employees = with_duplicates.dropDuplicates(["employee"])
unique_employees.show()
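If you need a deterministic survivor rather than an arbitrary one, the usual PySpark pattern is a Window with row_number() ordered by a tie-breaking column. The pure-Python sketch below shows the equivalent logic (keep the highest-sales row per employee), with tuples standing in for rows:

```python
rows = [
    ("Alice", "Electronics", 1200),
    ("Alice", "Electronics", 1350),  # later record for the same employee
    ("Bob", "Clothing", 800),
]

# Keep the highest-sales row per employee (deterministic tie-breaker),
# mirroring: row_number().over(Window.partitionBy("employee")
#                                    .orderBy(col("sales").desc())) == 1
best = {}
for employee, department, sales in rows:
    if employee not in best or sales > best[employee][2]:
        best[employee] = (employee, department, sales)

print(sorted(best.values()))
# [('Alice', 'Electronics', 1350), ('Bob', 'Clothing', 800)]
```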
UnionAll() - Deprecated Method
The unionAll() method existed in PySpark versions before 2.0. It was functionally identical to union():
# Both produce the same result
result1 = df_jan.union(df_feb)
result2 = df_jan.unionAll(df_feb) # Deprecated, but still works in some versions
# Verify they're identical
print(result1.count() == result2.count()) # True
The deprecation happened because having two methods that do exactly the same thing violated the principle of having one obvious way to accomplish a task. (Spark 3.0 later reinstated unionAll() as a plain alias for union(), which is why it still works in current versions.) If you encounter unionAll() in legacy code, simply replace it with union(); no behavioral changes are needed.
Practical Use Cases and Performance Considerations
Real-world union operations often involve combining data from multiple sources with different formats:
from pyspark.sql.functions import lit
# Read from different sources
csv_df = spark.read.csv("sales_2023.csv", header=True, inferSchema=True)
parquet_df = spark.read.parquet("sales_2024.parquet")
json_df = spark.read.json("sales_2025.json")
# Add source tracking column before union
csv_with_source = csv_df.withColumn("source", lit("csv"))
parquet_with_source = parquet_df.withColumn("source", lit("parquet"))
json_with_source = json_df.withColumn("source", lit("json"))
# Combine all sources
unified_df = csv_with_source \
.unionByName(parquet_with_source) \
.unionByName(json_with_source)
# Repartition for optimal processing
optimized_df = unified_df.repartition(200)
Performance considerations:
- Partition management: Unioning many small DataFrames creates many small partitions. Use coalesce() or repartition() afterward.
- Lazy evaluation: Union operations are lazy; no data movement happens until an action is triggered.
- Filter pushdown: If you're filtering after a union, apply the filter to each DataFrame before the union when possible to reduce the data being combined.
# Efficient: filter before union when possible
filtered_jan = df_jan.filter("sales > 1000")
filtered_feb = df_feb.filter("sales > 1000")
high_sales = filtered_jan.union(filtered_feb)
Common Pitfalls and Troubleshooting
Type mismatches cause runtime errors or silent data corruption. Always validate and cast types before union:
from pyspark.sql.functions import col
# DataFrame with sales as string instead of integer
string_sales_data = [("Ian", "Books", "950")]
string_schema = StructType([
StructField("employee", StringType(), True),
StructField("department", StringType(), True),
StructField("sales", StringType(), True) # String instead of Int
])
df_string_sales = spark.createDataFrame(string_sales_data, string_schema)
# Cast to correct type before union
df_string_sales_fixed = df_string_sales.withColumn(
"sales",
col("sales").cast(IntegerType())
)
# Now safe to union
safe_union = df_jan.union(df_string_sales_fixed)
Nullable column handling requires attention. If one DataFrame has a non-nullable column and another has the same column as nullable, Spark will make the result nullable:
# Explicitly handle nullability in schema
consistent_schema = StructType([
StructField("employee", StringType(), True),
StructField("department", StringType(), True),
StructField("sales", IntegerType(), True) # Consistently nullable
])
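A lightweight pre-flight check can catch both positional and type mismatches before they silently corrupt a union. The helper below is a sketch, not part of any library: it compares schemas expressed as ordered lists of (column_name, type_name) pairs, which you could build from a real DataFrame with [(f.name, f.dataType.simpleString()) for f in df.schema]:

```python
def union_compat_issues(schema_a, schema_b):
    """Return human-readable positional mismatches between two schemas,
    each given as an ordered list of (column_name, type_name) pairs."""
    issues = []
    if len(schema_a) != len(schema_b):
        issues.append(f"column count differs: {len(schema_a)} vs {len(schema_b)}")
    for i, ((name_a, type_a), (name_b, type_b)) in enumerate(zip(schema_a, schema_b)):
        if name_a != name_b:
            # union() would still align these columns positionally!
            issues.append(f"position {i}: name {name_a!r} vs {name_b!r}")
        if type_a != type_b:
            issues.append(f"position {i}: type {type_a} vs {type_b}")
    return issues

jan = [("employee", "string"), ("department", "string"), ("sales", "int")]
wrong = [("department", "string"), ("employee", "string"), ("sales", "string")]

for issue in union_compat_issues(jan, wrong):
    print(issue)  # two name mismatches and one type mismatch
```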
Memory issues with large unions can be mitigated by writing intermediate results:
# For very large unions, checkpoint to break lineage
large_union = df1.union(df2).union(df3).union(df4)
large_union.checkpoint() # Requires checkpoint directory configuration
Union operations are fundamental to PySpark data pipelines. Master column matching behavior, use unionByName() when dealing with heterogeneous sources, and always validate schemas before combining DataFrames. The simplicity of the API belies the importance of understanding position-based versus name-based semantics—get this wrong, and you’ll corrupt your data silently.