Apache Spark - Read/Write from Azure Blob/ADLS
Key Insights
- Azure Blob Storage and ADLS Gen2 use different connector configurations in Spark, with ADLS Gen2 requiring the `abfss://` protocol and OAuth2 authentication for production workloads
- Spark’s DataFrameReader/Writer API provides unified access patterns across storage systems, but performance tuning requires understanding partition strategies and file formats like Parquet and Delta Lake
- Credential management through Azure Key Vault or service principals prevents hardcoding secrets, while SAS tokens offer time-limited access for specific use cases
Connector Setup and Dependencies
Apache Spark requires specific libraries to communicate with Azure storage. Add these dependencies to your `pom.xml` for Maven projects:
```xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-azure</artifactId>
  <version>3.3.4</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-azure-datalake</artifactId>
  <version>3.3.4</version>
</dependency>
```
For PySpark, these libraries come pre-configured in Azure Databricks and Synapse Analytics. For standalone Spark clusters, include the JARs at runtime:
```bash
spark-submit --packages org.apache.hadoop:hadoop-azure:3.3.4 your_script.py
```
Authentication Methods
Storage Account Key Authentication
The simplest but least secure method uses the storage account key directly:
```python
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key
)
```
Shared Access Signature (SAS) Token
SAS tokens provide time-limited, permission-scoped access:
```python
sas_token = "sp=rl&st=2024-01-01T00:00:00Z&se=2024-12-31T23:59:59Z&sv=2022-11-02&sr=c&sig=..."
spark.conf.set(
    f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net",
    sas_token
)
```
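A SAS token carries its own validity window in the `se` (signed expiry) field, so a job can verify the token has not expired before kicking off a long run. A standard-library sketch (the token value is illustrative and `sas_expiry` is a hypothetical helper, not part of any SDK):

```python
from datetime import datetime, timezone
from urllib.parse import parse_qs

def sas_expiry(sas_token: str) -> datetime:
    """Extract the 'se' (signed expiry) timestamp from a SAS token query string."""
    fields = parse_qs(sas_token)
    expiry = fields["se"][0]  # e.g. "2024-12-31T23:59:59Z"
    return datetime.fromisoformat(expiry.replace("Z", "+00:00"))

token = "sp=rl&st=2024-01-01T00:00:00Z&se=2024-12-31T23:59:59Z&sv=2022-11-02&sr=c"
expired = sas_expiry(token) <= datetime.now(timezone.utc)
```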
OAuth2 with Service Principal (Recommended)
Production environments should use service principal authentication:
```python
tenant_id = "your-tenant-id"
client_id = "your-client-id"
client_secret = "your-client-secret"

spark.conf.set(f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account_name}.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account_name}.dfs.core.windows.net", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account_name}.dfs.core.windows.net", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account_name}.dfs.core.windows.net",
               f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")
```
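Rather than hardcoding `client_secret`, production jobs typically resolve it at runtime. On Databricks, for example, an Azure Key Vault-backed secret scope can be read with `dbutils.secrets` (scope and key names below are illustrative):

```python
# Databricks only: fetch the service principal secret from a Key Vault-backed
# secret scope instead of embedding it in code (names are illustrative)
client_secret = dbutils.secrets.get(scope="my-keyvault-scope", key="sp-client-secret")
```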
Reading from Azure Blob Storage
Azure Blob Storage uses the `wasbs://` protocol. Here’s how to read a CSV file:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AzureBlobReader").getOrCreate()

# Configure authentication
spark.conf.set(
    "fs.azure.account.key.mystorageaccount.blob.core.windows.net",
    storage_account_key
)

# Read CSV
blob_path = "wasbs://mycontainer@mystorageaccount.blob.core.windows.net/data/sales.csv"
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(blob_path)

df.show(5)
```
Reading Parquet files with partition discovery:
```python
parquet_path = "wasbs://mycontainer@mystorageaccount.blob.core.windows.net/data/partitioned_sales"
df = spark.read.parquet(parquet_path)

# Spark automatically discovers partitions like year=2024/month=01/
df.filter("year = 2024 AND month = 1").show()
```
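Partition discovery works because Hive-style layouts encode column values directly in directory names; decoding `year=2024/month=01` is plain string parsing, as this sketch shows (`parse_partition_values` is a hypothetical helper, not a Spark API):

```python
def parse_partition_values(path: str) -> dict:
    """Decode Hive-style key=value directory segments from a file path."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            values[key] = value
    return values

parse_partition_values("partitioned_sales/year=2024/month=01/part-0000.parquet")
# {'year': '2024', 'month': '01'}
```

Spark goes a step further and infers partition column types (so `year` comes back as an integer) unless `spark.sql.sources.partitionColumnTypeInference.enabled` is set to `false`.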
Reading from ADLS Gen2
ADLS Gen2 uses the `abfss://` protocol and provides better performance for big data workloads:
```python
# Using service principal authentication
adls_path = "abfss://mycontainer@mystorageaccount.dfs.core.windows.net/data/transactions.parquet"
df = spark.read.parquet(adls_path)

# Read with schema enforcement
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("customer_id", StringType(), False)
])

df = spark.read.schema(schema).parquet(adls_path)
```
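All of these URIs follow the same shape, `abfss://<container>@<account>.dfs.core.windows.net/<path>`; a tiny helper (hypothetical, not part of any SDK) keeps that assembly in one place:

```python
def abfss_uri(container: str, account: str, path: str) -> str:
    """Build an ADLS Gen2 URI from its container, account, and object path."""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"

abfss_uri("mycontainer", "mystorageaccount", "/data/transactions.parquet")
# 'abfss://mycontainer@mystorageaccount.dfs.core.windows.net/data/transactions.parquet'
```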
Reading multiple file formats from a directory:
```python
# Read all JSON files in a directory
json_path = "abfss://mycontainer@mystorageaccount.dfs.core.windows.net/logs/*.json"
logs_df = spark.read.json(json_path)

# Read with wildcard patterns
specific_logs = spark.read.json(
    "abfss://mycontainer@mystorageaccount.dfs.core.windows.net/logs/2024/01/*/*.json"
)
```
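For simple patterns, Hadoop's glob matching is path-aware (a single `*` does not cross directory boundaries). `pathlib.PurePath.match` behaves similarly and is handy for sanity-checking a pattern locally before pointing Spark at storage:

```python
from pathlib import PurePath

pattern = "logs/2024/01/*/*.json"
PurePath("logs/2024/01/15/events.json").match(pattern)   # True
PurePath("logs/2024/02/15/events.json").match(pattern)   # False: month differs
PurePath("logs/2024/01/events.json").match(pattern)      # False: no day directory
```

Note that richer Hadoop glob features such as `{a,b}` alternation are not covered by this check.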
Writing to Azure Storage
Writing DataFrames to Blob Storage
```python
# Write as Parquet with compression
output_path = "wasbs://output@mystorageaccount.blob.core.windows.net/processed/sales_summary"
df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet(output_path)

# Write partitioned data
df.write \
    .mode("append") \
    .partitionBy("year", "month") \
    .parquet(output_path)
```
Writing to ADLS Gen2
```python
adls_output = "abfss://output@mystorageaccount.dfs.core.windows.net/curated/customer_metrics"

# Write with specific number of files
df.coalesce(10) \
    .write \
    .mode("overwrite") \
    .option("maxRecordsPerFile", 100000) \
    .parquet(adls_output)
```
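Choosing the coalesce target is really a file-size calculation: divide the expected output volume by the file size you want. A sketch of that arithmetic (the 256 MB target is an assumption, not a Spark default; `target_file_count` is a hypothetical helper):

```python
import math

def target_file_count(total_bytes: int, target_file_bytes: int = 256 * 1024 * 1024) -> int:
    """How many output files are needed so each lands near the target size."""
    return max(1, math.ceil(total_bytes / target_file_bytes))

target_file_count(10 * 1024**3)  # 40 files of roughly 256 MB each for 10 GB
```

`df.coalesce(target_file_count(estimated_bytes))` then replaces a hardcoded `coalesce(10)` with a value that tracks data volume.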
Delta Lake Integration
Delta Lake provides ACID transactions on Azure storage:
```python
delta_path = "abfss://delta@mystorageaccount.dfs.core.windows.net/tables/customer_events"

# Write Delta table
df.write \
    .format("delta") \
    .mode("overwrite") \
    .save(delta_path)

# Append with schema evolution
new_data.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save(delta_path)

# Read Delta table
delta_df = spark.read.format("delta").load(delta_path)

# Time travel
historical_df = spark.read \
    .format("delta") \
    .option("versionAsOf", 5) \
    .load(delta_path)
```
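Time travel also accepts a timestamp instead of a version number via Delta's `timestampAsOf` option:

```python
# Read the table as it existed at a point in time (timestamp value is illustrative)
historical_df = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2024-06-01") \
    .load(delta_path)
```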
Performance Optimization
Partition Pruning
Structure data to minimize reads:
```python
# Write with optimal partitioning
large_df.write \
    .partitionBy("country", "date") \
    .parquet("abfss://data@account.dfs.core.windows.net/sales")

# Read specific partitions only
filtered = spark.read \
    .parquet("abfss://data@account.dfs.core.windows.net/sales") \
    .filter("country = 'US' AND date >= '2024-01-01'")
```
File Size Management
Avoid small file problems:
```python
# Repartition before writing
df.repartition(50) \
    .write \
    .mode("overwrite") \
    .parquet(output_path)

# Adaptive repartitioning based on data size
# Note: count() triggers a full pass over the data, so reuse the result if possible
num_partitions = max(1, df.count() // 1000000)  # ~1M records per partition
df.repartition(num_partitions).write.parquet(output_path)
```
Caching for Iterative Workloads
```python
# Cache frequently accessed data
df = spark.read.parquet("abfss://data@account.dfs.core.windows.net/reference_data")
df.cache()

# Perform multiple operations
result1 = df.filter("status = 'active'").count()
result2 = df.groupBy("category").count()  # Uses cached data
```
Error Handling and Monitoring
Implement robust error handling:
```python
from pyspark.sql.utils import AnalysisException

try:
    df = spark.read.parquet(adls_path)
except AnalysisException as e:
    if "Path does not exist" in str(e):
        print(f"Path not found: {adls_path}")
        df = spark.createDataFrame([], schema)
    else:
        raise
```
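`AnalysisException` covers missing paths, but transient storage faults (throttling, network timeouts) are often worth retrying with exponential backoff. A generic sketch, not tied to any Azure SDK (`with_retries` is a hypothetical helper):

```python
import time

def with_retries(fn, attempts: int = 3, base_delay: float = 1.0, retry_on=(IOError,)):
    """Call fn(), retrying with exponential backoff on the given exception types."""
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts; surface the last error
            time.sleep(base_delay * (2 ** attempt))

# Usage (sketch): df = with_retries(lambda: spark.read.parquet(adls_path))
```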
```python
# Monitor read/write operations
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Log execution plans
df.explain(mode="formatted")
```
This configuration enables production-grade Spark applications to efficiently process data in Azure storage while maintaining security and performance standards.