PySpark - Read CSV File into DataFrame
PySpark's `spark.read.csv()` method provides the simplest approach to load CSV files into DataFrames. The method accepts file paths from local filesystems, HDFS, S3, or other distributed storage...
Key Insights
- PySpark provides multiple methods to read CSV files with fine-grained control over schema inference, delimiters, headers, and data types through the DataFrameReader API
- Reading CSV files efficiently requires understanding options like
inferSchema,header, and explicit schema definition to avoid performance bottlenecks and type mismatches - Production environments benefit from explicit schema definitions, custom delimiters, and proper handling of malformed records rather than relying on automatic inference
Basic CSV Reading
PySpark’s spark.read.csv() method provides the simplest approach to load CSV files into DataFrames. The method accepts file paths from local filesystems, HDFS, S3, or other distributed storage systems.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("CSV Reader") \
.getOrCreate()
# Basic read with header
df = spark.read.csv("data/users.csv", header=True)
df.show()
# Alternative syntax using format
df = spark.read \
.format("csv") \
.option("header", "true") \
.load("data/users.csv")
The header=True option tells PySpark that the first row contains column names. Without this option, PySpark generates default column names like _c0, _c1, etc.
Schema Inference vs Explicit Schema
Schema inference scans the entire dataset to determine data types, which can be expensive for large files. Explicit schema definition improves performance and ensures type safety.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# With schema inference (slower for large files)
df_inferred = spark.read.csv(
"data/sales.csv",
header=True,
inferSchema=True
)
# Explicit schema definition (recommended for production)
schema = StructType([
StructField("transaction_id", StringType(), True),
StructField("customer_id", IntegerType(), True),
StructField("amount", DoubleType(), True),
StructField("product_name", StringType(), True),
StructField("quantity", IntegerType(), True)
])
df_explicit = spark.read.csv(
"data/sales.csv",
header=True,
schema=schema
)
df_explicit.printSchema()
The explicit schema approach eliminates the inference overhead and guarantees consistent data types across multiple reads.
Advanced CSV Options
PySpark supports numerous options for handling various CSV formats and edge cases. These options control delimiter characters, quote handling, null values, and date formats.
# Comprehensive options for complex CSV files
df = spark.read \
.option("header", "true") \
.option("delimiter", "|") \
.option("quote", "\"") \
.option("escape", "\\") \
.option("nullValue", "NA") \
.option("nanValue", "NaN") \
.option("dateFormat", "yyyy-MM-dd") \
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
.option("mode", "DROPMALFORMED") \
.csv("data/complex_data.csv")
# Reading tab-separated values
df_tsv = spark.read \
.option("header", "true") \
.option("delimiter", "\t") \
.csv("data/data.tsv")
# Handling multiline records
df_multiline = spark.read \
.option("header", "true") \
.option("multiLine", "true") \
.option("escape", "\"") \
.csv("data/multiline.csv")
The mode option controls behavior when encountering malformed records:
PERMISSIVE(default): Sets malformed fields to nullDROPMALFORMED: Drops rows with malformed dataFAILFAST: Throws exception on malformed data
Reading Multiple CSV Files
PySpark can read multiple CSV files simultaneously using wildcards or by passing a list of paths. This is particularly useful for partitioned data or time-series files.
# Using wildcards to read multiple files
df_multiple = spark.read.csv(
"data/logs/*.csv",
header=True,
inferSchema=True
)
# Reading from multiple specific paths
paths = [
"data/2024/january.csv",
"data/2024/february.csv",
"data/2024/march.csv"
]
df_combined = spark.read.csv(
paths,
header=True,
schema=schema
)
# Reading entire directory recursively
df_recursive = spark.read \
.option("header", "true") \
.option("recursiveFileLookup", "true") \
.csv("data/")
# Add filename to track source
from pyspark.sql.functions import input_file_name
df_with_source = df_multiple.withColumn("source_file", input_file_name())
df_with_source.select("source_file").distinct().show(truncate=False)
Handling Corrupt Records
Production systems require robust error handling for malformed CSV data. PySpark provides options to capture and analyze corrupt records.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define schema with corrupt record column
schema_with_corrupt = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("_corrupt_record", StringType(), True)
])
# Read with corrupt record tracking
df_with_corrupt = spark.read \
.option("header", "true") \
.option("mode", "PERMISSIVE") \
.option("columnNameOfCorruptRecord", "_corrupt_record") \
.schema(schema_with_corrupt) \
.csv("data/dirty_data.csv")
# Filter and analyze corrupt records
corrupt_records = df_with_corrupt.filter(
df_with_corrupt._corrupt_record.isNotNull()
)
print(f"Total corrupt records: {corrupt_records.count()}")
corrupt_records.show(truncate=False)
# Get clean records
clean_records = df_with_corrupt.filter(
df_with_corrupt._corrupt_record.isNull()
).drop("_corrupt_record")
Performance Optimization
Reading large CSV files efficiently requires understanding partitioning and resource allocation. PySpark automatically partitions data, but you can control this behavior.
# Control number of partitions during read
df_partitioned = spark.read \
.option("header", "true") \
.csv("data/large_file.csv") \
.repartition(100)
print(f"Number of partitions: {df_partitioned.rdd.getNumPartitions()}")
# Coalesce to reduce partitions (no shuffle)
df_coalesced = df_partitioned.coalesce(10)
# Use sampling for development/testing
df_sample = spark.read \
.option("header", "true") \
.option("samplingRatio", "0.1") \
.csv("data/huge_file.csv")
# Cache frequently accessed DataFrames
df_cached = spark.read.csv("data/reference.csv", header=True)
df_cached.cache()
df_cached.count() # Trigger caching
Reading from Cloud Storage
PySpark seamlessly integrates with cloud storage systems. Configure credentials and read CSV files from S3, Azure Blob Storage, or Google Cloud Storage.
# AWS S3 configuration
spark.conf.set("spark.hadoop.fs.s3a.access.key", "YOUR_ACCESS_KEY")
spark.conf.set("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_KEY")
df_s3 = spark.read.csv(
"s3a://bucket-name/data/file.csv",
header=True,
inferSchema=True
)
# Azure Blob Storage
spark.conf.set(
"fs.azure.account.key.ACCOUNT_NAME.dfs.core.windows.net",
"YOUR_ACCOUNT_KEY"
)
df_azure = spark.read.csv(
"abfss://container@account.dfs.core.windows.net/data/file.csv",
header=True
)
# Google Cloud Storage
df_gcs = spark.read.csv(
"gs://bucket-name/data/file.csv",
header=True,
inferSchema=True
)
Validation and Data Quality Checks
After reading CSV files, implement validation checks to ensure data quality before processing.
from pyspark.sql.functions import col, count, when, isnan
df = spark.read.csv("data/users.csv", header=True, inferSchema=True)
# Check for null values
null_counts = df.select([
count(when(col(c).isNull(), c)).alias(c)
for c in df.columns
])
null_counts.show()
# Validate data ranges
df.select(
count(when(col("age") < 0, 1)).alias("negative_age"),
count(when(col("salary") > 1000000, 1)).alias("high_salary")
).show()
# Check for duplicates
duplicate_count = df.groupBy(df.columns).count().filter(col("count") > 1).count()
print(f"Duplicate rows: {duplicate_count}")
# Display basic statistics
df.describe().show()
Reading CSV files in PySpark requires balancing convenience with performance. Use schema inference for exploration, but define explicit schemas for production workloads. Handle corrupt records appropriately and implement validation checks to maintain data quality throughout your pipeline.