Apache Spark - Redshift Connector
• The Spark-Redshift connector enables bidirectional data transfer between Apache Spark and Amazon Redshift using S3 as an intermediate staging layer, leveraging Redshift's COPY and UNLOAD commands...
Key Insights
• The Spark-Redshift connector enables bidirectional data transfer between Apache Spark and Amazon Redshift using S3 as an intermediate staging layer, leveraging Redshift’s COPY and UNLOAD commands for optimal performance. • Proper configuration of IAM roles, S3 temporary directories, and connection parameters is critical—misconfiguration leads to performance bottlenecks or permission errors that are difficult to debug. • Production deployments require careful consideration of data types, compression codecs, and query pushdown capabilities to avoid full table scans and minimize data movement between systems.
Understanding the Connector Architecture
The Spark-Redshift connector doesn’t establish direct data pipelines between Spark and Redshift. Instead, it uses Amazon S3 as an intermediate staging area. When reading from Redshift, the connector executes an UNLOAD command to export data to S3, then reads the files into Spark DataFrames. For writes, Spark writes data to S3 first, then Redshift loads it using the COPY command.
This architecture provides two key advantages: it leverages Redshift’s highly optimized bulk operations, and it separates compute resources, preventing Spark jobs from overwhelming your data warehouse.
Setting Up Dependencies
Add the connector to your build.sbt or pom.xml:
// build.sbt
libraryDependencies ++= Seq(
"io.github.spark-redshift-community" %% "spark-redshift" % "6.2.0",
"org.apache.spark" %% "spark-sql" % "3.5.0",
"com.amazonaws" % "aws-java-sdk-bundle" % "1.12.500"
)
For Maven projects:
<dependency>
<groupId>io.github.spark-redshift-community</groupId>
<artifactId>spark-redshift_2.12</artifactId>
<version>6.2.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>1.12.500</version>
</dependency>
Reading Data from Redshift
Here’s a basic read operation with essential configurations:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("RedshiftConnector")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.getOrCreate()
val df = spark.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", "jdbc:redshift://cluster-name.region.redshift.amazonaws.com:5439/database?user=username&password=password")
.option("dbtable", "public.sales_data")
.option("tempdir", "s3a://my-bucket/temp/")
.option("aws_iam_role", "arn:aws:iam::123456789012:role/RedshiftCopyUnloadRole")
.load()
df.show()
The tempdir must be accessible to both Spark and Redshift. Use IAM roles rather than access keys for authentication—it’s more secure and eliminates credential management overhead.
Query Pushdown Optimization
The connector supports predicate and column pruning pushdown, significantly reducing data transfer:
// This query pushes the filter to Redshift
val filteredDF = spark.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", "public.orders")
.option("tempdir", tempS3Path)
.option("aws_iam_role", iamRole)
.load()
.select("order_id", "customer_id", "total_amount")
.filter("order_date >= '2024-01-01'")
// For complex queries, use a custom query
val customQueryDF = spark.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", jdbcUrl)
.option("query", """
SELECT o.order_id, o.total_amount, c.customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= '2024-01-01'
AND o.status = 'COMPLETED'
""")
.option("tempdir", tempS3Path)
.option("aws_iam_role", iamRole)
.load()
Use the query option instead of dbtable when you need joins, aggregations, or complex filters executed in Redshift before data reaches Spark.
Writing Data to Redshift
Writing data requires consideration of table creation, append modes, and distribution keys:
import org.apache.spark.sql.SaveMode
val newData = spark.createDataFrame(Seq(
(1001, "Product A", 29.99, "2024-01-15"),
(1002, "Product B", 49.99, "2024-01-15"),
(1003, "Product C", 19.99, "2024-01-16")
)).toDF("product_id", "product_name", "price", "date_added")
newData.write
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", "public.products")
.option("tempdir", tempS3Path)
.option("aws_iam_role", iamRole)
.option("diststyle", "KEY")
.option("distkey", "product_id")
.option("sortkeyspec", "SORTKEY(date_added)")
.mode(SaveMode.Append)
.save()
The diststyle and distkey options optimize Redshift table distribution. For dimension tables under 10 million rows, use ALL. For fact tables, use KEY with an appropriate distribution key.
Handling Data Types
Data type mapping between Spark and Redshift requires attention:
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", LongType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("price", DecimalType(10, 2), nullable = false),
StructField("created_at", TimestampType, nullable = false),
StructField("metadata", StringType, nullable = true) // JSON stored as VARCHAR
))
val df = spark.read.schema(schema)
.option("inferSchema", "false")
.csv("s3a://input-bucket/data.csv")
// Write with explicit type handling
df.write
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", "public.product_catalog")
.option("tempdir", tempS3Path)
.option("aws_iam_role", iamRole)
.option("extracopyoptions", "DATEFORMAT 'auto' TIMEFORMAT 'auto'")
.mode(SaveMode.Overwrite)
.save()
Redshift doesn’t support nested structures. Flatten complex types or serialize them as JSON strings before writing.
Production Configuration
For production workloads, configure connection pooling, retries, and performance parameters:
val productionDF = spark.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", "public.large_table")
.option("tempdir", tempS3Path)
.option("aws_iam_role", iamRole)
.option("tempformat", "CSV GZIP") // Use compression
.option("csvnullstring", "\\N") // Handle nulls explicitly
.option("autopushdown", "true") // Enable query pushdown
.option("autopushdown.s3_result_cache", "true") // Cache S3 results
.option("unload_s3_format", "PARQUET") // Use Parquet for better performance
.load()
The unload_s3_format option with PARQUET provides better compression and faster reads compared to CSV. However, it requires Redshift to support Parquet UNLOAD (available in recent versions).
Error Handling and Monitoring
Implement robust error handling for connection failures and data issues:
import scala.util.{Try, Success, Failure}
def readFromRedshiftWithRetry(
spark: SparkSession,
jdbcUrl: String,
table: String,
maxRetries: Int = 3
): Try[org.apache.spark.sql.DataFrame] = {
def attempt(retriesLeft: Int): Try[org.apache.spark.sql.DataFrame] = {
Try {
spark.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", table)
.option("tempdir", tempS3Path)
.option("aws_iam_role", iamRole)
.load()
} match {
case Success(df) => Success(df)
case Failure(e) if retriesLeft > 0 =>
println(s"Read failed, retrying... ($retriesLeft attempts left)")
Thread.sleep(5000)
attempt(retriesLeft - 1)
case Failure(e) => Failure(e)
}
}
attempt(maxRetries)
}
// Usage
readFromRedshiftWithRetry(spark, jdbcUrl, "public.critical_data") match {
case Success(df) =>
println(s"Successfully loaded ${df.count()} rows")
df.show()
case Failure(e) =>
println(s"Failed to load data: ${e.getMessage}")
// Implement alerting or fallback logic
}
Performance Considerations
Monitor and optimize these key areas:
-
S3 temporary directory cleanup: The connector doesn’t automatically clean up S3 staging files. Implement lifecycle policies on your temp bucket.
-
Parallel reads: Control parallelism with Redshift slices:
val parallelDF = spark.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", jdbcUrl)
.option("dbtable", "public.large_table")
.option("tempdir", tempS3Path)
.option("aws_iam_role", iamRole)
.option("unload_s3_format", "PARQUET")
.option("numPartitions", "16") // Match Redshift slice count
.load()
- Compression: Always enable compression for staging data to reduce S3 costs and transfer times.
The Spark-Redshift connector provides a robust bridge between distributed processing and data warehousing. Success depends on understanding its S3-mediated architecture and configuring it appropriately for your data volumes and query patterns.