PySpark - Write DataFrame to Parquet
Key Insights
• Parquet's columnar storage format reduces file sizes by 75-90% compared to CSV while enabling faster analytical queries through predicate pushdown and column pruning
• PySpark provides fine-grained control over partitioning, compression codecs (snappy, gzip, lzo), and write modes that directly impact query performance and storage costs
• Understanding partition strategies and avoiding small file problems (files under 128MB) is critical for maintaining optimal read performance in distributed systems
Basic DataFrame to Parquet Conversion
Writing a PySpark DataFrame to Parquet requires a single method call, but understanding the underlying mechanics ensures optimal performance. The write.parquet() method embeds the DataFrame's schema in the output files and handles data serialization and file distribution across your cluster.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("ParquetWriter") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Create sample DataFrame
data = [
    ("user_001", "John Doe", 28, 75000.50),
    ("user_002", "Jane Smith", 34, 92000.75),
    ("user_003", "Bob Johnson", 45, 105000.00)
]

schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", DoubleType(), True)
])

df = spark.createDataFrame(data, schema)

# Write to Parquet
df.write.parquet("output/users.parquet")
This creates a directory containing one Parquet part file per DataFrame partition, plus a _SUCCESS marker. Each executor writes its partitions as separate files.
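To see this layout concretely, you can list the output directory. The sketch below assumes the path is on the local filesystem; for S3 or HDFS you would use the corresponding storage client instead.

import os

# List what the write produced; expect part-*.parquet files and a _SUCCESS marker
for name in sorted(os.listdir("output/users.parquet")):
    print(name)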
Write Modes and Overwrite Behavior
PySpark supports four write modes that control how existing data is handled. Choosing the wrong mode can cause data loss or job failures in production environments.
# Append: Add data to existing files
df.write.mode("append").parquet("output/users.parquet")

# Overwrite: Delete existing data and write new data
df.write.mode("overwrite").parquet("output/users.parquet")

# ErrorIfExists: Fail if path exists (default behavior)
df.write.mode("errorifexists").parquet("output/users.parquet")

# Ignore: Skip write if path exists
df.write.mode("ignore").parquet("output/users.parquet")

# Alternative syntax using save()
df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("output/users.parquet")
The overwrite mode with partitioned data requires careful consideration. By default, it removes the entire directory. Use partitionOverwriteMode to overwrite only specific partitions.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write \
.mode("overwrite") \
.partitionBy("year", "month") \
.parquet("output/events.parquet")
Compression Codecs and Performance Trade-offs
Parquet supports multiple compression algorithms, each with distinct CPU, storage, and I/O characteristics. The default snappy codec balances compression ratio with speed.
# Snappy: Fast compression/decompression, moderate compression ratio (default)
df.write \
    .option("compression", "snappy") \
    .parquet("output/users_snappy.parquet")

# Gzip: High compression ratio, slower processing
df.write \
    .option("compression", "gzip") \
    .parquet("output/users_gzip.parquet")

# LZO: Splittable, good for MapReduce workloads
# (requires LZO native libraries to be installed on the cluster)
df.write \
    .option("compression", "lzo") \
    .parquet("output/users_lzo.parquet")

# Uncompressed: Maximum speed, largest files
df.write \
    .option("compression", "uncompressed") \
    .parquet("output/users_uncompressed.parquet")

# Zstandard: Better compression than Snappy, faster than Gzip
df.write \
    .option("compression", "zstd") \
    .parquet("output/users_zstd.parquet")
Benchmark results from a 10GB dataset show typical compression ratios: snappy (40% reduction), gzip (60% reduction), zstd (55% reduction). For write-once, read-many workloads, gzip or zstd provide better storage efficiency despite slower write times.
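These ratios vary with data shape, so it is worth measuring on your own dataset. The sketch below compares the on-disk size of each codec's output written above; the dir_size_mb helper is illustrative and assumes a local filesystem.

import os

def dir_size_mb(path):
    # Sum the sizes of all files under a directory tree, in MB
    total = 0
    for root, _, files in os.walk(path):
        total += sum(os.path.getsize(os.path.join(root, f)) for f in files)
    return total / (1024 * 1024)

for codec in ["snappy", "gzip", "lzo", "uncompressed", "zstd"]:
    path = f"output/users_{codec}.parquet"
    if os.path.exists(path):
        print(f"{codec:>12}: {dir_size_mb(path):.2f} MB")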
Partitioning Strategies
Partitioning organizes data into subdirectories based on column values, enabling partition pruning during queries. Poor partitioning creates thousands of small files that degrade performance.
from pyspark.sql.functions import col, to_date, year, month

# Create sample time-series data
events_data = [
    ("2024-01-15", "login", "user_001", "US"),
    ("2024-01-16", "purchase", "user_002", "UK"),
    ("2024-02-01", "logout", "user_001", "US"),
    ("2024-02-15", "login", "user_003", "DE")
]

events_df = spark.createDataFrame(
    events_data,
    ["event_date", "event_type", "user_id", "country"]
)

# Single column partitioning
events_df.write \
    .partitionBy("country") \
    .parquet("output/events_by_country.parquet")

# Multi-level partitioning
events_df.write \
    .partitionBy("country", "event_type") \
    .parquet("output/events_hierarchical.parquet")

# Date-based partitioning (recommended for time-series)
events_df = events_df.withColumn("date", to_date(col("event_date")))
events_df = events_df.withColumn("year", year(col("date")))
events_df = events_df.withColumn("month", month(col("date")))

events_df.write \
    .partitionBy("year", "month") \
    .parquet("output/events_by_date.parquet")
Partition cardinality matters. Aim for partitions containing 128MB-1GB of data. Partitioning by high-cardinality columns like user_id creates excessive small files.
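Before committing to a partition column, it helps to check its cardinality and skew. The quick check below is a suggested sanity test against the sample events data, not an official rule:

from pyspark.sql.functions import count

# How many partition directories would this create, and how evenly
# would rows spread across them?
print(events_df.select("country").distinct().count())
events_df.groupBy("country").agg(count("*").alias("rows")).show()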
Controlling File Size and Parallelism
PySpark’s default parallelism often produces numerous small files. Use coalesce() or repartition() to control output file count.
# Reduce file count without shuffling (for reducing partitions)
df.coalesce(4).write.parquet("output/users_coalesced.parquet")

# Repartition with shuffling (for increasing partitions)
df.repartition(10).write.parquet("output/users_repartitioned.parquet")

# Repartition by column for better data locality
df.repartition("country").write.parquet("output/users_by_country.parquet")

# Control maximum records per file
df.write \
    .option("maxRecordsPerFile", 100000) \
    .parquet("output/users_controlled.parquet")

# Combine repartitioning with partitionBy
df.repartition(4, "country") \
    .write \
    .partitionBy("country") \
    .parquet("output/users_optimized.parquet")
The maxRecordsPerFile option prevents individual files from growing too large, which helps with memory management during reads.
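As a rough back-of-envelope check (a sketch only, since actual counts also depend on task parallelism and any partitionBy columns), the record limit puts a floor on the number of output files:

import math

max_records = 100000  # matches the option used above
min_files = math.ceil(df.count() / max_records)
print(f"The record limit alone forces at least {min_files} file(s)")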
Schema Evolution and Merging
Parquet supports schema evolution, allowing you to add columns without rewriting existing data. mergeSchema is a read option: enable it when loading a directory whose files were written with different schemas.
# Initial write
initial_df = spark.createDataFrame(
    [("user_001", "John", 28)],
    ["user_id", "name", "age"]
)
initial_df.write.parquet("output/evolving_schema.parquet")

# Append with additional column
extended_df = spark.createDataFrame(
    [("user_002", "Jane", 34, "US")],
    ["user_id", "name", "age", "country"]
)
extended_df.write \
    .mode("append") \
    .parquet("output/evolving_schema.parquet")

# Read with merged schema (mergeSchema is a read option, not a write option)
merged_df = spark.read \
    .option("mergeSchema", "true") \
    .parquet("output/evolving_schema.parquet")
merged_df.printSchema()
# Shows all columns, with nulls where a file lacks a column
Schema merging incurs overhead during reads. For production systems, define schemas explicitly and handle evolution through versioned datasets.
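A minimal sketch of the explicit-schema approach, assuming you track the dataset's current schema version alongside it: pin the expected schema at read time so every scan skips schema merging. The "v2" schema below is hypothetical, matching the evolving dataset above (ages were inferred as longs when the sample rows were written).

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical "v2" schema for the evolving dataset
expected_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("country", StringType(), True)  # column added in v2
])

df_v2 = spark.read.schema(expected_schema).parquet("output/evolving_schema.parquet")
df_v2.printSchema()  # always the pinned schema; files missing a column yield nulls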
Advanced Options and Optimization
Fine-tuning Parquet writes requires understanding row group size, page size, and dictionary encoding parameters.
# Configure the default row group size (128MB) session-wide via the
# underlying Hadoop configuration
spark.sparkContext._jsc.hadoopConfiguration() \
    .setInt("parquet.block.size", 134217728)  # 128MB

# Enable dictionary encoding for low-cardinality columns
df.write \
    .option("parquet.enable.dictionary", "true") \
    .parquet("output/users_dictionary.parquet")

# Set page size (default: 1MB)
df.write \
    .option("parquet.page.size", 1048576) \
    .parquet("output/users_pagesize.parquet")

# Complete optimized write configuration
# (assumes the DataFrame carries year and month columns)
df.repartition(8) \
    .write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .option("maxRecordsPerFile", 500000) \
    .option("parquet.block.size", 134217728) \
    .partitionBy("year", "month") \
    .parquet("output/users_production.parquet")
After each write, check output file sizes on the file system. Files under 10MB indicate over-partitioning; files over 1GB may cause memory pressure during processing.
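A monitoring sketch along these lines (local filesystem assumed; adapt the listing for S3 or HDFS) can flag files outside the healthy band:

import os

for root, _, files in os.walk("output/users_production.parquet"):
    for f in files:
        if not f.endswith(".parquet"):
            continue  # skip _SUCCESS and other markers
        full = os.path.join(root, f)
        size_mb = os.path.getsize(full) / (1024 * 1024)
        if size_mb < 10:
            print(f"Small file ({size_mb:.1f} MB): {full}")
        elif size_mb > 1024:
            print(f"Large file ({size_mb:.1f} MB): {full}")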
Verification and Best Practices
Always verify Parquet writes by reading back data and checking row counts, schemas, and partition structure.
# Verify write operation
written_df = spark.read.parquet("output/users.parquet")
print(f"Row count: {written_df.count()}")
written_df.printSchema()

# Check partition information in the query plan
written_df.explain(True)

# Inspect file structure programmatically
from pyspark.sql.functions import spark_partition_id, input_file_name

written_df.select(
    input_file_name().alias("file_name"),
    spark_partition_id().alias("partition_id")
).distinct().show(truncate=False)
For production environments, implement these practices: use dynamic partition overwrite mode, set appropriate compression based on workload, maintain partition sizes between 128MB and 1GB, enable adaptive query execution, and monitor small file accumulation through scheduled compaction jobs.
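For that last item, here is a minimal compaction sketch, assuming the partitioned time-series dataset from earlier and a separate output path (in practice you would swap the old and new paths atomically in your orchestration layer):

# Read the fragmented dataset; year/month are recovered from directory names
fragmented = spark.read.parquet("output/events_by_date.parquet")

# One shuffle to co-locate rows per partition, then rewrite the same layout
# with fewer, larger files
fragmented.repartition("year", "month") \
    .write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("output/events_by_date_compacted.parquet")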