PySpark - Write DataFrame to CSV File
Key Insights
- PySpark provides multiple methods to write DataFrames to CSV, including `write.csv()` with configurable options for headers, delimiters, compression, and partitioning strategies
- Writing CSV files in PySpark creates a directory containing multiple part files by default due to distributed processing; use `coalesce(1)` or `repartition(1)` to generate a single output file
- Performance optimization requires careful consideration of partition count, compression codecs, and write modes to balance processing speed against output file management
Basic CSV Write Operations
Writing a DataFrame to CSV in PySpark is straightforward using the DataFrameWriter API. The basic syntax uses the write property followed by format specification and save path.
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("CSV Write Example") \
    .getOrCreate()
# Create sample DataFrame
data = [
    (1, "John Doe", "Engineering", 75000),
    (2, "Jane Smith", "Marketing", 68000),
    (3, "Mike Johnson", "Sales", 72000),
    (4, "Sarah Williams", "Engineering", 82000)
]
columns = ["id", "name", "department", "salary"]
df = spark.createDataFrame(data, columns)
# Basic CSV write
df.write.csv("output/employees", header=True)
This creates a directory at `output/employees` containing multiple CSV part files, each holding the data from one partition of the distributed DataFrame.
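The resulting layout can be verified with a quick directory listing. This is a sketch for output on the local filesystem, and the helper name `list_spark_output` is illustrative, not part of the PySpark API:

```python
import glob
import os

def list_spark_output(path):
    """Return the file names Spark produced for one CSV write, sorted."""
    return sorted(os.path.basename(p) for p in glob.glob(os.path.join(path, "*")))

# A typical listing contains a _SUCCESS marker plus one part file per partition:
# ['_SUCCESS', 'part-00000-<uuid>-c000.csv', 'part-00001-<uuid>-c000.csv', ...]
```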
Configuring CSV Write Options
PySpark offers extensive configuration for CSV output through the `option()` method, or `options()` for setting several values at once.
# Write with comprehensive options
df.write \
    .option("header", "true") \
    .option("delimiter", ",") \
    .option("quote", '"') \
    .option("escape", "\\") \
    .option("nullValue", "NULL") \
    .option("emptyValue", "") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
    .option("compression", "gzip") \
    .mode("overwrite") \
    .csv("output/employees_configured")
# Alternative: using options() method
csv_options = {
    "header": "true",
    "delimiter": "|",
    "quote": "'",
    "compression": "snappy",
    "nullValue": "N/A"
}
df.write \
    .options(**csv_options) \
    .mode("overwrite") \
    .csv("output/employees_pipe_delimited")
The `mode()` method controls behavior when the target path already exists:
- `overwrite`: Replaces existing data
- `append`: Adds to existing data
- `ignore`: Skips the write if the path exists
- `error` (default): Throws an exception if the path exists
Writing Single CSV Files
By default, PySpark writes multiple part files based on DataFrame partitions. To generate a single CSV file, reduce partitions to one.
# Using coalesce - more efficient for reducing partitions
df.coalesce(1) \
    .write \
    .option("header", "true") \
    .mode("overwrite") \
    .csv("output/single_file_coalesce")
# Using repartition - triggers full shuffle
df.repartition(1) \
    .write \
    .option("header", "true") \
    .mode("overwrite") \
    .csv("output/single_file_repartition")
The difference between `coalesce()` and `repartition()`:
- `coalesce()`: Minimizes data movement; preferred for reducing the partition count
- `repartition()`: Performs a full shuffle; use it when increasing partitions or when an even distribution is needed
For large datasets, writing to a single file funnels all data through a single task, creating a bottleneck. Consider the tradeoff between convenience and performance.
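Once `coalesce(1)` has produced a single part file, many pipelines rename it to a stable filename. Here is a minimal sketch assuming the output sits on the local filesystem and is uncompressed (a compressed write changes the part file's extension); the helper `promote_single_csv` is hypothetical, and for HDFS or S3 you would use the corresponding filesystem API instead:

```python
import glob
import os
import shutil

def promote_single_csv(output_dir, final_path):
    """Move the lone part file out of a coalesce(1) output directory,
    then remove the leftover directory (including the _SUCCESS marker)."""
    part_files = glob.glob(os.path.join(output_dir, "part-*.csv"))
    if len(part_files) != 1:
        raise ValueError(f"Expected exactly one part file, found {len(part_files)}")
    shutil.move(part_files[0], final_path)
    shutil.rmtree(output_dir)
    return final_path
```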
Handling Special Characters and Encoding
When dealing with data containing special characters, configure escape and quote options appropriately.
# Sample data with special characters
special_data = [
    (1, 'Product "A"', "Description with, comma", 29.99),
    (2, "Product's B", 'Contains "quotes" and, commas', 39.99),
    (3, "Product\nC", "Multi\nline\ndescription", 49.99)
]
special_df = spark.createDataFrame(special_data, ["id", "name", "description", "price"])
# Configure for special characters
special_df.write \
    .option("header", "true") \
    .option("quote", '"') \
    .option("escape", '"') \
    .option("quoteAll", "true") \
    .option("encoding", "UTF-8") \
    .mode("overwrite") \
    .csv("output/special_chars")
# Alternative: escape with backslash
special_df.write \
    .option("header", "true") \
    .option("escape", "\\") \
    .option("escapeQuotes", "true") \
    .mode("overwrite") \
    .csv("output/special_chars_backslash")
Partitioned Writes for Large Datasets
Partitioning output by specific columns creates subdirectories, improving query performance and data organization.
# Create larger dataset with date column
from datetime import datetime, timedelta
date_data = []
base_date = datetime(2024, 1, 1)
for i in range(100):
    date = base_date + timedelta(days=i % 30)
    dept = ["Engineering", "Marketing", "Sales"][i % 3]
    date_data.append((i, f"Employee_{i}", dept, date.strftime("%Y-%m-%d"), 50000 + (i * 100)))
date_df = spark.createDataFrame(date_data, ["id", "name", "department", "date", "salary"])
# Partition by department
date_df.write \
    .option("header", "true") \
    .partitionBy("department") \
    .mode("overwrite") \
    .csv("output/partitioned_by_dept")
# Partition by multiple columns
date_df.write \
    .option("header", "true") \
    .partitionBy("department", "date") \
    .mode("overwrite") \
    .csv("output/partitioned_multi")
This creates directory structures like:
output/partitioned_by_dept/
├── department=Engineering/
├── department=Marketing/
└── department=Sales/
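Spark uses Hive-style `key=value` names for these partition directories: the partition column's values live in the path rather than inside the CSV files, and Spark restores the column when the directory is read back. A small sketch that recovers partition values from such a layout (the helper name `partition_values` is illustrative):

```python
import os

def partition_values(base_dir):
    """Map each partition column to the values found in key=value subdirectories."""
    values = {}
    for entry in sorted(os.listdir(base_dir)):
        full = os.path.join(base_dir, entry)
        if os.path.isdir(full) and "=" in entry:
            key, _, value = entry.partition("=")
            values.setdefault(key, []).append(value)
    return values
```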
Compression Options
Compression reduces storage requirements and can improve I/O performance, especially for large datasets.
# Different compression codecs
compression_codecs = ["none", "gzip", "snappy", "lz4", "bzip2"]
for codec in compression_codecs:
    df.write \
        .option("header", "true") \
        .option("compression", codec) \
        .mode("overwrite") \
        .csv(f"output/compressed_{codec}")
# Check file sizes (pseudo-code for comparison)
# gzip: Best compression ratio, slower
# snappy: Balanced compression and speed
# lz4: Fast compression, moderate ratio
# bzip2: High compression, slowest
Compression tradeoffs:
- gzip: Strong compression ratio, moderate speed, widely compatible
- snappy: Fast compression/decompression, moderate ratio
- lz4: Fastest, lower compression ratio
- bzip2: Highest compression ratio, slowest processing
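These tradeoffs can be checked empirically by totalling the bytes each output directory occupies on disk. A sketch for local output, with directory names matching the loop above (the helper `dir_size_bytes` is illustrative):

```python
import os

def dir_size_bytes(path):
    """Sum the sizes of all files under path (one Spark output directory)."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total

# for codec in ["none", "gzip", "snappy", "lz4", "bzip2"]:
#     print(codec, dir_size_bytes(f"output/compressed_{codec}"))
```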
Performance Optimization
Optimize CSV write operations by controlling partition count and leveraging cluster resources effectively.
# Calculate optimal partition count
# Rule of thumb: 128MB - 256MB per partition
import math
total_size_mb = 1024 # Estimate your DataFrame size
target_partition_size_mb = 128
optimal_partitions = math.ceil(total_size_mb / target_partition_size_mb)
df.repartition(optimal_partitions) \
    .write \
    .option("header", "true") \
    .option("compression", "snappy") \
    .mode("overwrite") \
    .csv("output/optimized")
# For sorted output
df.repartition(4, "department") \
    .sortWithinPartitions("id") \
    .write \
    .option("header", "true") \
    .partitionBy("department") \
    .mode("overwrite") \
    .csv("output/sorted_partitioned")
# Monitor write performance
df.write \
    .option("header", "true") \
    .option("compression", "snappy") \
    .mode("overwrite") \
    .csv("output/monitored")
# Check execution plan
df.explain(mode="formatted")
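Because a CSV write launches its Spark job eagerly, wall-clock timing around the call captures the full write. A simple sketch (the `timed_write` helper is illustrative, and `df` is assumed from the examples above):

```python
import time

def timed_write(write_action):
    """Run a zero-argument write action and return elapsed wall-clock seconds."""
    start = time.perf_counter()
    write_action()
    return time.perf_counter() - start

# elapsed = timed_write(
#     lambda: df.write.option("header", "true").mode("overwrite").csv("output/timed")
# )
```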
Error Handling and Validation
Implement robust error handling when writing CSV files in production environments.
from pyspark.sql.utils import AnalysisException

def safe_csv_write(df, path, options=None, mode="overwrite"):
    """
    Safely write DataFrame to CSV with error handling
    """
    default_options = {
        "header": "true",
        "compression": "snappy",
        "encoding": "UTF-8"
    }
    if options:
        default_options.update(options)
    try:
        # count() triggers a full job, so run it once and reuse the result
        record_count = df.count()
        if record_count == 0:
            print(f"Warning: DataFrame is empty, skipping write to {path}")
            return False
        # Write with options
        df.write.options(**default_options).mode(mode).csv(path)
        print(f"Successfully wrote {record_count} records to {path}")
        return True
    except AnalysisException as e:
        print(f"Analysis error writing to {path}: {e}")
        return False
    except Exception as e:
        print(f"Unexpected error writing to {path}: {e}")
        return False
# Usage
success = safe_csv_write(
    df,
    "output/safe_write",
    options={"delimiter": "|", "nullValue": "NULL"}
)
Choose write strategies based on your use case: single files for small datasets or reports, partitioned writes for large analytical workloads, and appropriate compression based on storage and processing constraints. Always test with representative data volumes to validate performance characteristics in your specific environment.