Spark Scala - Read CSV File

CSV files refuse to die. Despite the rise of Parquet, ORC, and Avro, you'll still encounter CSV in nearly every data engineering project. Legacy systems export it. Business users create it in Excel....

Key Insights

  • Always define explicit schemas in production—inferSchema reads your data twice and can misinterpret types, causing silent bugs downstream
  • Use PERMISSIVE mode with columnNameOfCorruptRecord to capture malformed rows without losing data or crashing your pipeline
  • CSV lacks partition pruning and predicate pushdown, so consider converting to Parquet after initial ingestion for repeated analytical queries

Why CSV Still Matters

CSV files refuse to die. Despite the rise of Parquet, ORC, and Avro, you’ll still encounter CSV in nearly every data engineering project. Legacy systems export it. Business users create it in Excel. Third-party vendors deliver it via SFTP. It’s the lowest common denominator of data exchange.

Apache Spark handles CSV files well, but the default settings will bite you in production. This guide covers the practical patterns you need for reliable CSV ingestion in Spark Scala applications.

Basic CSV Reading

The simplest approach uses Spark’s DataFrameReader with the csv format:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CSV Reader")
  .master("local[*]")
  .getOrCreate()

val df = spark.read.csv("data/users.csv")
df.show()

This works, but it’s almost never what you want. Without options, Spark treats every column as a string, ignores headers, and names columns _c0, _c1, etc. The output looks like this:

+-------+---+------------------+
|    _c0|_c1|               _c2|
+-------+---+------------------+
|   name|age|             email|
|   John| 32|  john@example.com|
|   Jane| 28|  jane@example.com|
+-------+---+------------------+

Your header row becomes data. Your integers become strings. This is a recipe for downstream failures.

Common Read Options

Spark’s CSV reader supports dozens of options. Here are the ones you’ll use constantly:

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ",")
  .option("quote", "\"")
  .option("escape", "\\")
  .option("nullValue", "NA")
  .option("dateFormat", "yyyy-MM-dd")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .option("multiLine", "true")
  .csv("data/users.csv")

df.printSchema()
df.show()

Let’s break down each option:

header: When true, uses the first row as column names. Almost always set this to true.

inferSchema: Spark scans the data to guess column types. Convenient for exploration, dangerous for production. More on this later.

delimiter: The field separator. Defaults to comma. Set to \t for TSV files, | for pipe-delimited.

quote: Character used to quote fields containing special characters. Defaults to double-quote.

escape: Character used to escape quotes within quoted fields. Defaults to backslash.

nullValue: String representation of null values. Common values include NA, NULL, N/A, or empty string.

dateFormat/timestampFormat: Java SimpleDateFormat patterns for parsing date and timestamp columns.

multiLine: When true, allows fields to contain newline characters. Slower but necessary for some data sources.

You can also use the options method to pass a Map:

val options = Map(
  "header" -> "true",
  "inferSchema" -> "true",
  "nullValue" -> "NA"
)

val df = spark.read.options(options).csv("data/users.csv")

Schema Definition

In production, always define your schema explicitly. Here’s why:

  1. Performance: inferSchema requires an extra pass through your data. On large files, this doubles read time.
  2. Type safety: Schema inference guesses. It might interpret "001" as an integer 1, losing leading zeros. It might see "true" in one row and "yes" in another, picking string instead of boolean.
  3. Consistency: The same file might infer different schemas on different runs if the sample data varies.

Define schemas using StructType:

import org.apache.spark.sql.types._

val userSchema = StructType(Array(
  StructField("user_id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("email", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("created_at", TimestampType, nullable = true),
  StructField("is_active", BooleanType, nullable = true),
  StructField("balance", DecimalType(10, 2), nullable = true)
))

val df = spark.read
  .option("header", "true")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .schema(userSchema)
  .csv("data/users.csv")

df.printSchema()

Output:

root
 |-- user_id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- is_active: boolean (nullable = true)
 |-- balance: decimal(10,2) (nullable = true)

For complex schemas, consider storing the schema definition as JSON and loading it:

// Save schema to JSON (do this once)
val schemaJson = userSchema.json
println(schemaJson)

// Load schema from JSON
val loadedSchema = DataType.fromJson(schemaJson).asInstanceOf[StructType]

val df = spark.read
  .option("header", "true")
  .schema(loadedSchema)
  .csv("data/users.csv")

This approach lets you version control your schemas separately from your code.

Handling Malformed Data

Real-world CSV files contain garbage. Missing columns, extra columns, invalid types, encoding issues. Spark provides three modes for handling malformed records:

PERMISSIVE (default): Sets malformed fields to null and optionally captures the entire corrupt record in a designated column.

DROPMALFORMED: Silently drops rows that can’t be parsed. Dangerous—you might not realize you’re losing data.

FAILFAST: Throws an exception on the first malformed record. Good for catching issues early, bad for batch processing.

Here’s the recommended pattern for production:

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("value", DoubleType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)
))

val df = spark.read
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schema)
  .csv("data/messy_data.csv")

// Separate good and bad records
val goodRecords = df.filter(df("_corrupt_record").isNull)
  .drop("_corrupt_record")

val badRecords = df.filter(df("_corrupt_record").isNotNull)
  .select("_corrupt_record")

// Log or save bad records for investigation
badRecords.show(truncate = false)

// Continue processing with good records
goodRecords.show()

This pattern lets you process valid data while preserving problematic rows for later investigation. Write bad records to a dead-letter queue or error table for manual review.

Reading Multiple Files

Spark excels at reading multiple files as a single DataFrame. Pass a directory path, glob pattern, or list of paths:

// Read all CSVs in a directory
val df1 = spark.read
  .option("header", "true")
  .schema(userSchema)
  .csv("data/users/")

// Use glob patterns
val df2 = spark.read
  .option("header", "true")
  .schema(userSchema)
  .csv("data/users/*.csv")

// Read specific date partitions
val df3 = spark.read
  .option("header", "true")
  .schema(userSchema)
  .csv("data/users/date=2024-01-*/")

// Read multiple explicit paths
val df4 = spark.read
  .option("header", "true")
  .schema(userSchema)
  .csv("data/users_2024.csv", "data/users_2023.csv", "data/archive/users_old.csv")

When reading multiple files, ensure they share the same schema. Spark won’t warn you if column orders differ between files—it reads by position, not name, unless you’re using header=true with matching headers.

Add the source filename to your DataFrame for debugging:

import org.apache.spark.sql.functions._

val df = spark.read
  .option("header", "true")
  .schema(userSchema)
  .csv("data/users/*.csv")
  .withColumn("source_file", input_file_name())

df.select("user_id", "name", "source_file").show(truncate = false)

Performance Considerations

CSV is inherently slow compared to columnar formats. Here’s how to minimize the pain:

Never use inferSchema in production. It reads your data twice—once to infer types, once to actually load. On a 10GB file, that’s 20GB of I/O.

// Bad: reads data twice
val slow = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/large_file.csv")

// Good: single pass
val fast = spark.read
  .option("header", "true")
  .schema(explicitSchema)
  .csv("data/large_file.csv")

CSV doesn’t support predicate pushdown. When you filter a Parquet file, Spark skips irrelevant row groups. With CSV, it reads everything. If you query the same CSV repeatedly, convert it to Parquet first:

// One-time conversion
spark.read
  .option("header", "true")
  .schema(userSchema)
  .csv("data/users.csv")
  .write
  .parquet("data/users.parquet")

// Fast repeated queries
val df = spark.read.parquet("data/users.parquet")
  .filter($"age" > 25)  // Predicate pushdown works here

Partition your output. If you’re writing processed CSV data for downstream consumption, partition by a high-cardinality column to enable parallel reads:

df.write
  .partitionBy("date", "region")
  .option("header", "true")
  .csv("output/partitioned_users/")

Consider compression. Spark reads gzipped CSV files automatically, but gzip isn’t splittable—one file means one partition. For large files, use bzip2 (splittable but slow) or split files before compression.

Wrapping Up

Reading CSV in Spark is straightforward once you internalize a few rules: always define schemas explicitly, handle malformed data gracefully with PERMISSIVE mode, and convert to Parquet when you need repeated fast queries. The DataFrameReader API is flexible enough to handle most CSV variations you’ll encounter, from simple comma-delimited files to complex multi-line quoted formats.

The patterns in this guide will handle 95% of CSV ingestion scenarios. For the remaining 5%—truly pathological files with inconsistent quoting or encoding issues—you might need to preprocess with a dedicated parsing library before bringing data into Spark.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.