Apache Spark - Apache Hudi Integration

Apache Hudi supports two fundamental table types that determine how data updates are handled. Copy-on-Write (CoW) tables create new versions of files during writes, ensuring optimal read performance...

Key Insights

  • Apache Hudi provides incremental data processing capabilities on top of Spark, enabling efficient upserts, deletes, and record-level changes in data lakes without full table rewrites
  • Hudi’s Copy-on-Write and Merge-on-Read table types offer different trade-offs between write latency and query performance, with CoW optimized for read-heavy workloads and MoR for write-heavy scenarios
  • Integration requires minimal configuration changes to existing Spark jobs while providing ACID transactions, time travel queries, and automatic file management that significantly reduce operational overhead

Understanding Hudi Table Types

Apache Hudi supports two fundamental table types that determine how data updates are handled. Copy-on-Write (CoW) tables create new versions of files during writes, ensuring optimal read performance but with higher write latency. Merge-on-Read (MoR) tables append changes to delta logs and merge them during reads, optimizing for write throughput.

import org.apache.hudi.QuickstartUtils._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "user_events"
val basePath = "s3://data-lake/hudi/user_events"

// Copy-on-Write configuration
val cowOptions = Map[String, String](
  "hoodie.table.name" -> tableName,
  "hoodie.datasource.write.recordkey.field" -> "user_id",
  "hoodie.datasource.write.partitionpath.field" -> "event_date",
  "hoodie.datasource.write.table.name" -> tableName,
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.precombine.field" -> "timestamp",
  "hoodie.upsert.shuffle.parallelism" -> "200",
  "hoodie.insert.shuffle.parallelism" -> "200"
)

// Merge-on-Read configuration
val morOptions = cowOptions ++ Map(
  "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
  "hoodie.compact.inline" -> "true",
  "hoodie.compact.inline.max.delta.commits" -> "5"
)

Writing Data to Hudi Tables

The core Hudi write operation leverages Spark’s DataFrameWriter API with Hudi-specific options. The framework automatically handles file layout, indexing, and versioning.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("HudiIntegration")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()

// Sample data
case class UserEvent(
  user_id: String,
  event_type: String,
  timestamp: Long,
  event_date: String,
  properties: Map[String, String]
)

val events = Seq(
  UserEvent("user_001", "login", 1703001600000L, "2023-12-19", Map("ip" -> "192.168.1.1")),
  UserEvent("user_002", "purchase", 1703002800000L, "2023-12-19", Map("amount" -> "99.99")),
  UserEvent("user_003", "logout", 1703004000000L, "2023-12-19", Map("duration" -> "3600"))
)

val df = spark.createDataFrame(events)

df.write
  .format("hudi")
  .options(cowOptions)
  .mode(Overwrite)
  .save(basePath)

Incremental Upserts and Deletes

Hudi’s primary strength lies in efficient incremental processing. Upserts merge new records with existing ones based on the record key, while deletes mark records for removal without immediate physical deletion.

// Upsert new events
val newEvents = Seq(
  UserEvent("user_001", "login", 1703005200000L, "2023-12-19", Map("ip" -> "192.168.1.2")),
  UserEvent("user_004", "signup", 1703006400000L, "2023-12-19", Map("referral" -> "organic"))
)

spark.createDataFrame(newEvents)
  .write
  .format("hudi")
  .options(cowOptions)
  .mode(Append)
  .save(basePath)

// Soft delete records
val deleteKeys = Seq("user_003")
val deleteDF = spark.createDataFrame(
  deleteKeys.map(id => UserEvent(id, "", 0L, "2023-12-19", Map.empty))
)

deleteDF.write
  .format("hudi")
  .options(cowOptions ++ Map(
    "hoodie.datasource.write.operation" -> "delete"
  ))
  .mode(Append)
  .save(basePath)

Querying Hudi Tables

Hudi supports three query types: Snapshot queries return the latest table state, Incremental queries fetch only changed records since a commit, and Read Optimized queries (MoR only) read base files without merging delta logs.

// Snapshot query - latest data
val snapshotDF = spark.read
  .format("hudi")
  .load(basePath)

snapshotDF.createOrReplaceTempView("user_events_snapshot")
spark.sql("SELECT user_id, event_type, timestamp FROM user_events_snapshot WHERE event_date = '2023-12-19'").show()

// Incremental query - changes since specific commit
val commits = spark.read
  .format("hudi")
  .load(basePath)
  .select("_hoodie_commit_time")
  .distinct()
  .orderBy("_hoodie_commit_time")
  .collect()
  .map(_.getString(0))

val beginTime = commits(commits.length - 2)

val incrementalDF = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", beginTime)
  .load(basePath)

incrementalDF.show()

Time Travel and Point-in-Time Queries

Hudi maintains a complete timeline of commits, enabling queries against historical table states without maintaining separate snapshots.

// Query data as of specific timestamp
val historicalDF = spark.read
  .format("hudi")
  .option("as.of.instant", "20231219120000000")
  .load(basePath)

// Query data as of specific commit
val commitDF = spark.read
  .format("hudi")
  .option("as.of.instant", commits.head)
  .load(basePath)

// List all commits
spark.sql(s"""
  SELECT DISTINCT _hoodie_commit_time 
  FROM hudi_table_changes('$basePath', 'latest_state')
  ORDER BY _hoodie_commit_time DESC
""").show()

Compaction and Cleaning

For MoR tables, compaction merges delta logs with base files to optimize read performance. Cleaning removes old file versions based on retention policies.

import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.HoodieDataSourceHelpers

// Trigger compaction manually
val compactionOptions = morOptions ++ Map(
  "hoodie.compact.inline" -> "false"
)

// Schedule compaction
spark.read.format("hudi").load(basePath).write
  .format("hudi")
  .options(compactionOptions ++ Map(
    "hoodie.datasource.write.operation" -> "compact",
    "hoodie.compact.schedule" -> "true"
  ))
  .mode(Append)
  .save(basePath)

// Configure cleaning
val cleaningOptions = Map(
  "hoodie.clean.automatic" -> "true",
  "hoodie.clean.async" -> "true",
  "hoodie.cleaner.policy" -> "KEEP_LATEST_COMMITS",
  "hoodie.cleaner.commits.retained" -> "10"
)

Optimizing Hudi Performance

Performance tuning requires balancing parallelism, file sizes, and indexing strategies based on workload characteristics.

val optimizedOptions = Map(
  // Parallelism
  "hoodie.upsert.shuffle.parallelism" -> "400",
  "hoodie.insert.shuffle.parallelism" -> "400",
  "hoodie.bulkinsert.shuffle.parallelism" -> "400",
  
  // File sizing
  "hoodie.parquet.max.file.size" -> "134217728", // 128MB
  "hoodie.parquet.small.file.limit" -> "104857600", // 100MB
  
  // Indexing
  "hoodie.index.type" -> "BLOOM",
  "hoodie.bloom.index.parallelism" -> "200",
  
  // Memory
  "hoodie.memory.merge.max.size" -> "1073741824", // 1GB
  
  // Metadata
  "hoodie.metadata.enable" -> "true",
  "hoodie.metadata.index.column.stats.enable" -> "true"
)

df.write
  .format("hudi")
  .options(cowOptions ++ optimizedOptions)
  .mode(Append)
  .save(basePath)

Partition Pruning and Predicate Pushdown

Hudi leverages Spark’s optimization capabilities while adding its own metadata-based pruning for improved query performance.

// Partition pruning automatically applied
val partitionedQuery = spark.read
  .format("hudi")
  .load(basePath)
  .filter("event_date >= '2023-12-19' AND event_date <= '2023-12-20'")
  .filter("event_type = 'purchase'")

// Enable column statistics for better pruning
val statsOptions = Map(
  "hoodie.metadata.index.column.stats.enable" -> "true",
  "hoodie.metadata.index.column.stats.column.list" -> "user_id,event_type,timestamp"
)

Apache Hudi transforms Spark-based data lakes from append-only systems into mutable, transactional platforms. The framework’s automatic file management, incremental processing capabilities, and flexible query modes eliminate much of the complexity traditionally associated with managing large-scale analytical datasets. By choosing appropriate table types and tuning configurations for specific workload patterns, teams can achieve both high write throughput and low query latency while maintaining ACID guarantees across petabyte-scale datasets.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.