Spark Scala - DataFrame Union

Key Insights

  • Use unionByName() instead of union() when working with DataFrames from different sources—it matches columns by name rather than position, preventing subtle data corruption bugs.
  • The allowMissingColumns parameter in Spark 3.1+ eliminates boilerplate code for schema alignment, automatically filling missing columns with nulls.
  • Prefer reduce() over mutable union() loops when combining a sequence of DataFrames; Catalyst collapses the resulting chain into a single Union node either way, and batching keeps plan analysis fast when there are hundreds of inputs.

Introduction to DataFrame Union Operations

Union operations combine DataFrames vertically—stacking rows from multiple DataFrames into a single result. This differs fundamentally from join operations, which combine DataFrames horizontally based on matching keys.

You’ll reach for union operations in several common scenarios: merging daily or hourly data files into a consolidated dataset, combining results from multiple data sources with identical schemas, aggregating partitioned data that was processed separately, or implementing fan-out/fan-in processing patterns.

The key distinction to remember: unions append rows, joins append columns. If you’re trying to enrich records with additional fields from another dataset, you want a join. If you’re stacking similar records together, you want a union.
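A minimal sketch of the distinction, assuming an active SparkSession with spark.implicits._ imported (as in the setup shown in the next section); the orders and prices DataFrames here are hypothetical:

```scala
// Two row-compatible DataFrames and one lookup table
val orders = Seq((1, "Widget A"), (2, "Widget B")).toDF("order_id", "product")
val moreOrders = Seq((3, "Widget C")).toDF("order_id", "product")
val prices = Seq(("Widget A", 9.99), ("Widget B", 4.99)).toDF("product", "price")

// Union appends rows: 3 rows, still 2 columns
orders.union(moreOrders).show()

// Join appends columns: matching rows gain a price column
orders.join(prices, "product").show()
```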

Basic Union Operations

Spark provides the union() method for combining DataFrames. Older code often uses unionAll(), which was deprecated in Spark 2.0; the two methods are equivalent, and both preserve duplicates. The rename aligned Spark with standard SQL semantics, where UNION ALL keeps duplicates and UNION removes them.

Here’s a straightforward example:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DataFrame Union Examples")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val salesQ1 = Seq(
  ("2024-01-15", "Widget A", 100),
  ("2024-02-20", "Widget B", 150),
  ("2024-03-10", "Widget A", 200)
).toDF("sale_date", "product", "quantity")

val salesQ2 = Seq(
  ("2024-04-05", "Widget C", 75),
  ("2024-05-18", "Widget A", 300),
  ("2024-06-22", "Widget B", 125)
).toDF("sale_date", "product", "quantity")

val allSales = salesQ1.union(salesQ2)
allSales.show()

Output:

+----------+---------+--------+
| sale_date|  product|quantity|
+----------+---------+--------+
|2024-01-15| Widget A|     100|
|2024-02-20| Widget B|     150|
|2024-03-10| Widget A|     200|
|2024-04-05| Widget C|      75|
|2024-05-18| Widget A|     300|
|2024-06-22| Widget B|     125|
+----------+---------+--------+

Critical warning: union() matches columns by position, not by name. If your DataFrames have columns in different orders, you’ll get silently corrupted data. This is one of the most common Spark bugs I encounter in production code.

// Dangerous: columns in different order
val dfA = Seq(("Alice", 30)).toDF("name", "age")
val dfB = Seq((25, "Bob")).toDF("age", "name")

// This produces WRONG results - age and name values are swapped for dfB
val broken = dfA.union(dfB)
broken.show()

Output:

+-----+---+
| name|age|
+-----+---+
|Alice| 30|
|   25|Bob|
+-----+---+

The data is silently corrupted. No error, no warning. This is why unionByName() exists.

Union by Column Name with unionByName()

Spark 2.3 introduced unionByName() to solve the column ordering problem. This method matches columns by their names rather than positions, making it safe for DataFrames from different sources or processing pipelines.

val employees1 = Seq(
  ("E001", "Alice", "Engineering"),
  ("E002", "Bob", "Marketing")
).toDF("employee_id", "name", "department")

val employees2 = Seq(
  ("Sales", "Charlie", "E003"),
  ("Engineering", "Diana", "E004")
).toDF("department", "name", "employee_id")

// Safe: matches by column name
val allEmployees = employees1.unionByName(employees2)
allEmployees.show()

Output:

+-----------+-------+-----------+
|employee_id|   name| department|
+-----------+-------+-----------+
|       E001|  Alice|Engineering|
|       E002|    Bob|  Marketing|
|       E003|Charlie|      Sales|
|       E004|  Diana|Engineering|
+-----------+-------+-----------+

The data is correctly aligned regardless of column order in the source DataFrames. Make unionByName() your default choice unless you have a specific reason to use positional matching.
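By default unionByName() is also strict about the column set: if one DataFrame has a column the other lacks, Spark rejects the union at analysis time instead of guessing. A quick sketch reusing employees1 from above (the withBonus DataFrame is hypothetical):

```scala
import org.apache.spark.sql.AnalysisException

val withBonus = Seq(
  ("E005", "Eve", "Finance", 500.0)
).toDF("employee_id", "name", "department", "bonus")

// employees1 has no `bonus` column, so the strict union fails fast
try {
  employees1.unionByName(withBonus).show()
} catch {
  case e: AnalysisException => println(s"Union rejected: ${e.getMessage}")
}
```

The next section shows how to relax this check when the mismatch is expected.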

Handling Schema Mismatches

Real-world data pipelines frequently encounter schema evolution. A new column gets added to one data source, or legacy data lacks fields that newer records contain. Spark 3.1 introduced the allowMissingColumns parameter to handle this gracefully.

val currentData = Seq(
  ("P001", "Laptop", 999.99, "Electronics"),
  ("P002", "Mouse", 29.99, "Electronics")
).toDF("product_id", "name", "price", "category")

val legacyData = Seq(
  ("P003", "Keyboard", 79.99),
  ("P004", "Monitor", 299.99)
).toDF("product_id", "name", "price")

// Spark 3.1+: automatically fills missing columns with null
val combined = currentData.unionByName(legacyData, allowMissingColumns = true)
combined.show()

Output:

+----------+--------+------+-----------+
|product_id|    name| price|   category|
+----------+--------+------+-----------+
|      P001|  Laptop|999.99|Electronics|
|      P002|   Mouse| 29.99|Electronics|
|      P003|Keyboard| 79.99|       null|
|      P004| Monitor|299.99|       null|
+----------+--------+------+-----------+

For Spark versions before 3.1, or when you need more control over default values, manually align schemas:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Align a DataFrame to a target schema, filling any missing
// columns with nulls of the correct type.
def alignSchema(df: DataFrame, target: StructType): DataFrame = {
  val currentColumns = df.columns.toSet
  val selectExprs = target.fields.map { field =>
    if (currentColumns.contains(field.name)) col(field.name)
    else lit(null).cast(field.dataType).as(field.name)
  }
  df.select(selectExprs: _*)
}

val targetSchema = StructType(Seq(
  StructField("product_id", StringType),
  StructField("name", StringType),
  StructField("price", DoubleType),
  StructField("category", StringType),
  StructField("supplier", StringType)
))

val alignedCurrent = alignSchema(currentData, targetSchema)
val alignedLegacy = alignSchema(legacyData, targetSchema)

val result = alignedCurrent.union(alignedLegacy)
result.show()

This approach gives you explicit control over column types and default values when nulls aren’t appropriate.
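As a sketch of that control, you can substitute an explicit sentinel instead of null while aligning; the "Uncategorized" label here is an arbitrary choice for illustration:

```scala
import org.apache.spark.sql.functions.{coalesce, col, lit}

// Replace the nulls introduced by schema alignment with a readable default
val withDefaults = result.withColumn(
  "category",
  coalesce(col("category"), lit("Uncategorized"))
)
withDefaults.show()
```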

Removing Duplicates After Union

Unlike SQL’s UNION (which removes duplicates by default), Spark’s union() and unionByName() always preserve duplicates—they behave like SQL’s UNION ALL. To remove duplicates, apply distinct() or dropDuplicates() after the union.

val batch1 = Seq(
  ("TX001", "2024-01-15", 100.00),
  ("TX002", "2024-01-16", 200.00),
  ("TX003", "2024-01-17", 150.00)
).toDF("transaction_id", "date", "amount")

val batch2 = Seq(
  ("TX003", "2024-01-17", 150.00),  // Duplicate
  ("TX004", "2024-01-18", 300.00),
  ("TX005", "2024-01-19", 250.00)
).toDF("transaction_id", "date", "amount")

val withDuplicates = batch1.union(batch2)
println(s"With duplicates: ${withDuplicates.count()} rows")

val deduplicated = batch1.union(batch2).distinct()
println(s"After distinct: ${deduplicated.count()} rows")

// For large DataFrames, dropDuplicates on key columns is more efficient
val deduplicatedByKey = batch1.union(batch2).dropDuplicates("transaction_id")
println(s"After dropDuplicates: ${deduplicatedByKey.count()} rows")

Use distinct() when you need exact row-level deduplication. Use dropDuplicates(columns) when you have a natural key and want better performance on large datasets—it avoids comparing all columns.
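One caveat: dropDuplicates() keeps an arbitrary surviving row per key, which matters when duplicates differ in other columns. To keep a specific row, say the latest version of each transaction, a window function makes the choice explicit; a sketch using hypothetical corrected-amount data:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val versioned = Seq(
  ("TX001", "2024-01-15", 100.00),
  ("TX001", "2024-01-20", 110.00)  // later correction for the same key
).toDF("transaction_id", "date", "amount")

// Rank rows within each key by date, newest first, and keep rank 1
val newestFirst = Window.partitionBy("transaction_id").orderBy(col("date").desc)
val latestOnly = versioned
  .withColumn("rn", row_number().over(newestFirst))
  .filter(col("rn") === 1)
  .drop("rn")
latestOnly.show()
```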

Performance Considerations

Union operations have significant performance implications that aren’t immediately obvious.

Partition explosion: Each union preserves the partitions of both input DataFrames. Unioning ten DataFrames with 200 partitions each gives you 2,000 partitions. This creates excessive task overhead and small file problems.

// After multiple unions, consolidate partitions
val consolidated = largeUnionResult.coalesce(200)
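
You can confirm the effect with rdd.getNumPartitions; a small sketch using the quarterly sales DataFrames from earlier:

```scala
// Partition counts add up across a union; coalesce merges them back down
val unionedSales = salesQ1.union(salesQ2)
println(s"salesQ1: ${salesQ1.rdd.getNumPartitions} partitions")
println(s"union:   ${unionedSales.rdd.getNumPartitions} partitions")
println(s"merged:  ${unionedSales.coalesce(1).rdd.getNumPartitions} partitions")
```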

Union chains: Whether you build a union in a loop or with reduce(), the logical plan is the same chain of nested Union nodes, and Catalyst's CombineUnions rule collapses adjacent unions into a single multi-child Union either way. Prefer reduce() because it is idiomatic Scala and easy to batch:

// Works, but unidiomatic: mutable accumulation in a loop
val dataFrames: Seq[org.apache.spark.sql.DataFrame] = Seq(salesQ1, salesQ2)
var looped = dataFrames.head
for (df <- dataFrames.tail) {
  looped = looped.union(df)
}

// Preferred: one expression over the whole sequence
val efficientResult = dataFrames.reduce(_ unionByName _)

Even with CombineUnions, analyzing a logical plan that references hundreds of input DataFrames gets slow. For very large numbers of DataFrames, consider batching:

def batchedUnion(dfs: Seq[org.apache.spark.sql.DataFrame], 
                 batchSize: Int = 10): org.apache.spark.sql.DataFrame = {
  if (dfs.size <= batchSize) {
    dfs.reduce(_ unionByName _)
  } else {
    val batches = dfs.grouped(batchSize).map(batch => batch.reduce(_ unionByName _)).toSeq
    batchedUnion(batches, batchSize)
  }
}
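
A usage sketch: combining 100 single-row DataFrames (standing in for, say, 100 ingested files) in batches of 10:

```scala
// One tiny DataFrame per source file (hypothetical data)
val parts = (1 to 100).map(i => Seq((i, s"file_$i")).toDF("id", "source"))

// Two levels of batching: 100 -> 10 intermediate unions -> 1 result
val merged = batchedUnion(parts, batchSize = 10)
println(merged.count())  // 100
```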

Summary and Best Practices

Choose your union method based on your specific situation:

  • Same schema, same column order: union()
  • Same columns, potentially different order: unionByName()
  • Different schemas, Spark 3.1+: unionByName(allowMissingColumns = true)
  • Different schemas, older Spark: manual schema alignment + union()

Pitfalls to avoid:

  1. Never use union() with DataFrames from different sources without verifying column order. Use unionByName() instead.
  2. Don’t forget about duplicates. Spark unions preserve them—add distinct() if you need deduplication.
  3. Don’t union in loops. Use reduce() for better execution plans.
  4. Watch partition counts. Apply coalesce() after unioning many DataFrames.
  5. Check types, not just names. unionByName() matches column names but not their types; mismatched types are either rejected at analysis time or silently widened to a common type (an IntegerType column unioned with a DoubleType one becomes DoubleType), so compare printSchema() output before unioning.

When in doubt, use unionByName(). The slight overhead of name-based matching is negligible compared to the debugging time you’ll save when column orders inevitably drift between data sources.
