Apache Spark - Snowflake Connector
Key Insights
- The Spark-Snowflake connector enables bidirectional data transfer between Apache Spark and Snowflake, using the COPY command for bulk loads that are dramatically faster than row-by-row JDBC inserts
- Connection pooling, query pushdown, and partition management are critical for production deployments—improper configuration can lead to warehouse credit waste and degraded performance
- Authentication methods include basic credentials, key-pair authentication, and OAuth, with key-pair being the recommended approach for service accounts in production environments
Connector Architecture and Setup
The Snowflake Connector for Spark uses Snowflake’s internal stage and COPY command to transfer data, avoiding the performance bottlenecks of traditional JDBC row-by-row operations. Data flows through temporary stages in Snowflake’s cloud storage layer, enabling parallel reads and writes.
Add the connector dependency to your Spark application:
// build.sbt
libraryDependencies += "net.snowflake" % "spark-snowflake_2.12" % "2.12.0-spark_3.4"
<!-- Maven pom.xml -->
<dependency>
    <groupId>net.snowflake</groupId>
    <artifactId>spark-snowflake_2.12</artifactId>
    <version>2.12.0-spark_3.4</version>
</dependency>
For PySpark, the connector jar itself is pulled in through spark.jars.packages (shown in the next section) rather than pip. The separate Python connector, useful for running ad hoc SQL outside Spark, installs with:
pip install snowflake-connector-python
Basic Connection Configuration
Establish a connection using the Snowflake options map. Store credentials in environment variables or a secrets management system—never hardcode them.
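As a minimal sketch of the environment-variable approach (the variable names such as SNOWFLAKE_URL are an assumed convention, not required by the connector):

```python
import os

def options_from_env():
    """Assemble connector options so credentials never appear in source code."""
    required = {
        "sfURL": "SNOWFLAKE_URL",
        "sfUser": "SNOWFLAKE_USER",
        "sfPassword": "SNOWFLAKE_PASSWORD",
    }
    options = {}
    for opt, var in required.items():
        value = os.environ.get(var)
        if value is None:
            raise KeyError(f"Missing required environment variable: {var}")
        options[opt] = value
    # Non-secret settings can fall back to defaults.
    options["sfDatabase"] = os.environ.get("SNOWFLAKE_DATABASE", "ANALYTICS_DB")
    options["sfSchema"] = os.environ.get("SNOWFLAKE_SCHEMA", "PUBLIC")
    options["sfWarehouse"] = os.environ.get("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH")
    return options

# snowflake_options = options_from_env()
```

A secrets manager (AWS Secrets Manager, Vault, etc.) can replace os.environ with the same shape.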
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SnowflakeIntegration") \
    .config("spark.jars.packages", "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4") \
    .getOrCreate()

snowflake_options = {
    "sfURL": "your_account.snowflakecomputing.com",
    "sfUser": "your_username",
    "sfPassword": "your_password",
    "sfDatabase": "ANALYTICS_DB",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "DATA_ENGINEER"
}

# Read from Snowflake
df = spark.read \
    .format("snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "CUSTOMER_ORDERS") \
    .load()

df.show()
Scala equivalent:
val snowflakeOptions = Map(
  "sfURL" -> "your_account.snowflakecomputing.com",
  "sfUser" -> "your_username",
  "sfPassword" -> "your_password",
  "sfDatabase" -> "ANALYTICS_DB",
  "sfSchema" -> "PUBLIC",
  "sfWarehouse" -> "COMPUTE_WH"
)

val df = spark.read
  .format("snowflake")
  .options(snowflakeOptions)
  .option("dbtable", "CUSTOMER_ORDERS")
  .load()
Key-Pair Authentication
Key-pair authentication eliminates password rotation issues and integrates with automated workflows. Generate an RSA key pair and configure Snowflake to accept it.
# Generate private key
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
# Generate public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Configure the public key in Snowflake:
ALTER USER your_username SET RSA_PUBLIC_KEY='MIIBIjANBgkqhki...';
Use the private key in your Spark application. The pem_private_key option takes the key contents, not a file path, so read the file first:

with open("/path/to/rsa_key.p8") as key_file:
    private_key = key_file.read()

snowflake_options = {
    "sfURL": "your_account.snowflakecomputing.com",
    "sfUser": "your_username",
    "pem_private_key": private_key,
    "sfDatabase": "ANALYTICS_DB",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "COMPUTE_WH"
}
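Some driver versions expect pem_private_key as just the base64 body, with the PEM header, footer, and line breaks stripped; whether yours accepts the full PEM text is worth checking against the connector documentation for your version. A small helper for the stripped form, assuming an unencrypted PKCS#8 file:

```python
def load_private_key(path):
    """Read a PKCS#8 PEM file and return only the base64 body,
    with the -----BEGIN/END----- delimiter lines and newlines removed."""
    with open(path) as f:
        lines = f.read().strip().splitlines()
    return "".join(line for line in lines if not line.startswith("-----"))

# snowflake_options["pem_private_key"] = load_private_key("/path/to/rsa_key.p8")
```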
Query Pushdown Optimization
The connector supports query pushdown, executing filters, aggregations, and projections directly in Snowflake rather than pulling all data into Spark, which dramatically reduces data transfer and processing time. Recent connector versions enable automatic pushdown (the autopushdown option) by default, so simple DataFrame filters on a dbtable read are often pushed down as well; supplying the SQL yourself through the query option guarantees the work happens in Snowflake.

# Filter applied in Spark (relies on autopushdown to avoid a full transfer)
df = spark.read.format("snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "SALES") \
    .load()
filtered_df = df.filter(df.sale_date >= "2024-01-01")

# Guaranteed pushdown: the filter is part of the query sent to Snowflake
df = spark.read.format("snowflake") \
    .options(**snowflake_options) \
    .option("query", "SELECT * FROM SALES WHERE sale_date >= '2024-01-01'") \
    .load()
Verify where the work runs by inspecting the Spark plan, and cross-check in Snowflake's query history:

snowflake_options["query"] = """
    SELECT region, SUM(revenue) AS total_revenue
    FROM SALES
    WHERE sale_date >= '2024-01-01'
    GROUP BY region
"""

df = spark.read.format("snowflake") \
    .options(**snowflake_options) \
    .load()

# The aggregation happens in Snowflake, not Spark
df.explain(True)
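When composing query strings like the one above by hand, a tiny helper (purely illustrative, not part of the connector) can keep the filter and aggregation clauses in the SQL that Snowflake executes:

```python
def pushdown_query(table, columns, where=None, group_by=None):
    """Build a SELECT statement for the connector's `query` option.
    Identifiers are assumed trusted; literals should come from validated input."""
    sql = f"SELECT {', '.join(columns)} FROM {table}"
    if where:
        sql += f" WHERE {where}"
    if group_by:
        sql += f" GROUP BY {', '.join(group_by)}"
    return sql

query = pushdown_query(
    "SALES",
    ["region", "SUM(revenue) AS total_revenue"],
    where="sale_date >= '2024-01-01'",
    group_by=["region"],
)
```

The resulting string is passed with .option("query", query) exactly as above.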
Writing Data to Snowflake
Write operations use Snowflake’s COPY command for bulk loading. Configure write modes and options based on your use case.
# Overwrite existing table
sales_df.write \
    .format("snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "PROCESSED_SALES") \
    .mode("overwrite") \
    .save()

# Append to existing table
sales_df.write \
    .format("snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "SALES_HISTORY") \
    .mode("append") \
    .save()

# Create table with specific options
sales_df.write \
    .format("snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "NEW_SALES") \
    .option("column_mapping", "name") \
    .option("truncate_table", "off") \
    .option("usestagingtable", "on") \
    .mode("overwrite") \
    .save()
Partition Management
Bounding what each job reads prevents full table scans and reduces warehouse costs. Note that the Snowflake connector, unlike Spark's generic JDBC source, does not take partitionColumn-style bounds: it determines read parallelism automatically from the files it unloads to the internal stage. To limit the data a job reads, push a range predicate into the query instead:

snowflake_options["query"] = """
    SELECT * FROM SALES
    WHERE sale_date BETWEEN '2024-01-01' AND '2024-12-31'
"""

df = spark.read.format("snowflake") \
    .options(**snowflake_options) \
    .load()
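If you need explicit control over how a large read is split, one workable pattern is to divide a date range into sub-ranges yourself, issue one pushdown query per sub-range, and union the resulting DataFrames. A sketch of the range-splitting step (names are illustrative; assumes num_partitions does not exceed the number of days):

```python
from datetime import date, timedelta

def date_ranges(start, end, num_partitions):
    """Split [start, end] into contiguous (lower, upper) date sub-ranges."""
    total_days = (end - start).days + 1
    step = total_days // num_partitions
    ranges = []
    lower = start
    for i in range(num_partitions):
        # The last partition absorbs any remainder days.
        upper = end if i == num_partitions - 1 else lower + timedelta(days=step - 1)
        ranges.append((lower, upper))
        lower = upper + timedelta(days=1)
    return ranges

ranges = date_ranges(date(2024, 1, 1), date(2024, 12, 31), 4)
```

Each (lower, upper) pair then becomes a WHERE sale_date BETWEEN ... query via the query option, and the per-range DataFrames can be combined with functools.reduce(DataFrame.union, dfs).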
On the write side, Snowflake micro-partitions tables automatically, so Spark's partitionBy (which targets file-based sinks) has no effect when writing through the connector:

sales_df.write \
    .format("snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "PARTITIONED_SALES") \
    .mode("overwrite") \
    .save()

If downstream queries filter on particular columns, define a clustering key on the Snowflake side instead:

ALTER TABLE PARTITIONED_SALES CLUSTER BY (sale_date);
Performance Tuning Parameters
Configure connector-specific parameters to optimize throughput and resource utilization.
performance_options = {
    **snowflake_options,
    # Stage writes in a temporary table so the final swap is atomic
    "usestagingtable": "on",
    # Compress data moving between Spark and the Snowflake stage
    "sfCompress": "on",
    # Push filters, projections, and aggregations into Snowflake
    "autopushdown": "on",
    # Delete staged files once the load completes
    "purge": "on"
}

df = spark.read.format("snowflake") \
    .options(**performance_options) \
    .option("query", "SELECT * FROM LARGE_TABLE") \
    .load()
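One practical detail worth guarding against: Spark reader and writer options are passed as string key-value pairs. A small helper (illustrative, not part of the connector) that coerces numbers and booleans before they reach .options(**...):

```python
def stringify_options(options):
    """Coerce option values to the strings Spark expects;
    booleans become lowercase 'true'/'false'."""
    out = {}
    for key, value in options.items():
        if isinstance(value, bool):
            out[key] = str(value).lower()
        else:
            out[key] = str(value)
    return out

opts = stringify_options({"connect_timeout": 300, "autopushdown": True})
```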
Error Handling and Retry Logic
Implement retry logic for transient failures and configure appropriate timeouts.
import time
from pyspark.sql.utils import AnalysisException

def read_with_retry(options, max_retries=3):
    for attempt in range(max_retries):
        try:
            return spark.read.format("snowflake") \
                .options(**options) \
                .load()
        except AnalysisException:
            raise  # missing tables or columns will not resolve on retry
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s...")
            time.sleep(wait_time)

# Add timeout configurations
snowflake_options["connect_timeout"] = "300"
snowflake_options["network_timeout"] = "600"

df = read_with_retry(snowflake_options)
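The same pattern generalizes to any transient operation. A hedged sketch of a reusable backoff wrapper, with jitter added (the jitter is an addition here, not something the connector provides) so concurrent tasks do not all retry at the same instant:

```python
import random
import time

def with_backoff(action, max_retries=3, base_delay=1.0):
    """Call a zero-argument `action`, retrying failed attempts with
    exponential backoff plus random jitter. Re-raises on the last attempt."""
    for attempt in range(max_retries):
        try:
            return action()
        except Exception:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            time.sleep(delay)

# Example usage (assumes `spark` and `snowflake_options` from above):
# df = with_backoff(lambda: spark.read.format("snowflake")
#                                .options(**snowflake_options).load())
```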
Monitoring and Warehouse Management
Monitor query performance and warehouse utilization to optimize costs.
# Enable query tagging for tracking
snowflake_options["query_tag"] = "spark_etl_pipeline_v1"
# Use separate warehouses for different workloads
read_options = {**snowflake_options, "sfWarehouse": "LOAD_WH"}
write_options = {**snowflake_options, "sfWarehouse": "TRANSFORM_WH"}
# Auto-suspend and auto-resume are handled by Snowflake
# Configure warehouse size based on data volume
df = spark.read.format("snowflake") \
    .options(**read_options) \
    .option("dbtable", "SOURCE_TABLE") \
    .load()

transformed_df = df.groupBy("category").agg({"amount": "sum"})

transformed_df.write.format("snowflake") \
    .options(**write_options) \
    .option("dbtable", "AGGREGATED_RESULTS") \
    .mode("overwrite") \
    .save()
The Spark-Snowflake connector bridges distributed processing with cloud data warehousing. Focus on query pushdown, appropriate partitioning, and warehouse sizing to minimize data transfer and compute costs. Monitor query patterns through Snowflake’s query history and adjust connector parameters based on actual performance metrics.