PySpark - Read Multiple CSV Files
Key Insights
- PySpark provides multiple methods to read CSV files in bulk: wildcard patterns, directory paths, and explicit file lists, each suited for different data organization scenarios
- Understanding schema inference versus explicit schema definition is critical for performance—auto-inference requires scanning files twice while explicit schemas enable immediate processing
- Proper handling of inconsistent schemas, headers, and file encodings across multiple CSVs prevents data quality issues and job failures in production environments
Basic Wildcard Pattern Reading
The simplest approach to reading multiple CSV files uses wildcard patterns. PySpark's `spark.read.csv()` method accepts glob patterns to match multiple files simultaneously.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("MultipleCSVReader") \
    .getOrCreate()
# Read all CSV files in a directory
df = spark.read.csv("data/sales/*.csv", header=True, inferSchema=True)
# Read files matching specific pattern
df = spark.read.csv("data/sales/2024-*.csv", header=True, inferSchema=True)
# Read from multiple patterns
df = spark.read.csv(["data/sales/*.csv", "data/archive/*.csv"],
                    header=True,
                    inferSchema=True)
df.show()
The asterisk wildcard matches any characters within a single directory level, so a pattern like `data/*/*.csv` reaches exactly one level of subdirectories. Spark's Hadoop-based globbing does not treat `**` as a reliable recursive match; for true recursive traversal, use the `recursiveFileLookup` option (Spark 3.0+) with a plain directory path:
# Read CSV files from all subdirectories, recursively (Spark 3.0+)
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("recursiveFileLookup", "true") \
    .csv("data/")
Reading from Directory Paths
When all files in a directory should be processed, specify the directory path directly. PySpark automatically reads all files within that directory.
# Read every file in the directory (each one is parsed as CSV)
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("data/sales/")
# Alternative with format specification
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data/sales/")
This approach works best when directories contain only CSV files. Mixed file types require filtering or explicit pattern matching.
Explicit Schema Definition
Schema inference is convenient but inefficient for large datasets. Defining schemas explicitly improves performance and ensures data type consistency across files.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
# Define explicit schema
schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer_id", IntegerType(), True),
    StructField("order_date", DateType(), True),
    StructField("amount", DoubleType(), True),
    StructField("status", StringType(), True)
])
# Read with explicit schema
df = spark.read \
    .schema(schema) \
    .option("header", "true") \
    .option("dateFormat", "yyyy-MM-dd") \
    .csv("data/sales/*.csv")
df.printSchema()
Explicit schemas eliminate the extra pass over the files that inference requires, which can substantially reduce read time on large datasets.
Handling Files with Different Schemas
Real-world scenarios often involve CSV files with evolving schemas. PySpark provides options to handle schema mismatches.
Note that the `mergeSchema` option applies to Parquet and ORC sources, not CSV; the CSV reader ignores it. To combine CSV files whose columns differ, read each group separately and union by column name:
# Read each schema version separately, then union by column name
# (allowMissingColumns requires Spark 3.1+);
# columns missing from one side are filled with nulls
df_v1 = spark.read.option("header", "true").csv("data/sales/v1_*.csv")
df_v2 = spark.read.option("header", "true").csv("data/sales/v2_*.csv")
df = df_v1.unionByName(df_v2, allowMissingColumns=True)
df.show()
# Alternative: permissive mode for malformed records. For CSV, the
# supplied schema must include a StringType column whose name matches
# columnNameOfCorruptRecord, or the column will not appear.
schema_with_corrupt = schema.add("_corrupt_record", StringType(), True)
df = spark.read \
    .option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .schema(schema_with_corrupt) \
    .csv("data/sales/*.csv")
# Cache first: Spark disallows queries against the raw file source that
# reference only the internal corrupt-record column
df.cache()
corrupt_records = df.filter(df._corrupt_record.isNotNull())
corrupt_records.show(truncate=False)
The PERMISSIVE mode (default) loads malformed rows into a special column, while DROPMALFORMED skips bad rows and FAILFAST aborts on errors.
Reading with Custom Delimiters and Options
CSV files don’t always follow standard conventions. Configure parsing options to handle various formats.
# Read pipe-delimited files with custom options
df = spark.read \
    .option("header", "true") \
    .option("delimiter", "|") \
    .option("quote", "\"") \
    .option("escape", "\\") \
    .option("nullValue", "NA") \
    .option("emptyValue", "") \
    .option("ignoreLeadingWhiteSpace", "true") \
    .option("ignoreTrailingWhiteSpace", "true") \
    .csv("data/exports/*.txt")
# Handle files with different encodings
df = spark.read \
    .option("header", "true") \
    .option("encoding", "ISO-8859-1") \
    .csv("data/legacy/*.csv")
# Skip comment lines
df = spark.read \
    .option("header", "true") \
    .option("comment", "#") \
    .csv("data/annotated/*.csv")
Adding Source File Information
Track which file each record originated from using input file metadata functions.
from pyspark.sql.functions import input_file_name, regexp_extract
# Read files and add source filename
df = spark.read.csv("data/sales/*.csv", header=True, inferSchema=True)
# Add full file path
df_with_source = df.withColumn("source_file", input_file_name())
# Extract just the filename
df_with_filename = df.withColumn(
    "filename",
    regexp_extract(input_file_name(), r"([^/]+)$", 1)
)
df_with_filename.select("order_id", "amount", "filename").show(truncate=False)
This technique is invaluable for debugging data quality issues and maintaining data lineage.
Performance Optimization Strategies
Optimize multi-file CSV reading for production workloads with these configurations.
# Configure partition settings for large file sets
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728") # 128 MB
spark.conf.set("spark.sql.files.openCostInBytes", "4194304") # 4 MB
# Read with repartitioning
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv("data/sales/*.csv") \
    .repartition(200)
# Cache if reading multiple times
df.cache()
df.count()  # Trigger caching
# Reduce the partition count without a full shuffle
df = spark.read \
    .option("header", "true") \
    .csv("data/sales/*.csv") \
    .coalesce(50)
For very large file collections, consider using maxFilesPerTrigger in structured streaming mode:
# Streaming read for incremental processing
streaming_df = spark.readStream \
    .option("header", "true") \
    .option("maxFilesPerTrigger", 10) \
    .schema(schema) \
    .csv("data/incoming/")
query = streaming_df.writeStream \
    .format("parquet") \
    .option("path", "data/processed/") \
    .option("checkpointLocation", "data/checkpoint/") \
    .start()
Handling Headers in Multiple Files
When combining files, ensure consistent header handling to avoid treating header rows as data.
# All files have headers - standard approach
df = spark.read \
    .option("header", "true") \
    .csv("data/sales/*.csv")
# Files without headers - provide column names
df = spark.read \
    .option("header", "false") \
    .schema(schema) \
    .csv("data/sales/*.csv")
# Mixed scenarios - some files have headers
# Requires preprocessing or separate reads
df_with_headers = spark.read.option("header", "true").csv("data/sales/export_*.csv")
df_without_headers = spark.read.schema(schema).option("header", "false").csv("data/sales/raw_*.csv")
# Union the datasets; unionByName matches columns by name rather than
# position, so the header-derived names must match the schema's names
combined_df = df_with_headers.unionByName(df_without_headers)
Reading multiple CSV files in PySpark requires balancing convenience with performance. Use wildcard patterns for simple scenarios, explicit schemas for production workloads, and appropriate error handling for diverse data sources. Monitor partition sizes and file counts to maintain optimal cluster utilization.