PySpark - Read Delta Lake Table
Key Insights
- Delta Lake tables can be read in PySpark using the format("delta") method or by directly referencing the table path, with native support for time travel queries and schema evolution
- Reading Delta tables provides ACID transaction guarantees and automatic metadata handling, eliminating the need for manual schema inference or partition discovery
- Performance optimizations like predicate pushdown, column pruning, and Z-ordering work automatically when reading Delta tables, significantly improving query execution times
Basic Delta Table Reading
Reading a Delta Lake table in PySpark requires minimal configuration. The Delta Lake format is built on top of Parquet files with a transaction log, making it straightforward to query.
from pyspark.sql import SparkSession
# Initialize Spark session with Delta Lake support
spark = SparkSession.builder \
    .appName("DeltaLakeReader") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# Read Delta table using path
df = spark.read.format("delta").load("/path/to/delta/table")
df.show()
# Alternative: Read using table name (if registered in metastore)
df = spark.read.table("database_name.table_name")
df.show()
The format("delta") method tells Spark to use the Delta Lake reader, which handles the transaction log parsing and provides access to the latest version of your data.
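The name-based read above assumes the table is already registered in the metastore. As a hedged sketch (the database/table name and path are placeholders, not real objects), a path-based Delta table can be registered so that spark.read.table works:

```python
def build_register_ddl(name, path):
    """Pure helper: DDL that registers an existing Delta directory under a
    table name (name and path are placeholders)."""
    return f"CREATE TABLE IF NOT EXISTS {name} USING DELTA LOCATION '{path}'"

def register_delta_table(spark, name, path):
    """Sketch: run the DDL against an active SparkSession so the table can
    afterwards be read with spark.read.table(name)."""
    spark.sql(build_register_ddl(name, path))
```

For example, register_delta_table(spark, "database_name.table_name", "/path/to/delta/table") would make both access styles in the snippet above point at the same data.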
Reading Specific Versions with Time Travel
Delta Lake’s time travel feature allows you to query historical versions of your data. This is invaluable for auditing, debugging, or reproducing analysis from a specific point in time.
# Read specific version by version number
df_version = spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load("/path/to/delta/table")
# Read data as it existed at a specific timestamp
df_timestamp = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15 10:30:00") \
    .load("/path/to/delta/table")
# Using SQL syntax for time travel
spark.sql("""
SELECT * FROM delta.`/path/to/delta/table`
VERSION AS OF 5
""").show()
spark.sql("""
SELECT * FROM delta.`/path/to/delta/table`
TIMESTAMP AS OF '2024-01-15 10:30:00'
""").show()
You can view available versions using the Delta Lake history API:
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/path/to/delta/table")
history_df = deltaTable.history()
history_df.select("version", "timestamp", "operation").show(truncate=False)
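Time travel also makes it straightforward to compare two versions of a table. The following is a hedged sketch (it assumes the schema did not change between the two versions; exceptAll is a standard DataFrame method):

```python
def versions_ordered(old_version, new_version):
    """Pure guard: version numbers must be non-negative and ordered."""
    return 0 <= old_version <= new_version

def diff_versions(spark, path, old_version, new_version):
    """Sketch: rows present in new_version but absent from old_version,
    using two time-travel reads plus exceptAll (assumes identical schemas
    across the two versions)."""
    assert versions_ordered(old_version, new_version)
    old_df = spark.read.format("delta").option("versionAsOf", old_version).load(path)
    new_df = spark.read.format("delta").option("versionAsOf", new_version).load(path)
    return new_df.exceptAll(old_df)
```

exceptAll (rather than subtract) preserves duplicate rows, so the result reflects row counts faithfully.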
Filtering and Predicate Pushdown
Delta Lake automatically optimizes queries by pushing predicates down to the file level, reading only the necessary data files based on partition information and statistics.
# Read with filter conditions
df_filtered = spark.read.format("delta") \
    .load("/path/to/delta/table") \
    .filter("order_date >= '2024-01-01' AND status = 'completed'")
# Column selection with predicate pushdown
df_optimized = spark.read.format("delta") \
    .load("/path/to/delta/table") \
    .select("customer_id", "order_total", "order_date") \
    .where("order_total > 1000")
# Using SQL for complex filtering
result = spark.sql("""
    SELECT
        customer_id,
        SUM(order_total) AS total_spent
    FROM delta.`/path/to/delta/table`
    WHERE order_date BETWEEN '2024-01-01' AND '2024-03-31'
    GROUP BY customer_id
    HAVING SUM(order_total) > 5000
""")
result.show()
The Delta Lake transaction log contains min/max statistics for each data file, enabling efficient data skipping without reading unnecessary files.
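One way to confirm that predicates are actually being pushed down is to inspect the physical plan. The exact plan text varies across Spark and Delta versions, so the check below is a crude sketch rather than a guaranteed API:

```python
def has_pushed_filters(plan_text):
    """Pure helper: crude check for pushdown markers in an explain() dump
    (exact wording differs between Spark and Delta versions)."""
    return "PushedFilters" in plan_text or "PartitionFilters" in plan_text

def show_plan(df):
    """Sketch: print the extended query plan; pushed predicates appear
    under the scan node in the physical plan."""
    df.explain(True)
```

Calling show_plan(df_filtered) on the filtered DataFrame above prints the parsed, analyzed, optimized, and physical plans for inspection.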
Reading Partitioned Delta Tables
Partitioned Delta tables organize data into subdirectories based on column values. Reading from partitioned tables is transparent, but you can leverage partition pruning for better performance.
# Read entire partitioned table
df_all = spark.read.format("delta").load("/path/to/partitioned/table")
# Read specific partitions using filter
df_partition = spark.read.format("delta") \
    .load("/path/to/partitioned/table") \
    .filter("year = 2024 AND month = 3")
# Partition pruning also applies to ordinary where() predicates;
# partition columns are discovered automatically from the transaction log
df_explicit = spark.read.format("delta") \
    .load("/path/to/partitioned/table") \
    .where("year IN (2023, 2024)")
# Check partition information
spark.sql("""
DESCRIBE DETAIL delta.`/path/to/partitioned/table`
""").select("partitionColumns").show(truncate=False)
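Partition pruning only helps if the table was written with partition columns in the first place. As a hedged write-side sketch (the year and month columns are assumed to exist in the DataFrame):

```python
def can_partition_by(df_columns, partition_cols):
    """Pure check: every partition column must exist in the DataFrame."""
    return set(partition_cols).issubset(df_columns)

def write_partitioned_delta(df, path, partition_cols=("year", "month")):
    """Sketch: create or overwrite a partitioned Delta table; partition
    pruning on later reads relies on these columns."""
    assert can_partition_by(df.columns, partition_cols)
    df.write.format("delta") \
        .partitionBy(*partition_cols) \
        .mode("overwrite") \
        .save(path)
```

Choose low-cardinality columns that appear in common filters; over-partitioning on high-cardinality columns produces many small files and hurts read performance.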
Schema Evolution and Handling
Delta Lake tracks schema changes in the transaction log. You can read tables with evolved schemas and handle schema mismatches gracefully.
# Delta reads always use the table's latest schema from the transaction log,
# so no mergeSchema option is needed on read (mergeSchema is a write option)
df_evolved = spark.read.format("delta").load("/path/to/delta/table")
# Check current schema
df_evolved.printSchema()
# Read specific schema version
df_old_schema = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/path/to/delta/table")
# Handle missing columns with defaults
from pyspark.sql.functions import col, lit
df_safe = spark.read.format("delta").load("/path/to/delta/table")
if "new_column" not in df_safe.columns:
    df_safe = df_safe.withColumn("new_column", lit(None))
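The conditional pattern above generalizes to a small helper. This is a hypothetical utility, not part of the Delta API; it simply fills absent columns with nulls and projects to a target column order:

```python
def missing_columns(existing_cols, target_cols):
    """Pure helper: which target columns are absent from the DataFrame."""
    existing = set(existing_cols)
    return [c for c in target_cols if c not in existing]

def align_to_columns(df, target_cols):
    """Sketch (hypothetical helper, not a Delta API): add any missing
    target columns as nulls, then project to the target order."""
    from pyspark.sql.functions import lit  # lazy import keeps the pure helper importable
    for c in missing_columns(df.columns, target_cols):
        df = df.withColumn(c, lit(None))
    return df.select(*target_cols)
```

This is handy when unioning an old time-travel snapshot with the current version of a table whose schema has since grown.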
Reading with Advanced Options
Delta Lake provides several options that customize read behavior; be aware that some commonly cited options apply only to streaming reads.
# ignoreDeletes and ignoreChanges are streaming options: they control how a
# streaming read reacts to deletes and updates in the source table
df_stream_tolerant = spark.readStream.format("delta") \
    .option("ignoreDeletes", "true") \
    .option("ignoreChanges", "true") \
    .load("/path/to/delta/table")
# Read the change data feed between two versions (requires CDF to be
# enabled on the table via delta.enableChangeDataFeed = true)
df_changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 10) \
    .option("endingVersion", 20) \
    .load("/path/to/delta/table")
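Change data feed rows carry extra metadata columns (_change_type, _commit_version, _commit_timestamp) that describe each change. As a hedged sketch of using them, for instance to recover rows deleted since a given version:

```python
# Change types Delta records in the _change_type metadata column
CDF_CHANGE_TYPES = ("insert", "update_preimage", "update_postimage", "delete")

def is_cdf_change_type(value):
    """Pure helper: validate a _change_type value."""
    return value in CDF_CHANGE_TYPES

def deleted_rows(spark, path, starting_version):
    """Sketch: rows removed since starting_version, read via the change
    data feed (CDF must have been enabled on the table)."""
    cdf = (spark.read.format("delta")
           .option("readChangeFeed", "true")
           .option("startingVersion", starting_version)
           .load(path))
    return cdf.filter("_change_type = 'delete'")
```

Updates appear as a preimage/postimage pair, so filtering on the appropriate _change_type is essential when aggregating over the feed.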
# maxFilesPerTrigger is also a streaming option: it caps how many new
# files each micro-batch processes
df_rate_limited = spark.readStream.format("delta") \
    .option("maxFilesPerTrigger", 1000) \
    .load("/path/to/delta/table")
Streaming Reads from Delta Tables
Delta Lake supports streaming reads, enabling real-time data processing pipelines.
# Read Delta table as a stream
stream_df = spark.readStream.format("delta") \
    .load("/path/to/delta/table")
# Process stream with watermarking
from pyspark.sql.functions import window, current_timestamp
stream_aggregated = stream_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "5 minutes"),
        "category"
    ) \
    .count()
# Write stream results
query = stream_aggregated.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start("/path/to/output/delta/table")
query.awaitTermination()
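When you want streaming semantics (exactly-once, checkpointed progress) but batch-style scheduling, the availableNow trigger (Spark 3.3+) processes everything currently in the source and then stops. A hedged sketch, with placeholder paths and a hypothetical checkpoint-naming convention:

```python
def sibling_checkpoint(target_path):
    """Pure convention helper (hypothetical): derive a checkpoint directory
    next to the target table rather than inside it."""
    return target_path.rstrip("/") + "_checkpoint"

def copy_available_now(spark, source_path, target_path):
    """Sketch: incrementally copy a Delta table, processing all data
    available right now and then stopping (Trigger.AvailableNow)."""
    return (spark.readStream.format("delta").load(source_path)
            .writeStream.format("delta")
            .option("checkpointLocation", sibling_checkpoint(target_path))
            .trigger(availableNow=True)
            .start(target_path))
```

Because progress lives in the checkpoint, rerunning the job picks up only commits added since the last run.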
Performance Optimization Tips
Maximize read performance by leveraging Delta Lake’s optimization features.
# Enable adaptive query execution for better runtime planning
spark.conf.set("spark.sql.adaptive.enabled", "true")
# optimizeWrite is a write-time setting, but the better-sized files it
# produces speed up subsequent reads
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
# Use Z-ordering for frequently filtered columns
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/path/to/delta/table")
deltaTable.optimize().executeZOrderBy("customer_id", "order_date")
# Read with caching for repeated access
df_cached = spark.read.format("delta") \
    .load("/path/to/delta/table") \
    .cache()
# Verify data skipping statistics
spark.sql("""
DESCRIBE DETAIL delta.`/path/to/delta/table`
""").select("numFiles", "sizeInBytes").show()
Reading Delta Lake tables in PySpark combines simplicity with powerful features like time travel, automatic optimization, and ACID guarantees. By understanding these patterns, you can build robust data pipelines that handle schema evolution, support historical queries, and deliver optimal performance at scale.