Apache Spark - Read/Write JDBC Databases
Key Insights
- Spark’s JDBC API enables parallel reads from databases using partition columns, achieving 10-50x speedups over single-threaded extraction by distributing queries across executors
- Write performance critically depends on batch size, isolation level, and connection pooling—default settings often leave 5-10x performance gains on the table
- Connection credentials and driver management require careful handling in production; never hardcode passwords and always validate JDBC driver availability before job submission
JDBC Driver Setup
Before reading or writing data, ensure the appropriate JDBC driver is available to all Spark executors. For cluster deployments, include the driver JAR using `--jars` or `--packages`:
spark-submit --packages org.postgresql:postgresql:42.6.0 \
    --class com.example.JdbcApp \
    app.jar
For interactive sessions:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("JDBC Operations") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
    .getOrCreate()
Common JDBC drivers and their Maven coordinates:
- PostgreSQL: org.postgresql:postgresql:42.6.0
- MySQL: com.mysql:mysql-connector-j:8.0.33
- SQL Server: com.microsoft.sqlserver:mssql-jdbc:12.2.0.jre11
- Oracle: com.oracle.database.jdbc:ojdbc8:21.9.0.0
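Validating driver availability before submission (as recommended above) can be partially automated. The helper below is an illustrative pre-flight check of our own, not a Spark API; it only confirms that a driver's Maven coordinate was requested via `spark.jars.packages`, not that the JAR actually resolved:

```python
def driver_configured(packages_conf, group_artifact):
    """Illustrative pre-flight check (not part of Spark): return True if
    group:artifact appears (at any version) in a spark.jars.packages
    comma-separated coordinate list."""
    coords = [p.strip() for p in packages_conf.split(",") if p.strip()]
    # A coordinate looks like group:artifact:version; drop the version.
    return any(c.rsplit(":", 1)[0] == group_artifact for c in coords)

# Check the packages string you are about to pass to spark-submit.
ok = driver_configured("org.postgresql:postgresql:42.6.0",
                       "org.postgresql:postgresql")  # → True
```

Running this check in your submission script fails fast with a clear message instead of a `ClassNotFoundException` deep inside an executor.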
Basic Read Operations
The simplest JDBC read loads an entire table into a DataFrame:
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "users") \
    .option("user", "admin") \
    .option("password", "secret") \
    .option("driver", "org.postgresql.Driver") \
    .load()
df.show()
For custom queries, use a subquery with parentheses and an alias (on Spark 2.4+, you can instead pass the bare statement through the `query` option in place of `dbtable`):
query = """
(SELECT u.id, u.name, o.total
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE o.created_at > '2024-01-01') AS filtered_data
"""
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", query) \
    .option("user", "admin") \
    .option("password", "secret") \
    .load()
Parallel Reading with Partitioning
Without partitioning, Spark reads the entire table through a single connection—a bottleneck for large datasets. Enable parallel reads by specifying partition parameters:
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "transactions") \
    .option("user", "admin") \
    .option("password", "secret") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .option("numPartitions", "10") \
    .load()
Spark divides the range [lowerBound, upperBound] into numPartitions strides and generates one query per partition, roughly:
SELECT * FROM transactions WHERE id < 100001 OR id IS NULL
SELECT * FROM transactions WHERE id >= 100001 AND id < 200001
-- ... 7 more bounded ranges, ending with: WHERE id >= 900001
Critical considerations:
- Partition column must be numeric (INTEGER, BIGINT, DECIMAL)
- Column should have an index for efficient range scans
- Bounds don’t filter data—they only define partition boundaries; rows outside bounds are still read
- Choose numPartitions based on cluster size: typically 2-3x the number of executor cores
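The range-splitting behavior can be modeled in a few lines of plain Python. This is a simplified sketch for illustration, not an API you call, and Spark's real implementation computes strides slightly differently:

```python
def jdbc_partition_clauses(column, lower, upper, num_partitions):
    """Simplified model of how Spark turns (lowerBound, upperBound,
    numPartitions) into per-partition WHERE clauses. The first and last
    partitions are open-ended, which is why rows outside the bounds are
    still read; NULLs land in the first partition."""
    stride = (upper - lower) // num_partitions
    clauses = []
    current = lower
    for i in range(num_partitions):
        lo = None if i == 0 else f"{column} >= {current}"
        current += stride
        hi = None if i == num_partitions - 1 else f"{column} < {current}"
        if lo is None:
            clauses.append(f"{hi} OR {column} IS NULL")
        elif hi is None:
            clauses.append(lo)
        else:
            clauses.append(f"{lo} AND {hi}")
    return clauses

clauses = jdbc_partition_clauses("id", 1, 1_000_000, 10)
```

Printing the result makes it easy to sanity-check that a proposed partition column and bounds produce clauses your database can satisfy with index range scans.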
For non-numeric partition columns, supply explicit predicates, one per partition. Note that predicates are accepted by the DataFrameReader.jdbc() method, not as a .option():
predicates = [
    "country = 'US'",
    "country = 'CA'",
    "country = 'MX'",
    "country NOT IN ('US', 'CA', 'MX')"
]
df = spark.read.jdbc(
    url="jdbc:mysql://localhost:3306/mydb",
    table="customers",
    predicates=predicates,
    properties={
        "user": "admin",
        "password": "secret",
        "driver": "com.mysql.cj.jdbc.Driver"
    }
)
Optimizing Read Performance
Fetch Size: Control how many rows are retrieved per round trip:
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "large_table") \
    .option("user", "admin") \
    .option("password", "secret") \
    .option("fetchsize", "10000") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "1") \
    .option("upperBound", "10000000") \
    .option("numPartitions", "20") \
    .load()
Default fetch sizes vary by driver (the PostgreSQL driver defaults to 0, which fetches the entire result set in one round trip; MySQL's behavior depends on configuration). Values between 1,000 and 50,000 typically work well.
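If you would rather derive a starting fetch size than guess, one option is to target a fixed buffer per round trip. The helper below is a home-grown heuristic, not a Spark API, and the 16 MB default target is an assumption to tune:

```python
def suggest_fetchsize(avg_row_bytes, target_buffer_mb=16):
    """Heuristic (not part of Spark): aim for roughly target_buffer_mb of
    data per JDBC round trip, clamped to the 1,000-50,000 range that
    typically works well."""
    rows = (target_buffer_mb * 1024 * 1024) // max(avg_row_bytes, 1)
    return int(max(1_000, min(rows, 50_000)))

suggest_fetchsize(500)  # moderately wide rows → 33554
suggest_fetchsize(50)   # narrow rows hit the 50,000 cap → 50000
```

Estimate the average row width from the table's schema or a sample, then pass the result as the `fetchsize` option.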
Session Init SQL: Execute setup commands before reading:
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "users") \
    .option("user", "admin") \
    .option("password", "secret") \
    .option("sessionInitStatement", "SET work_mem = '256MB'; SET random_page_cost = 1.1;") \
    .load()
Writing to Databases
Basic write operation:
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "output_table") \
    .option("user", "admin") \
    .option("password", "secret") \
    .mode("append") \
    .save()
Save modes:
- append: Insert new rows
- overwrite: Drop and recreate table
- ignore: Skip if table exists
- error: Fail if table exists (default)
Batch Inserts: Critical for write performance:
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "metrics") \
    .option("user", "admin") \
    .option("password", "secret") \
    .option("batchsize", "10000") \
    .option("isolationLevel", "READ_UNCOMMITTED") \
    .mode("append") \
    .save()
Default batch size is 1,000. Increase to 10,000-50,000 for significant speedups. Test with your database—very large batches can cause memory issues.
Truncate Instead of Drop: When using overwrite mode, preserve table structure:
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "staging_table") \
    .option("user", "admin") \
    .option("password", "secret") \
    .option("truncate", "true") \
    .mode("overwrite") \
    .save()
This executes TRUNCATE TABLE instead of DROP TABLE, preserving indexes, constraints, and permissions.
Connection Pooling and Security
Never hardcode credentials. Use environment variables or secret management:
import os
jdbc_url = os.environ.get("JDBC_URL")
jdbc_user = os.environ.get("JDBC_USER")
jdbc_password = os.environ.get("JDBC_PASSWORD")
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "users") \
    .option("user", jdbc_user) \
    .option("password", jdbc_password) \
    .load()
Spark's JDBC source does not pool connections on its own; each partition task opens a fresh connection. Spark 3.1+ lets you select a registered JdbcConnectionProvider implementation through the connectionProvider option, so pooling with HikariCP requires a custom provider JAR on the executor classpath. The configuration below is illustrative and assumes such a provider, registered under the name "hikari", interprets the hikari.* properties:
spark = SparkSession.builder \
    .appName("JDBC with Pooling") \
    .config("spark.executor.extraClassPath", "/path/to/HikariCP.jar") \
    .getOrCreate()
# Assumes a custom JdbcConnectionProvider registered as "hikari";
# the hikari.* keys are read by that provider, not by Spark itself.
connection_properties = {
    "user": "admin",
    "password": "secret",
    "driver": "org.postgresql.Driver",
    "connectionProvider": "hikari",
    "hikari.maximumPoolSize": "10",
    "hikari.connectionTimeout": "30000"
}
df = spark.read.jdbc(
    url="jdbc:postgresql://localhost:5432/mydb",
    table="users",
    properties=connection_properties
)
Transaction Control and Error Handling
Control transaction isolation for writes:
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "financial_transactions") \
    .option("user", "admin") \
    .option("password", "secret") \
    .option("isolationLevel", "SERIALIZABLE") \
    .option("batchsize", "5000") \
    .mode("append") \
    .save()
Isolation levels: NONE, READ_UNCOMMITTED (Spark's default), READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE. Higher levels ensure consistency but reduce concurrency.
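These option names correspond to the standard java.sql.Connection transaction-isolation constants. The mapping below reproduces those JDBC constant values for reference; the dict itself is illustrative, not something Spark exposes:

```python
# Values of the java.sql.Connection TRANSACTION_* constants that the
# isolationLevel option names correspond to (the dict is illustrative).
JDBC_ISOLATION = {
    "NONE": 0,              # Connection.TRANSACTION_NONE
    "READ_UNCOMMITTED": 1,  # Connection.TRANSACTION_READ_UNCOMMITTED
    "READ_COMMITTED": 2,    # Connection.TRANSACTION_READ_COMMITTED
    "REPEATABLE_READ": 4,   # Connection.TRANSACTION_REPEATABLE_READ
    "SERIALIZABLE": 8,      # Connection.TRANSACTION_SERIALIZABLE
}
```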
Handle write failures gracefully:
try:
    df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/mydb") \
        .option("dbtable", "events") \
        .option("user", "admin") \
        .option("password", "secret") \
        .option("batchsize", "10000") \
        .mode("append") \
        .save()
except Exception as e:
    print(f"Write failed: {e}")
    # Log to monitoring system, write to backup location, etc.
    df.write.parquet("s3://backup-bucket/failed-writes/")
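Before falling back to a backup location, it is often worth retrying, since transient failures (deadlocks, dropped connections) frequently succeed on a second attempt. This is a hand-rolled sketch, not a Spark feature; write_fn is any zero-argument callable that performs the write:

```python
import time

def write_with_retry(write_fn, max_attempts=3, base_delay=2.0):
    """Retry a callable with exponential backoff (2s, 4s, 8s, ...).
    Re-raises the last exception once max_attempts is exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return write_fn()
        except Exception:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))

# Usage sketch:
# write_with_retry(lambda: df.write.format("jdbc")...mode("append").save())
```

Keep max_attempts low for writes in append mode: a failure partway through a batch may have committed some rows, so retries can duplicate data unless the target table has a unique constraint or the job writes idempotently.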
Performance Benchmarks
Real-world example reading 10M rows from PostgreSQL:
- Single partition: 8 minutes, 1 executor active
- 10 partitions: 52 seconds, 10 executors active (9.2x speedup)
- 20 partitions + fetchsize 50000: 31 seconds (15.5x speedup)
Write performance for 5M rows:
- Default settings (batchsize=1000): 12 minutes
- batchsize=10000: 3.5 minutes (3.4x speedup)
- batchsize=50000 + READ_UNCOMMITTED: 2.1 minutes (5.7x speedup)
Always test with your specific database, network, and data characteristics. Monitor database connection counts and CPU usage to avoid overwhelming the source system.