PySpark - Write DataFrame to CSV File

Key Insights

  • PySpark provides multiple methods to write DataFrames to CSV, including write.csv() with configurable options for headers, delimiters, compression, and partitioning strategies
  • Writing CSV files in PySpark creates a directory structure with multiple part files by default due to distributed processing; use coalesce(1) or repartition(1) to generate a single output file
  • Performance optimization requires careful consideration of partition count, compression codecs, and write modes to balance between processing speed and output file management

Basic CSV Write Operations

Writing a DataFrame to CSV in PySpark is straightforward using the DataFrameWriter API. The basic syntax uses the write property followed by format specification and save path.

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("CSV Write Example") \
    .getOrCreate()

# Create sample DataFrame
data = [
    (1, "John Doe", "Engineering", 75000),
    (2, "Jane Smith", "Marketing", 68000),
    (3, "Mike Johnson", "Sales", 72000),
    (4, "Sarah Williams", "Engineering", 82000)
]

columns = ["id", "name", "department", "salary"]
df = spark.createDataFrame(data, columns)

# Basic CSV write
df.write.csv("output/employees", header=True)

This creates a directory at output/employees containing one CSV part file per partition of the distributed DataFrame, along with a _SUCCESS marker file.

Configuring CSV Write Options

PySpark offers extensive configuration options for CSV output through the option() method or options() for multiple settings.

# Write with comprehensive options
df.write \
    .option("header", "true") \
    .option("delimiter", ",") \
    .option("quote", '"') \
    .option("escape", "\\") \
    .option("nullValue", "NULL") \
    .option("emptyValue", "") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
    .option("compression", "gzip") \
    .mode("overwrite") \
    .csv("output/employees_configured")

# Alternative: using options() method
csv_options = {
    "header": "true",
    "delimiter": "|",
    "quote": "'",
    "compression": "snappy",
    "nullValue": "N/A"
}

df.write \
    .options(**csv_options) \
    .mode("overwrite") \
    .csv("output/employees_pipe_delimited")

The mode() parameter controls behavior when the target path exists:

  • overwrite: Replaces existing data
  • append: Adds to existing data
  • ignore: Skips write if path exists
  • error or errorifexists (default): Throws an exception if the path exists

Writing Single CSV Files

By default, PySpark writes multiple part files based on DataFrame partitions. To generate a single CSV file, reduce partitions to one.

# Using coalesce - more efficient for reducing partitions
df.coalesce(1) \
    .write \
    .option("header", "true") \
    .mode("overwrite") \
    .csv("output/single_file_coalesce")

# Using repartition - triggers full shuffle
df.repartition(1) \
    .write \
    .option("header", "true") \
    .mode("overwrite") \
    .csv("output/single_file_repartition")

The difference between coalesce() and repartition():

  • coalesce(): Avoids a full shuffle by merging existing partitions, so it is cheaper, but it can only reduce the partition count and may leave data unevenly distributed
  • repartition(): Performs a full shuffle; use it when increasing the partition count or when you need an even distribution

For large datasets, writing to a single file creates a bottleneck. Consider the tradeoff between convenience and performance.
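Even with coalesce(1), Spark writes the output as a part file inside a directory rather than a standalone CSV. A common follow-up is to move and rename that part file with plain Python; the sketch below does this (the promote_single_csv name is ours, not a Spark API):

```python
import glob
import os
import shutil

def promote_single_csv(spark_output_dir, target_path):
    """Move the lone part file written by coalesce(1) to a real filename."""
    part_files = glob.glob(os.path.join(spark_output_dir, "part-*.csv"))
    if len(part_files) != 1:
        raise ValueError(f"Expected exactly one part file, found {len(part_files)}")
    shutil.move(part_files[0], target_path)
    # Remove the now-empty Spark output directory (_SUCCESS marker, etc.)
    shutil.rmtree(spark_output_dir)
    return target_path
```

Run this only after the Spark write completes. Note that with compression enabled the part files carry an extra extension (e.g. .csv.gz), so the glob pattern must be adjusted accordingly.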

Handling Special Characters and Encoding

When dealing with data containing special characters, configure escape and quote options appropriately.

# Sample data with special characters
special_data = [
    (1, 'Product "A"', "Description with, comma", 29.99),
    (2, "Product's B", 'Contains "quotes" and, commas', 39.99),
    (3, "Product\nC", "Multi\nline\ndescription", 49.99)
]

special_df = spark.createDataFrame(special_data, ["id", "name", "description", "price"])

# Configure for special characters
special_df.write \
    .option("header", "true") \
    .option("quote", '"') \
    .option("escape", '"') \
    .option("quoteAll", "true") \
    .option("encoding", "UTF-8") \
    .mode("overwrite") \
    .csv("output/special_chars")

# Alternative: escape with backslash
special_df.write \
    .option("header", "true") \
    .option("escape", "\\") \
    .option("escapeQuotes", "true") \
    .mode("overwrite") \
    .csv("output/special_chars_backslash")
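Setting escape to the quote character, as in the first example, produces the RFC 4180 convention of doubling embedded quotes. As a quick sanity check, independent of Spark, Python's csv module reads such output back correctly:

```python
import csv
import io

# A line as Spark would emit it with quote='"' and escape='"' (doubled quotes)
raw = '1,"Product ""A""","Contains ""quotes"" and, commas"\n'

rows = list(csv.reader(io.StringIO(raw)))
print(rows[0])  # ['1', 'Product "A"', 'Contains "quotes" and, commas']
```

This doubling style is the most widely compatible choice; the backslash-escape variant is useful mainly when a downstream consumer specifically expects it.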

Partitioned Writes for Large Datasets

Partitioning output by specific columns creates subdirectories, improving query performance and data organization.

# Create larger dataset with date column
from datetime import datetime, timedelta

date_data = []
base_date = datetime(2024, 1, 1)

for i in range(100):
    date = base_date + timedelta(days=i % 30)
    dept = ["Engineering", "Marketing", "Sales"][i % 3]
    date_data.append((i, f"Employee_{i}", dept, date.strftime("%Y-%m-%d"), 50000 + (i * 100)))

date_df = spark.createDataFrame(date_data, ["id", "name", "department", "date", "salary"])

# Partition by department
date_df.write \
    .option("header", "true") \
    .partitionBy("department") \
    .mode("overwrite") \
    .csv("output/partitioned_by_dept")

# Partition by multiple columns
date_df.write \
    .option("header", "true") \
    .partitionBy("department", "date") \
    .mode("overwrite") \
    .csv("output/partitioned_multi")

This creates directory structures like:

output/partitioned_by_dept/
├── department=Engineering/
├── department=Marketing/
└── department=Sales/
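The key=value directory names are how Spark encodes partition values on disk, and readers rely on them for partition pruning. A small pure-Python sketch (the parse_partition_dirs helper is ours, not a Spark API) shows how those names decode:

```python
import os

def parse_partition_dirs(base_dir):
    """Map each key=value subdirectory to its partition column and value."""
    partitions = []
    for entry in sorted(os.listdir(base_dir)):
        full = os.path.join(base_dir, entry)
        if os.path.isdir(full) and "=" in entry:
            column, value = entry.split("=", 1)
            partitions.append({column: value})
    return partitions
```

When the partitioned directory is read back with spark.read.csv on the base path, department is restored as a regular column, and filters on it scan only the matching subdirectories.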

Compression Options

Compression reduces storage requirements and can improve I/O performance, especially for large datasets.

# Different compression codecs
compression_codecs = ["none", "gzip", "snappy", "lz4", "bzip2"]

for codec in compression_codecs:
    df.write \
        .option("header", "true") \
        .option("compression", codec) \
        .mode("overwrite") \
        .csv(f"output/compressed_{codec}")

# Check file sizes (pseudo-code for comparison)
# gzip: Best compression ratio, slower
# snappy: Balanced compression and speed
# lz4: Fast compression, moderate ratio
# bzip2: High compression, slowest

Compression tradeoffs:

  • gzip: Best compression (~2-3x), moderate speed, widely compatible
  • snappy: Fast compression/decompression, moderate ratio (~1.5-2x)
  • lz4: Fastest, lower compression ratio
  • bzip2: Highest compression, slowest processing
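Because actual ratios depend heavily on the data, it is worth comparing codecs empirically by totaling the bytes under each output directory. This is plain Python (the dir_size_bytes name is ours):

```python
import os

def dir_size_bytes(path):
    """Total size of all files under a directory tree."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total

# Example comparison across the outputs written above
# for codec in ["none", "gzip", "snappy", "lz4", "bzip2"]:
#     print(codec, dir_size_bytes(f"output/compressed_{codec}"))
```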

Performance Optimization

Optimize CSV write operations by controlling partition count and leveraging cluster resources effectively.

# Calculate optimal partition count
# Rule of thumb: 128MB - 256MB per partition
import math

total_size_mb = 1024  # Estimate your DataFrame size
target_partition_size_mb = 128
optimal_partitions = math.ceil(total_size_mb / target_partition_size_mb)

df.repartition(optimal_partitions) \
    .write \
    .option("header", "true") \
    .option("compression", "snappy") \
    .mode("overwrite") \
    .csv("output/optimized")

# For sorted output
df.repartition(4, "department") \
    .sortWithinPartitions("id") \
    .write \
    .option("header", "true") \
    .partitionBy("department") \
    .mode("overwrite") \
    .csv("output/sorted_partitioned")

# Time the write to gauge performance
import time

start = time.time()
df.write \
    .option("header", "true") \
    .option("compression", "snappy") \
    .mode("overwrite") \
    .csv("output/monitored")
print(f"Write completed in {time.time() - start:.1f}s")

# Inspect the execution plan
df.explain(mode="formatted")

Error Handling and Validation

Implement robust error handling when writing CSV files in production environments.

from pyspark.sql.utils import AnalysisException
import os

def safe_csv_write(df, path, options=None, mode="overwrite"):
    """
    Safely write DataFrame to CSV with error handling
    """
    default_options = {
        "header": "true",
        "compression": "snappy",
        "encoding": "UTF-8"
    }
    
    if options:
        default_options.update(options)
    
    try:
        # Validate DataFrame -- count once and reuse, since count() triggers a full job
        record_count = df.count()
        if record_count == 0:
            print(f"Warning: DataFrame is empty, skipping write to {path}")
            return False
        
        # Write with options
        writer = df.write.options(**default_options).mode(mode)
        writer.csv(path)
        
        print(f"Successfully wrote {record_count} records to {path}")
        return True
        
    except AnalysisException as e:
        print(f"Analysis error writing to {path}: {str(e)}")
        return False
    except Exception as e:
        print(f"Unexpected error writing to {path}: {str(e)}")
        return False

# Usage
success = safe_csv_write(
    df, 
    "output/safe_write",
    options={"delimiter": "|", "nullValue": "NULL"}
)

Choose write strategies based on your use case: single files for small datasets or reports, partitioned writes for large analytical workloads, and appropriate compression based on storage and processing constraints. Always test with representative data volumes to validate performance characteristics in your specific environment.
