PySpark - Write to Hive Table

Key Insights

  • PySpark provides multiple methods to write data to Hive tables including saveAsTable(), insertInto(), and direct SQL commands, each suited for different use cases and partition handling requirements
  • Understanding partition modes (static vs dynamic), write modes (append, overwrite, error, ignore), and bucketing strategies is critical for optimizing write performance and query efficiency in production environments
  • Proper Hive metastore configuration and format selection (Parquet, ORC) directly impact storage efficiency and downstream query performance, with Parquet generally preferred for analytical workloads

Configuring Hive Support in PySpark

Before writing to Hive tables, enable Hive support in your SparkSession. This requires the Hive metastore configuration and appropriate warehouse directory permissions.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HiveWriteExample") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://metastore-host:9083") \
    .enableHiveSupport() \
    .getOrCreate()

# Verify Hive support: the catalog implementation should be "hive", not "in-memory"
print(f"Catalog implementation: {spark.conf.get('spark.sql.catalogImplementation')}")

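For local development without a remote metastore, Spark falls back to an embedded Derby metastore stored in a metastore_db directory under the working directory: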

spark = SparkSession.builder \
    .appName("HiveLocalExample") \
    .config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

Writing with saveAsTable()

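The saveAsTable() method writes a DataFrame to a table registered in the metastore and creates the table if it does not exist. What happens when the table already exists depends on the write mode. This is the most straightforward approach for new tables.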

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

# Sample data
data = [
    ("user_001", "New York", date(2024, 1, 15), 1200),
    ("user_002", "San Francisco", date(2024, 1, 15), 950),
    ("user_003", "New York", date(2024, 1, 16), 1100),
    ("user_004", "Chicago", date(2024, 1, 16), 800)
]

schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("city", StringType(), False),
    StructField("event_date", DateType(), False),
    StructField("revenue", IntegerType(), False)
])

df = spark.createDataFrame(data, schema)

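# The target database must exist before the first write
spark.sql("CREATE DATABASE IF NOT EXISTS sales")

# Write as managed table
df.write \
    .mode("overwrite") \
    .format("parquet") \
    .saveAsTable("sales.user_events")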

For external tables with specific locations:

df.write \
    .mode("overwrite") \
    .format("parquet") \
    .option("path", "/data/external/user_events") \
    .saveAsTable("sales.user_events_external")

Write Modes and Their Implications

The write mode determines what happens when the target table already exists. Choosing the wrong one can silently replace or skip data.

# Append: Add new records without touching existing data
df.write \
    .mode("append") \
    .saveAsTable("sales.user_events")

# Overwrite: Replace all existing data
df.write \
    .mode("overwrite") \
    .saveAsTable("sales.user_events")

# ErrorIfExists: Fail if table exists (default)
df.write \
    .mode("error") \
    .saveAsTable("sales.user_events")

# Ignore: Do nothing if table exists
df.write \
    .mode("ignore") \
    .saveAsTable("sales.user_events")
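
The four modes can be summarized as a small decision table. The following is a plain-Python sketch (no Spark required) of the documented behavior of saveAsTable(); the function name resolve_write_mode and the action labels are hypothetical, chosen only for illustration.

```python
def resolve_write_mode(mode: str, table_exists: bool) -> str:
    """Return the action saveAsTable() takes for a given mode and table state."""
    mode = mode.lower()
    if mode in ("error", "errorifexists", "default"):
        return "fail" if table_exists else "create"
    if mode == "ignore":
        return "skip" if table_exists else "create"
    if mode == "append":
        return "append rows" if table_exists else "create"
    if mode == "overwrite":
        return "replace table" if table_exists else "create"
    raise ValueError(f"unknown write mode: {mode}")


# Print the behavior of each mode for an existing and a missing table
for m in ("append", "overwrite", "error", "ignore"):
    print(f"{m:>9}: exists -> {resolve_write_mode(m, True)}, "
          f"missing -> {resolve_write_mode(m, False)}")
```

Every mode creates the table when it is missing; the modes differ only in how they treat an existing table.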

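For selective overwrites of individual partitions, be aware that saveAsTable() in overwrite mode replaces the entire table, even with dynamic partition overwrite enabled. The setting takes effect with insertInto() on an existing partitioned table. The example assumes a table partitioned by event_date and city, such as the sales.user_events_partitioned table created in the next section:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# insertInto() matches columns by position, so the partition columns go last
df.select("user_id", "revenue", "event_date", "city") \
    .write \
    .mode("overwrite") \
    .insertInto("sales.user_events_partitioned")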

Working with Partitioned Tables

Partitioning improves query performance by letting queries that filter on the partition columns skip irrelevant directories. Dynamic partitioning creates partitions automatically from the values found in the data.

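# Writing a single partition's data: filter to one date, then append.
# insertInto() matches columns by position, so the DataFrame column order must
# match the table. A true static partition spec requires SQL:
# INSERT INTO ... PARTITION (event_date='...')
df.filter(df.event_date == date(2024, 1, 15)) \
    .write \
    .mode("append") \
    .insertInto("sales.user_events", overwrite=False)

# Dynamic partitioning - automatic partition creation
# These Hive settings apply to inserts into Hive-format tables
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")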

df.write \
    .mode("append") \
    .partitionBy("event_date", "city") \
    .format("parquet") \
    .saveAsTable("sales.user_events_partitioned")

Multi-level partitioning for large datasets:

from pyspark.sql.functions import year, month, dayofmonth

df_with_parts = df \
    .withColumn("year", year("event_date")) \
    .withColumn("month", month("event_date")) \
    .withColumn("day", dayofmonth("event_date"))

df_with_parts.write \
    .mode("overwrite") \
    .partitionBy("year", "month", "day") \
    .format("parquet") \
    .saveAsTable("sales.user_events_hierarchical")
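
To make the effect of partitionBy() concrete, here is a plain-Python sketch of the Hive-style directory layout that partitioned writes produce. The helper partition_path, the table name user_events_by_date, and the warehouse root are assumptions for illustration, and the sketch ignores the escaping that is applied to special characters in partition values.

```python
from datetime import date


def partition_path(table_root: str, partition_cols: list, row: dict) -> str:
    """Build the directory a row lands in: root/col1=val1/col2=val2."""
    parts = [f"{col}={row[col]}" for col in partition_cols]
    return "/".join([table_root.rstrip("/")] + parts)


row = {"user_id": "user_001", "event_date": date(2024, 1, 15), "revenue": 1200}
# Derive the year/month/day columns used for hierarchical partitioning
row.update(
    year=row["event_date"].year,
    month=row["event_date"].month,
    day=row["event_date"].day,
)

root = "/user/hive/warehouse/sales.db"  # assumed warehouse layout
print(partition_path(root + "/user_events_by_date", ["event_date"], row))
# /user/hive/warehouse/sales.db/user_events_by_date/event_date=2024-01-15
print(partition_path(root + "/user_events_hierarchical", ["year", "month", "day"], row))
# /user/hive/warehouse/sales.db/user_events_hierarchical/year=2024/month=1/day=15
```

A query that filters on year and month only needs to list the matching subdirectories, which is where the scan reduction comes from.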

Using insertInto() for Schema Enforcement

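The insertInto() method writes data to an existing table without changing its definition. It matches columns by position, not by name, so the DataFrame must list its columns in the table's order, with partition columns last. Because the table's schema, format, and partitioning stay fixed, it suits production pipelines that must not alter table definitions.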

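# Create table first with explicit schema
# (Hive syntax: the partition column is declared only in PARTITIONED BY)
spark.sql("""
    CREATE TABLE IF NOT EXISTS sales.user_events_strict (
        user_id STRING,
        city STRING,
        revenue INT
    )
    PARTITIONED BY (event_date DATE)
    STORED AS PARQUET
""")

# Reorder to the table's column order: data columns first, partition column last
df_ordered = df.select("user_id", "city", "revenue", "event_date")

# Insert data - columns are matched by position, not by name
df_ordered.write \
    .mode("append") \
    .insertInto("sales.user_events_strict")

# Overwrite partitions; with partitionOverwriteMode set to "dynamic" (see above),
# only the partitions present in the DataFrame are replaced
df_ordered.write \
    .mode("overwrite") \
    .insertInto("sales.user_events_strict", overwrite=True)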
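
Because insertInto() resolves columns by position, a small guard that reorders the DataFrame's columns to the table's order can catch mismatches before the write. This is a plain-Python sketch; align_to_table is a hypothetical helper, not part of PySpark.

```python
def align_to_table(df_columns: list, table_columns: list) -> list:
    """Return the select list in table order, or raise if the column sets differ."""
    missing = [c for c in table_columns if c not in df_columns]
    extra = [c for c in df_columns if c not in table_columns]
    if missing or extra:
        raise ValueError(f"column mismatch: missing={missing}, extra={extra}")
    return list(table_columns)


df_cols = ["user_id", "city", "event_date", "revenue"]     # DataFrame order
table_cols = ["user_id", "city", "revenue", "event_date"]  # partition column last

select_list = align_to_table(df_cols, table_cols)
print(select_list)
# ['user_id', 'city', 'revenue', 'event_date']
```

With a live session, table_cols could come from spark.table("sales.user_events_strict").columns, and the write would become df.select(*select_list).write.insertInto("sales.user_events_strict").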

Bucketing for Join Optimization

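Bucketing pre-shuffles data into a fixed number of buckets by hashing the bucket column. Joins on that column can then skip the shuffle, provided both tables are bucketed on the join key with the same number of buckets. Note that bucketBy() only works with saveAsTable(), and Spark's bucket layout is not compatible with Hive's native bucketing.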

df.write \
    .mode("overwrite") \
    .bucketBy(10, "user_id") \
    .sortBy("event_date") \
    .format("parquet") \
    .saveAsTable("sales.user_events_bucketed")

# Verify bucketing
spark.sql("DESCRIBE FORMATTED sales.user_events_bucketed").show(100, False)

Combined partitioning and bucketing:

df.write \
    .mode("overwrite") \
    .partitionBy("event_date") \
    .bucketBy(10, "user_id") \
    .sortBy("revenue") \
    .format("parquet") \
    .saveAsTable("sales.user_events_optimized")
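
The reason bucketing removes the shuffle can be sketched in plain Python: when two datasets assign keys to buckets with the same hash and the same bucket count, matching keys always share a bucket index, so the join can proceed bucket by bucket. CRC32 is used here only as a deterministic stand-in for Spark's Murmur3-based hash, and the profiles data and helper names are hypothetical.

```python
import zlib


def bucket_for(key: str, num_buckets: int) -> int:
    """Map a key to a bucket index (CRC32 stands in for Spark's hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def bucketize(rows, key_index, num_buckets):
    """Group rows into buckets by hashing the column at key_index."""
    buckets = {b: [] for b in range(num_buckets)}
    for row in rows:
        buckets[bucket_for(row[key_index], num_buckets)].append(row)
    return buckets


events = [("user_001", 1200), ("user_002", 950), ("user_003", 1100)]
profiles = [("user_001", "New York"), ("user_003", "New York"),
            ("user_002", "San Francisco")]

NUM_BUCKETS = 10
event_buckets = bucketize(events, 0, NUM_BUCKETS)
profile_buckets = bucketize(profiles, 0, NUM_BUCKETS)

# Join bucket by bucket: no row ever needs to move to another bucket
joined = []
for b in range(NUM_BUCKETS):
    lookup = dict(profile_buckets[b])
    for user_id, revenue in event_buckets[b]:
        if user_id in lookup:
            joined.append((user_id, lookup[user_id], revenue))

print(sorted(joined))
# [('user_001', 'New York', 1200), ('user_002', 'San Francisco', 950), ('user_003', 'New York', 1100)]
```

If the two sides used different bucket counts, matching keys could land in different bucket indexes, which is why the counts have to agree.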

Direct SQL Approach

For complex scenarios or existing SQL workflows, use direct SQL statements with temporary views.

df.createOrReplaceTempView("temp_user_events")

# Create table from query
spark.sql("""
    CREATE TABLE sales.user_events_summary
    STORED AS PARQUET
    AS
    SELECT 
        city,
        event_date,
        COUNT(DISTINCT user_id) as unique_users,
        SUM(revenue) as total_revenue
    FROM temp_user_events
    GROUP BY city, event_date
""")

# Insert into a static partition (the partition column is omitted from the SELECT list)
spark.sql("""
    INSERT INTO TABLE sales.user_events_strict
    PARTITION (event_date='2024-01-15')
    SELECT user_id, city, revenue
    FROM temp_user_events
    WHERE event_date = '2024-01-15'
""")

Compression and Storage Formats

Choose appropriate compression codecs and storage formats based on workload characteristics.

# Parquet with Snappy compression (default, balanced)
df.write \
    .mode("overwrite") \
    .format("parquet") \
    .option("compression", "snappy") \
    .saveAsTable("sales.events_snappy")

# ORC with Zlib compression (better compression ratio)
df.write \
    .mode("overwrite") \
    .format("orc") \
    .option("compression", "zlib") \
    .saveAsTable("sales.events_orc")

# Parquet with GZIP (smaller files than Snappy, slower to write and read)
df.write \
    .mode("overwrite") \
    .format("parquet") \
    .option("compression", "gzip") \
    .saveAsTable("sales.events_gzip")

Performance Considerations and Best Practices

The number of output files depends on how the DataFrame is partitioned in memory at write time, so tune partitioning and parallelism before writing.

# Repartition before write to control file count
df.repartition(10, "event_date") \
    .write \
    .mode("overwrite") \
    .partitionBy("event_date") \
    .saveAsTable("sales.user_events_optimized")

# Coalesce for small datasets to reduce small files
df.coalesce(1) \
    .write \
    .mode("overwrite") \
    .saveAsTable("sales.user_events_single")

# Enable statistics collection
spark.sql("ANALYZE TABLE sales.user_events COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE sales.user_events COMPUTE STATISTICS FOR COLUMNS user_id, revenue")
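
The effect of repartitioning by the partition column can be estimated without Spark. With partitionBy(), each task writes at least one file per partition value it holds, so the file count is the number of distinct (task, partition value) pairs. The following plain-Python simulation uses hypothetical helper names, CRC32 as a stand-in for Spark's hash partitioning, and ignores file splitting settings.

```python
import zlib


def count_output_files(rows, num_tasks, assign_task):
    """Count distinct (task, partition value) pairs, one file per pair."""
    files = set()
    for i, event_date in enumerate(rows):
        files.add((assign_task(i, event_date, num_tasks), event_date))
    return len(files)


def round_robin(i, event_date, num_tasks):
    """Rows spread evenly across tasks, regardless of partition value."""
    return i % num_tasks


def by_partition_key(i, event_date, num_tasks):
    """All rows with the same partition value go to the same task."""
    return zlib.crc32(event_date.encode("utf-8")) % num_tasks


# 5 dates with 100 rows each
rows = [f"2024-01-{day:02d}" for day in range(1, 6) for _ in range(100)]

print(count_output_files(rows, 10, round_robin))       # 50
print(count_output_files(rows, 10, by_partition_key))  # 5
```

In this sketch, spreading rows evenly yields ten files per date, while routing each date to a single task yields one file per date, which is the motivation for repartition(10, "event_date") before the write.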

Monitor write performance:

import time

start_time = time.time()

df.write \
    .mode("overwrite") \
    .partitionBy("event_date") \
    .format("parquet") \
    .saveAsTable("sales.user_events")

duration = time.time() - start_time
# count() runs as a separate job and is not included in the timed write
record_count = df.count()

print(f"Wrote {record_count} records in {duration:.2f} seconds")
print(f"Throughput: {record_count/duration:.2f} records/second")
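
For repeated measurements, the timing logic can be wrapped in a reusable helper. This is a minimal sketch in plain Python; timed and throughput are hypothetical names, and the sleep call is a placeholder for an actual df.write call.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, stats):
    """Record wall-clock seconds for the enclosed block into stats[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        stats[label] = time.perf_counter() - start


def throughput(records, seconds):
    """Records per second, guarding against a zero-length interval."""
    return records / seconds if seconds > 0 else float("inf")


stats = {}
with timed("write", stats):
    time.sleep(0.01)  # placeholder for df.write...saveAsTable(...)

print(f"write took {stats['write']:.2f} seconds")
print(f"Throughput: {throughput(4, stats['write']):.2f} records/second")
```

Using time.perf_counter() avoids jumps from system clock adjustments, and the try/finally ensures the duration is recorded even if the write fails.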

These patterns cover the essential techniques for writing data to Hive tables in production environments. Select the appropriate method based on your schema requirements, partition strategy, and performance needs.
