How to Write to Parquet in PySpark

Parquet has become the de facto standard for storing analytical data in distributed systems. Its columnar storage format means queries that touch only a subset of columns skip reading irrelevant data...

Key Insights

  • Parquet’s columnar format with built-in compression makes it the default choice for PySpark workloads, but proper partitioning and file sizing are critical for query performance.
  • Always use repartition() or coalesce() before writing to control output file count—the small files problem is the most common performance killer in production Spark jobs.
  • Choose your compression codec based on your read/write ratio: Snappy for balanced workloads, ZSTD for storage-heavy scenarios, and uncompressed for write-heavy pipelines with cheap storage.

Why Parquet Dominates Big Data Storage

Parquet has become the de facto standard for storing analytical data in distributed systems. Its columnar storage format means queries that touch only a subset of columns skip reading irrelevant data entirely. Combined with efficient compression, predicate pushdown, and native schema support, Parquet files typically achieve 75% smaller storage footprints compared to row-based formats like CSV or JSON.

PySpark treats Parquet as a first-class citizen. The integration is tight, the defaults are sensible, and the performance is excellent. But writing Parquet files correctly requires understanding the options available and their implications for downstream consumers.

Basic Parquet Write Operations

The simplest Parquet write is a single method call:

df.write.parquet("/data/output/users")

This creates a directory containing Parquet files partitioned by Spark’s default parallelism. Each executor writes its partition as a separate file, resulting in a number of files equal to your DataFrame’s partition count.

The mode() method controls behavior when the output path already exists:

# Overwrite existing data completely
df.write.mode("overwrite").parquet("/data/output/users")

# Append to existing data
df.write.mode("append").parquet("/data/output/users")

# Silently skip if path exists
df.write.mode("ignore").parquet("/data/output/users")

# Fail with error if path exists (default behavior)
df.write.mode("error").parquet("/data/output/users")

For production pipelines, I recommend being explicit about mode. The default error mode catches accidental overwrites, but overwrite with idempotent job design is often cleaner for batch ETL.

You can also use the more verbose format() syntax, which becomes useful when building dynamic writers:

df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("/data/output/users")

Partitioning Strategies

Partitioning organizes your Parquet files into a directory structure based on column values. This enables partition pruning—Spark skips entire directories when your query filters on partition columns.

# Partition by date hierarchy
df.write \
    .partitionBy("year", "month", "day") \
    .parquet("/data/output/events")

This creates a structure like:

/data/output/events/
  year=2024/
    month=01/
      day=15/
        part-00000.parquet
        part-00001.parquet

Choose partition columns with low cardinality that appear frequently in WHERE clauses. High-cardinality columns like user_id create too many small directories and hurt performance.

Before writing, control the number of output files per partition using repartition() or coalesce():

# Repartition to exactly 10 files per partition
df.repartition(10, "year", "month") \
    .write \
    .partitionBy("year", "month") \
    .parquet("/data/output/events")

# Reduce partitions without shuffle (only decreases)
df.coalesce(5) \
    .write \
    .parquet("/data/output/events")

Use repartition() when you need to increase file count or ensure even distribution. Use coalesce() when reducing file count from a larger number—it avoids a full shuffle by combining existing partitions.

A practical pattern for time-series data:

from pyspark.sql import functions as F

# Add partition columns derived from timestamp
df_partitioned = df \
    .withColumn("year", F.year("event_timestamp")) \
    .withColumn("month", F.month("event_timestamp")) \
    .withColumn("day", F.dayofmonth("event_timestamp"))

# Write with sensible file count per partition
df_partitioned \
    .repartition("year", "month", "day") \
    .write \
    .partitionBy("year", "month", "day") \
    .mode("overwrite") \
    .parquet("/data/output/events")

Compression Options

Parquet supports multiple compression codecs. Set compression at write time:

df.write \
    .option("compression", "snappy") \
    .parquet("/data/output/users")

Available codecs and their characteristics:

Codec Compression Ratio Speed Use Case
snappy Medium Fast Default choice, balanced
gzip High Slow Archival, storage-constrained
lz4 Low Very Fast CPU-constrained, fast reads
zstd High Medium Best ratio/speed trade-off
none None Fastest Write-heavy, cheap storage

Set the default compression for your entire SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ParquetWriter") \
    .config("spark.sql.parquet.compression.codec", "zstd") \
    .getOrCreate()

# All subsequent writes use ZSTD
df.write.parquet("/data/output/users")

My recommendation: use ZSTD for most workloads. It offers compression ratios close to GZIP with speeds approaching Snappy. If you’re on an older Spark version without ZSTD support, stick with Snappy.

Schema Management

Parquet embeds schema information in each file. When appending data with a different schema, you need to handle schema evolution explicitly.

Enable schema merging when reading files with evolved schemas:

# Write initial data
df_v1.write.parquet("/data/output/users")

# Later, write data with additional column
df_v2.write.mode("append").parquet("/data/output/users")

# Read with schema merging enabled
df_merged = spark.read \
    .option("mergeSchema", "true") \
    .parquet("/data/output/users")

For stricter control, define and enforce schemas explicitly:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

user_schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("email", StringType(), nullable=True),
    StructField("age", IntegerType(), nullable=True)
])

# Validate DataFrame matches expected schema before write
def validate_and_write(df, schema, path):
    # Select only expected columns in correct order
    df_validated = df.select([field.name for field in schema.fields])
    df_validated.write.mode("overwrite").parquet(path)

validate_and_write(df, user_schema, "/data/output/users")

Enable schema merging at the session level for consistent behavior:

spark.conf.set("spark.sql.parquet.mergeSchema", "true")

Performance Optimization Tips

The small files problem is the most common Parquet performance issue. Too many small files means excessive metadata overhead and poor read performance.

Target file sizes between 128MB and 1GB. Control this with maxRecordsPerFile:

df.write \
    .option("maxRecordsPerFile", 1000000) \
    .parquet("/data/output/events")

This caps each file at 1 million records, preventing oversized files while still allowing Spark to create appropriately sized outputs.

Combine with repartitioning for predictable file counts:

# Calculate optimal partition count based on data size
data_size_bytes = df.rdd.map(lambda row: len(str(row))).sum()
target_file_size = 256 * 1024 * 1024  # 256MB
optimal_partitions = max(1, int(data_size_bytes / target_file_size))

df.repartition(optimal_partitions) \
    .write \
    .mode("overwrite") \
    .parquet("/data/output/events")

For write-heavy workloads, disable Parquet summary metadata files:

spark.conf.set("parquet.enable.summary-metadata", "false")

This eliminates the _metadata and _common_metadata files that can become bottlenecks with many partitions.

Common Pitfalls and Troubleshooting

Overwrite fails with “Cannot overwrite path”: This happens when reading and writing to the same path. Spark can’t overwrite files it’s currently reading. Write to a temporary path first, then rename:

temp_path = "/data/output/users_temp"
final_path = "/data/output/users"

df.write.mode("overwrite").parquet(temp_path)

# Use Hadoop FileSystem to rename
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
fs.delete(spark._jvm.org.apache.hadoop.fs.Path(final_path), True)
fs.rename(
    spark._jvm.org.apache.hadoop.fs.Path(temp_path),
    spark._jvm.org.apache.hadoop.fs.Path(final_path)
)

Partition columns contain nulls: Null values in partition columns create a __HIVE_DEFAULT_PARTITION__ directory. Filter nulls before writing or use a default value:

df_clean = df.fillna({"region": "unknown"})
df_clean.write.partitionBy("region").parquet("/data/output/sales")

Partition columns missing from data: Partition columns are removed from the Parquet file data since they’re encoded in the directory structure. They’re automatically added back when reading. Don’t expect them in the file schema.

Too many small files after append operations: Each append creates new files. Periodically compact your data:

# Read all existing data
df_existing = spark.read.parquet("/data/output/events")

# Rewrite with optimal file count
df_existing \
    .repartition(100) \
    .write \
    .mode("overwrite") \
    .parquet("/data/output/events")

Parquet writes in PySpark are straightforward once you understand the options. Focus on appropriate partitioning, sensible file sizes, and consistent schema management. These fundamentals will serve you well as your data volumes grow.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.