How to Read CSV Files in PySpark

Key Insights

  • Always define explicit schemas in production PySpark jobs—schema inference requires a full data scan that doubles your read time and can produce inconsistent types across runs.
  • Use mode="PERMISSIVE" with columnNameOfCorruptRecord to capture malformed rows for debugging rather than silently dropping data or failing entire jobs.
  • Treat CSV as an ingestion format, not a storage format—read once, validate, then convert to Parquet or Delta Lake for all downstream processing.

Introduction

CSV files refuse to die. Despite better alternatives like Parquet, Avro, and ORC, you’ll encounter CSV data constantly in real-world data engineering. Vendors export it, analysts create it, legacy systems depend on it. PySpark handles CSV ingestion well, but the default settings will bite you in production if you’re not careful.

This guide covers everything you need to read CSV files reliably in PySpark, from basic syntax to production-hardened patterns. I’ll focus on the DataFrame API through SparkSession, which is the standard approach for PySpark 2.0 and later.

Basic CSV Reading with spark.read.csv()

Every PySpark application starts with a SparkSession. This is your entry point to all DataFrame operations, including file reads.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CSVReader") \
    .getOrCreate()

# Simplest possible read
df = spark.read.csv("data/sales.csv")
df.show()

This works, but it’s almost never what you want. Without configuration, PySpark treats every column as a string, ignores headers, and uses comma as the default delimiter. Your first row becomes data instead of column names, and you get generic column names like _c0, _c1, _c2.

The format() method provides an alternative syntax that some teams prefer for consistency across file types:

df = spark.read.format("csv").load("data/sales.csv")

Both approaches accept the same options. Use whichever your team standardizes on.

Essential Configuration Options

Real CSV reading requires configuration. Here are the options you’ll use constantly:

df = spark.read.csv(
    "data/sales.csv",
    header=True,           # First row contains column names
    inferSchema=True,      # Attempt to detect column types
    sep=",",               # Field delimiter (default is comma)
    nullValue="NULL",      # String to interpret as null
    encoding="UTF-8",      # Character encoding
    quote='"',             # Quote character for fields containing delimiters
    escape="\\",           # Escape character within quoted fields
    multiLine=True         # Allow quoted fields to span lines (such files can't be split across tasks)
)

df.printSchema()
df.show(5)

The inferSchema=True option is convenient for exploration but problematic in production. Spark must scan your entire dataset to infer types, effectively reading your data twice. It also makes inconsistent decisions—a column with mostly integers but one float value becomes a double, and a column with one malformed row might become a string.

For quick data exploration, this pattern works fine:

# Development/exploration only
df = spark.read.csv(
    "data/sales.csv",
    header=True,
    inferSchema=True
)

For anything running in production, define your schema explicitly.

Defining Explicit Schemas

Explicit schemas eliminate inference overhead and guarantee consistent types across runs. You define schemas using StructType and StructField:

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, 
    DoubleType, DateType, TimestampType
)

sales_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=True),
    StructField("product_name", StringType(), nullable=True),
    StructField("quantity", IntegerType(), nullable=True),
    StructField("unit_price", DoubleType(), nullable=True),
    StructField("order_date", DateType(), nullable=True),
    StructField("region", StringType(), nullable=True)
])

df = spark.read.csv(
    "data/sales.csv",
    header=True,
    schema=sales_schema,
    dateFormat="yyyy-MM-dd"
)

df.printSchema()

The nullable parameter in StructField doesn’t enforce constraints during reading—Spark will still load null values regardless. It’s metadata that downstream operations and optimizers can use.

For date and timestamp columns, specify the format explicitly:

df = spark.read.csv(
    "data/events.csv",
    header=True,
    schema=event_schema,
    dateFormat="MM/dd/yyyy",
    timestampFormat="yyyy-MM-dd HH:mm:ss"
)

A useful pattern is inferring the schema once during development, then hardcoding it for production:

# Run once to get schema
temp_df = spark.read.csv("data/sales.csv", header=True, inferSchema=True)
print(temp_df.schema.json())  # Copy this output

# Then use the schema directly in production code

Handling Malformed Data

Real-world CSV files contain garbage. Rows have missing columns, fields contain unescaped delimiters, encodings are wrong. PySpark offers three modes for handling parse failures:

  • PERMISSIVE (default): Sets malformed fields to null, optionally captures the raw line
  • DROPMALFORMED: Silently drops rows that can’t be parsed
  • FAILFAST: Throws an exception on the first malformed row

Never use DROPMALFORMED in production—you’ll lose data without knowing it. Use PERMISSIVE with a corrupt record column:

schema_with_corrupt = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=True),
    StructField("amount", DoubleType(), nullable=True),
    StructField("_corrupt_record", StringType(), nullable=True)  # Receives the raw text of malformed rows
])

df = spark.read.csv(
    "data/messy_orders.csv",
    header=True,
    schema=schema_with_corrupt,
    mode="PERMISSIVE",
    columnNameOfCorruptRecord="_corrupt_record"
).cache()  # Since Spark 2.3, queries on the raw file that reference only the corrupt column are rejected; caching avoids that

# Separate good and bad records
good_records = df.filter(df._corrupt_record.isNull()).drop("_corrupt_record")
bad_records = df.filter(df._corrupt_record.isNotNull())

# Log the raw malformed lines for investigation (the text writer accepts a single string column)
bad_records.select("_corrupt_record").write.mode("overwrite").text("data/corrupt_records/")

print(f"Loaded {good_records.count()} valid records")
print(f"Found {bad_records.count()} corrupt records")

For pipelines where data quality is critical and you want immediate failure on bad data:

df = spark.read.csv(
    "data/critical_data.csv",
    header=True,
    schema=strict_schema,
    mode="FAILFAST"
)

This fails the job immediately when encountering unparseable rows, which is appropriate for pipelines where partial loads are worse than no loads.

Reading Multiple Files and Partitioned Data

PySpark reads multiple files naturally. Pass a glob pattern or a list of paths:

# Glob pattern - all CSV files in directory
df = spark.read.csv("data/sales/*.csv", header=True, schema=sales_schema)

# Multiple specific files
df = spark.read.csv(
    ["data/sales_2023.csv", "data/sales_2024.csv"],
    header=True,
    schema=sales_schema
)

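# Nested directories: Hadoop globs don't treat ** as recursive, so use recursive lookup (Spark 3.0+).
# Note that recursiveFileLookup disables partition discovery.
df = spark.read.csv(
    "data/sales/",
    header=True,
    schema=sales_schema,
    recursiveFileLookup=True,
    pathGlobFilter="*.csv"
)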

For partitioned data organized in Hive-style directories, Spark automatically extracts partition columns:

# Directory structure:
# data/events/year=2024/month=01/data.csv
# data/events/year=2024/month=02/data.csv

df = spark.read.csv(
    "data/events/",  # Read from the table root; globbing into year=*/month=* hides the partition columns unless basePath is set
    header=True,
    schema=event_schema
)

# 'year' and 'month' columns are automatically added
df.select("year", "month", "event_type").show()

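When reading multiple files, ensure they share the same column order. Spark matches CSV columns to the schema by position, never by name. With an explicit schema and the default enforceSchema=True, the header row is simply skipped and a mismatch produces at most a log warning, so a file with reordered columns loads values into the wrong fields without failing.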

Performance Considerations

CSV reading performance depends heavily on your configuration choices. Here’s what matters:

Avoid schema inference in production. I’ve mentioned this repeatedly because it’s the most common mistake. Inference scans your entire dataset before reading it. On a 10GB file, you’re reading 20GB.

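Partition appropriately. Spark splits large uncompressed files into partitions of up to spark.sql.files.maxPartitionBytes (default 128MB) and packs small files together into shared partitions, so the partition count rarely equals the file count. Gzip-compressed CSV files can't be split, so each one is parsed by a single task. If the resulting partition count doesn't suit your cluster, repartition after reading: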

df = spark.read.csv("data/small_files/*.csv", header=True, schema=schema)
df = df.repartition(200)  # Adjust based on cluster size

Convert to Parquet immediately. CSV is a terrible format for analytical queries—no predicate pushdown, no column pruning, no compression efficiency. Read CSV once, then save as Parquet:

# Initial ingestion
raw_df = spark.read.csv(
    "landing/daily_extract.csv",
    header=True,
    schema=defined_schema
)

# Validate and transform (assumes defined_schema includes a _corrupt_record string column)
clean_df = raw_df.filter(raw_df._corrupt_record.isNull()).drop("_corrupt_record")

# Save as Parquet for all downstream processing
clean_df.write.mode("overwrite").parquet("processed/daily_extract/")

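Use appropriate compression. Splittability matters mostly for the CSV files you read: a gzip-compressed CSV can't be split, so one large .csv.gz is parsed by a single task. Parquet output doesn't have this problem because compression is applied inside the file, per column chunk, whichever codec you choose. Snappy is Spark's default Parquet codec and a reasonable balance of speed and size; gzip produces smaller files at the cost of slower writes: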

df.write.option("compression", "snappy").parquet("output/")

CSV remains unavoidable in data engineering, but it shouldn’t be your storage format. Treat it as an ingestion format: read it once with explicit schemas and proper error handling, validate the data, then convert to something better for all downstream work.
