Apache Spark - Apache Iceberg Integration

Traditional Hive tables struggle with concurrent writes, schema evolution, and partition management at scale. Iceberg solves these problems by maintaining a complete metadata layer that tracks all...

Key Insights

  • Apache Iceberg provides ACID transactions and time-travel capabilities for data lakes, addressing fundamental limitations in traditional Hive table formats that make managing large-scale datasets error-prone and inefficient.
  • Spark’s native Iceberg integration through catalog implementations enables seamless table operations, schema evolution, and partition management without the overhead of maintaining separate metadata stores.
  • Performance optimizations like metadata caching, predicate pushdown, and hidden partitioning in Iceberg significantly reduce query planning time and eliminate common partitioning mistakes that plague production data pipelines.

Why Iceberg Matters for Spark Workloads

Traditional Hive tables struggle with concurrent writes, schema evolution, and partition management at scale. Iceberg solves these problems by maintaining a complete metadata layer that tracks all data files, snapshots, and schema changes. For Spark users, this means reliable concurrent writes, consistent reads, and the ability to query historical data states without maintaining separate backup tables.

The integration eliminates common production issues: no more small file problems from streaming ingestion, no partition overwrites destroying unrelated data, and no schema migration downtime. Iceberg tables are backward compatible with existing Spark code while providing enterprise-grade data management features.

Setting Up Spark with Iceberg

Start by adding Iceberg dependencies to your Spark configuration. The version must match your Spark version:

// build.sbt for Spark 3.4
libraryDependencies ++= Seq(
  "org.apache.iceberg" %% "iceberg-spark-runtime-3.4" % "1.4.3"
)

Configure Spark to use Iceberg catalogs. The catalog determines where table metadata is stored:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("iceberg-integration") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "s3://my-bucket/warehouse") \
    .getOrCreate()

For production environments using AWS Glue or Hive Metastore:

spark = SparkSession.builder \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-bucket/warehouse") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .getOrCreate()

Creating and Writing Iceberg Tables

Create tables using standard Spark SQL with Iceberg-specific properties:

spark.sql("""
    CREATE TABLE local.db.events (
        event_id STRING,
        user_id LONG,
        event_type STRING,
        timestamp TIMESTAMP,
        properties MAP<STRING, STRING>
    ) USING iceberg
    PARTITIONED BY (days(timestamp))
    TBLPROPERTIES (
        'write.format.default'='parquet',
        'write.parquet.compression-codec'='zstd',
        'write.metadata.delete-after-commit.enabled'='true',
        'write.metadata.previous-versions-max'='5'
    )
""")

Hidden partitioning is a killer feature. Unlike Hive, you don’t include partition columns in your INSERT statements:

from pyspark.sql.functions import current_timestamp, lit
from datetime import datetime

# Generate sample data
events_df = spark.createDataFrame([
    ("evt_001", 12345, "click", datetime(2024, 1, 15, 10, 30)),
    ("evt_002", 12346, "view", datetime(2024, 1, 15, 11, 45)),
    ("evt_003", 12345, "purchase", datetime(2024, 1, 16, 9, 15))
], ["event_id", "user_id", "event_type", "timestamp"])

# Write without specifying partition values
events_df.writeTo("local.db.events").append()

Iceberg automatically handles partitioning based on the table definition. No more df.write.partitionBy() mistakes that create incorrect directory structures.

Schema Evolution Without Downtime

Add columns, rename fields, or change types without rewriting data:

# Add a new column
spark.sql("ALTER TABLE local.db.events ADD COLUMN session_id STRING")

# Rename a column
spark.sql("ALTER TABLE local.db.events RENAME COLUMN event_type TO action_type")

# Change column type (with compatible types)
spark.sql("ALTER TABLE local.db.events ALTER COLUMN user_id TYPE STRING")

Schema evolution is tracked in metadata. Old data files remain unchanged, and Iceberg handles the schema mapping during reads:

# Read works seamlessly across schema versions
df = spark.table("local.db.events")
df.printSchema()

Time Travel and Snapshot Management

Query historical table states using timestamp or snapshot ID:

# Query data as of a specific timestamp
historical_df = spark.read \
    .option("as-of-timestamp", "2024-01-15 12:00:00") \
    .table("local.db.events")

# Query a specific snapshot
snapshot_df = spark.read \
    .option("snapshot-id", 8744736658442914487) \
    .table("local.db.events")

# List all snapshots
spark.sql("SELECT * FROM local.db.events.snapshots").show(truncate=False)

Rollback to previous snapshots when deployments go wrong:

# Rollback to a specific snapshot
spark.sql("""
    CALL local.system.rollback_to_snapshot(
        table => 'db.events',
        snapshot_id => 8744736658442914487
    )
""")

# Rollback to a timestamp
spark.sql("""
    CALL local.system.rollback_to_timestamp(
        table => 'db.events',
        timestamp => TIMESTAMP '2024-01-15 12:00:00'
    )
""")

Incremental Processing Patterns

Process only new data using incremental reads, eliminating the need for complex watermark logic:

# First read - store the snapshot ID
df_initial = spark.read.table("local.db.events")
initial_snapshot = spark.sql(
    "SELECT snapshot_id FROM local.db.events.snapshots ORDER BY committed_at DESC LIMIT 1"
).collect()[0][0]

# Later, read only new data
df_incremental = spark.read \
    .option("start-snapshot-id", initial_snapshot) \
    .table("local.db.events")

# Process and write results
results = df_incremental.groupBy("action_type").count()
results.writeTo("local.db.event_counts").append()

For streaming workloads, use Iceberg as both source and sink:

# Streaming read from Iceberg
stream_df = spark.readStream \
    .format("iceberg") \
    .load("local.db.events")

# Process and write back
query = stream_df \
    .groupBy("user_id") \
    .count() \
    .writeStream \
    .format("iceberg") \
    .outputMode("complete") \
    .option("checkpointLocation", "s3://bucket/checkpoints/user_counts") \
    .toTable("local.db.user_event_counts")

Maintenance Operations

Compact small files generated by streaming or frequent appends:

# Compact data files
spark.sql("""
    CALL local.system.rewrite_data_files(
        table => 'db.events',
        options => map('target-file-size-bytes', '536870912')
    )
""")

Remove old metadata and data files to control storage costs:

# Expire snapshots older than 7 days
spark.sql("""
    CALL local.system.expire_snapshots(
        table => 'db.events',
        older_than => TIMESTAMP '2024-01-08 00:00:00',
        retain_last => 5
    )
""")

# Remove orphaned files
spark.sql("""
    CALL local.system.remove_orphan_files(
        table => 'db.events',
        older_than => TIMESTAMP '2024-01-08 00:00:00'
    )
""")

Performance Considerations

Enable metadata caching to speed up query planning:

spark.conf.set("spark.sql.catalog.local.cache-enabled", "true")
spark.conf.set("spark.sql.catalog.local.cache.expiration-interval-ms", "300000")

Use partition evolution when access patterns change:

# Change partitioning strategy without rewriting data
spark.sql("""
    ALTER TABLE local.db.events 
    REPLACE PARTITION FIELD days(timestamp) WITH hours(timestamp)
""")

Monitor table metadata growth and file counts:

# Check table statistics
spark.sql("SELECT * FROM local.db.events.files").show()
spark.sql("SELECT * FROM local.db.events.partitions").show()

# View table history
spark.sql("SELECT * FROM local.db.events.history").show()

Iceberg’s integration with Spark transforms data lake operations from fragile scripts into reliable, maintainable pipelines. The combination provides production-grade data management without sacrificing the flexibility and scale that made data lakes attractive in the first place.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.