Spark Scala - Read Parquet File

Key Insights

  • Parquet’s columnar format combined with Spark’s predicate pushdown and column projection can reduce I/O by 90% or more compared to row-based formats like CSV or JSON.
  • Enable mergeSchema when reading Parquet files whose schema may have evolved over time, but be aware of the schema-discovery cost on large file sets.
  • Partition pruning is automatic when your filter conditions match partition columns—structure your data accordingly to maximize read performance.

Introduction to Parquet Format

Apache Parquet has become the de facto standard for storing analytical data in big data ecosystems. As a columnar storage format, Parquet stores data by column rather than by row, which provides significant advantages for analytical workloads where you typically query a subset of columns across many rows.

The benefits are substantial. Compression ratios improve dramatically because similar data types are stored together—a column of integers compresses far better than a row containing integers, strings, and timestamps mixed together. Parquet also supports schema evolution, allowing you to add columns without rewriting existing data. Most importantly for Spark, Parquet enables predicate pushdown and column projection, meaning Spark can skip reading data it doesn’t need at the storage layer rather than filtering after loading everything into memory.

Compared to CSV or JSON, Parquet files are often several times smaller and an order of magnitude faster to read for analytical queries, though the exact ratios depend heavily on your data. If you’re working with Spark and not using Parquet (or a similar columnar format like ORC), you’re leaving significant performance on the table.

Environment Setup

Before reading Parquet files, you need a properly configured SparkSession. If you’re using spark-shell or a managed environment like Databricks, this is handled for you. For standalone applications, you’ll need to set up the session explicitly.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ParquetReader")
  .master("local[*]")  // Use all available cores locally
  .config("spark.sql.parquet.compression.codec", "snappy")
  .config("spark.sql.parquet.filterPushdown", "true")
  .getOrCreate()

// Import implicits for DataFrame operations
import spark.implicits._

For production deployments, remove the .master() call and let your cluster manager (YARN, Kubernetes, or Mesos) handle resource allocation. The filterPushdown configuration is enabled by default in modern Spark versions, but I include it explicitly for clarity.

Your build.sbt should include Spark SQL:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided"
)

Mark it as provided if you’re deploying to a cluster where Spark is already available.

Basic Parquet Read Operations

Reading Parquet files in Spark is straightforward. The spark.read.parquet() method handles both single files and directories containing multiple Parquet files.

// Read a single Parquet file
val df = spark.read.parquet("/data/users.parquet")

// Read all Parquet files in a directory
val dfFromDir = spark.read.parquet("/data/users/")

// Read from HDFS
val dfHdfs = spark.read.parquet("hdfs://namenode:8020/data/users/")

// Read from S3
val dfS3 = spark.read.parquet("s3a://my-bucket/data/users/")

// Inspect what you've loaded
df.printSchema()
df.show(10, truncate = false)

// Get row count (triggers a full scan)
println(s"Total records: ${df.count()}")

When reading from a directory, Spark automatically discovers and reads all Parquet files within it, including files in subdirectories if they follow Hive-style partitioning. This makes it trivial to read datasets that have been partitioned by date, region, or other dimensions.
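As a quick illustration (using a throwaway path, and the spark session and implicits from the setup above), writing with partitionBy and reading the base directory back shows the partition column materialize in the schema:

```scala
// Write a tiny dataset partitioned by signup_date (illustrative path)
Seq((1L, "2024-01-01"), (2L, "2024-01-02"))
  .toDF("user_id", "signup_date")
  .write.mode("overwrite")
  .partitionBy("signup_date")
  .parquet("/tmp/partition-demo")

// Reading the base directory discovers both partition directories,
// and signup_date surfaces as a regular column even though its value
// lives in the directory names, not inside the Parquet files
val partitioned = spark.read.parquet("/tmp/partition-demo")
partitioned.printSchema()
```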

You can also read multiple paths in a single call:

val dfMultiple = spark.read.parquet(
  "/data/users/year=2023/",
  "/data/users/year=2024/"
)

Schema Handling and Inference

Parquet files embed their schema, so Spark can infer column names and types automatically. However, there are situations where you want explicit control over the schema.

import org.apache.spark.sql.types._

// Define schema explicitly
val userSchema = StructType(Seq(
  StructField("user_id", LongType, nullable = false),
  StructField("username", StringType, nullable = true),
  StructField("email", StringType, nullable = true),
  StructField("created_at", TimestampType, nullable = true),
  StructField("is_active", BooleanType, nullable = true)
))

// Apply schema during read
val dfWithSchema = spark.read
  .schema(userSchema)
  .parquet("/data/users/")

Explicit schemas provide two benefits: faster reads (Spark skips schema inference) and protection against unexpected schema changes in your source data.

When your Parquet files have evolved over time—older files might lack columns that newer files have—enable schema merging:

val dfMerged = spark.read
  .option("mergeSchema", "true")
  .parquet("/data/users/")

With mergeSchema enabled, Spark reads the schema from all files and creates a unified schema. Missing columns in older files become null values. Be cautious with this option on large datasets with many files, as schema discovery adds overhead.
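As a quick sanity check (the /tmp paths are just illustrations, reusing the session and implicits from the setup above), you can write two batches with different columns and observe the merged result:

```scala
// Write two datasets with different schemas under the same base path
Seq((1L, "alice")).toDF("user_id", "username")
  .write.mode("overwrite").parquet("/tmp/merge-demo/batch=1")

Seq((2L, "bob", "bob@example.com")).toDF("user_id", "username", "email")
  .write.mode("overwrite").parquet("/tmp/merge-demo/batch=2")

val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("/tmp/merge-demo")

// The unified schema includes email; the row from batch=1 has it as null
merged.printSchema()
merged.show()
```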

You can also enable this globally:

spark.conf.set("spark.sql.parquet.mergeSchema", "true")

Read Options and Performance Tuning

The real power of Parquet comes from its integration with Spark’s query optimizer. Three techniques matter most: column projection, predicate pushdown, and partition pruning.

Column Projection means reading only the columns you need. Spark pushes this down to the Parquet reader, so unused columns are never loaded from disk:

// Only reads user_id and email columns from storage
val projected = spark.read
  .parquet("/data/users/")
  .select("user_id", "email")

Predicate Pushdown filters data at the storage layer using Parquet’s row group statistics:

// Spark pushes this filter to Parquet reader
// Row groups where max(user_id) < 1000 are skipped entirely
val filtered = spark.read
  .parquet("/data/users/")
  .filter($"user_id" >= 1000 && $"user_id" < 2000)

Predicate pushdown works best with sorted data. If your Parquet files are sorted by user_id, the filter above might skip 90% of row groups without reading them.
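If you control the write side, one sketch of how to produce pushdown-friendly files (using a small stand-in dataset and an illustrative output path):

```scala
// A small stand-in dataset (in practice, your real table)
val usersDf = (1L to 1000L).map(id => (id, s"user_$id")).toDF("user_id", "username")

// Range partitioning keeps each output file's user_id range disjoint,
// and sorting within partitions keeps per-row-group min/max statistics
// tight, so range filters on user_id can skip most row groups entirely
usersDf
  .repartitionByRange(8, $"user_id")
  .sortWithinPartitions("user_id")
  .write
  .mode("overwrite")
  .parquet("/tmp/users-sorted")
```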

Partition Pruning works with Hive-style partitioned data:

// Directory structure:
// /data/events/year=2024/month=01/
// /data/events/year=2024/month=02/
// ...

val events = spark.read.parquet("/data/events/")

// Spark only reads the month=03 partition directory
val marchEvents = events.filter($"year" === 2024 && $"month" === 3)

Combine all three for maximum performance:

val optimizedRead = spark.read
  .parquet("/data/events/")
  .filter($"year" === 2024 && $"month" === 3)  // Partition pruning
  .filter($"event_type" === "purchase")         // Predicate pushdown
  .select("user_id", "amount", "timestamp")     // Column projection

Additional configuration options worth knowing:

val dfConfigured = spark.read
  .option("recursiveFileLookup", "true")  // Read nested directories
  .option("pathGlobFilter", "*.parquet")  // Filter by filename pattern
  .option("modifiedBefore", "2024-01-01T00:00:00")  // Filter by modification time
  .parquet("/data/events/")

Common Issues and Troubleshooting

Parquet reads can fail for several reasons. Here’s how to handle the most common issues.

Schema Mismatches occur when files have incompatible schemas (e.g., a column is INT in one file and STRING in another). An explicit schema helps when the differences are additive—some files simply lack columns that others have. Genuinely conflicting types usually require reading the offending paths separately and reconciling them with casts:

// Force the expected schema; works when differences are additive
val safeRead = spark.read
  .schema(expectedSchema)
  .parquet("/data/mixed/")

Corrupt Files can crash your entire job. The ignoreCorruptFiles option tells Spark to skip them rather than fail:

// Skip unreadable files instead of failing the whole read
val dfWithCorruptHandling = spark.read
  .option("ignoreCorruptFiles", "true")
  .parquet("/data/potentially-corrupt/")

When ignoreCorruptFiles is enabled, Spark skips unreadable files and logs warnings instead of failing the job.
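A small way to see this in action (paths are illustrative, reusing the session and implicits from the setup above) is to plant a non-Parquet file next to a valid one:

```scala
import java.nio.file.{Files, Paths}

// Write one valid Parquet file, then drop a garbage file beside it
Seq((1L, "alice")).toDF("user_id", "username")
  .write.mode("overwrite").parquet("/tmp/corrupt-demo")
Files.write(Paths.get("/tmp/corrupt-demo/junk.parquet"), "not parquet".getBytes)

// Without ignoreCorruptFiles this read would fail on junk.parquet;
// with it, Spark skips the bad file and logs a warning
val survivors = spark.read
  .option("ignoreCorruptFiles", "true")
  .parquet("/tmp/corrupt-demo")

println(survivors.count())  // counts only rows from readable files
```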

Memory Issues often arise when reading many small files or files with complex nested schemas:

// Increase parallelism for many small files
spark.conf.set("spark.sql.files.maxPartitionBytes", "64mb")
spark.conf.set("spark.sql.files.openCostInBytes", "4mb")

// For deeply nested schemas, fall back to the non-vectorized reader
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "false")

Debugging Read Failures becomes easier with explain:

val df = spark.read.parquet("/data/users/")
  .filter($"user_id" > 1000)
  .select("user_id", "email")

// See the physical plan including pushdown filters
df.explain(true)

The explain output shows which filters are pushed down and which partitions are pruned, helping you verify that optimizations are working.
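You can also check the plan programmatically by rendering it as text and searching for the pushed predicates. The exact wording of the plan output is version-dependent, so treat this as a debugging aid rather than a stable API (the tiny dataset and /tmp path below are just for illustration):

```scala
// A tiny dataset written to a temp path so the plan below is concrete
Seq((1L, "a@example.com"), (2000L, "b@example.com"))
  .toDF("user_id", "email")
  .write.mode("overwrite").parquet("/tmp/plan-demo")

val planned = spark.read.parquet("/tmp/plan-demo")
  .filter($"user_id" > 1000)
  .select("user_id", "email")

// Render the physical plan as text; in recent Spark versions pushed
// predicates appear as "PushedFilters: [...]" in the scan node
val plan = planned.queryExecution.executedPlan.toString
println(plan.contains("PushedFilters"))
```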

Conclusion

Reading Parquet files with Spark Scala is simple in the basic case but offers substantial optimization opportunities for production workloads. The key practices to remember:

  • Use Parquet over CSV or JSON for any data you’ll query repeatedly—the compression and read performance benefits compound over time.
  • Project only the columns you need; this is the easiest performance win.
  • Structure your data with partition columns that match your common filter patterns.
  • Enable mergeSchema only when necessary, and prefer explicit schemas for production pipelines.

When choosing between Parquet and ORC (the other major columnar format), Parquet has broader ecosystem support and better interoperability with non-Spark tools. ORC can offer slightly better compression and performance in pure Hive/Spark environments. For most teams, Parquet is the safer default choice.
