Apache Spark - Read/Write from Azure Blob/ADLS

Key Insights

  • Azure Blob Storage and ADLS Gen2 use different connector configurations in Spark, with ADLS Gen2 requiring the abfss:// protocol and OAuth2 authentication for production workloads
  • Spark’s DataFrameReader/Writer API provides unified access patterns across storage systems, but performance tuning requires understanding partition strategies and file formats like Parquet and Delta Lake
  • Credential management through Azure Key Vault or service principals prevents hardcoding secrets, while SAS tokens offer time-limited access for specific use cases

Connector Setup and Dependencies

Apache Spark requires specific libraries to communicate with Azure storage. Add these dependencies to your pom.xml for Maven projects:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-azure</artifactId>
    <version>3.3.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-azure-datalake</artifactId>
    <version>3.3.4</version>
</dependency>

For PySpark, these libraries come pre-configured in Azure Databricks and Synapse Analytics. For standalone Spark clusters, include the JARs at runtime:

spark-submit --packages org.apache.hadoop:hadoop-azure:3.3.4 your_script.py
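If the job also uses Delta Lake (covered later), its connector can be added to the same flag. The coordinates below are illustrative and assume a Spark build on Scala 2.12 with Hadoop 3.3.x; match the versions to your cluster:

```shell
# hadoop-azure provides wasbs:// and abfss://; delta-spark adds the Delta format
spark-submit \
  --packages org.apache.hadoop:hadoop-azure:3.3.4,io.delta:delta-spark_2.12:3.1.0 \
  your_script.py
```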

Authentication Methods

Storage Account Key Authentication

The simplest but least secure method uses the storage account key directly:

spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key
)

Shared Access Signature (SAS) Token

SAS tokens provide time-limited, permission-scoped access:

sas_token = "sp=rl&st=2024-01-01T00:00:00Z&se=2024-12-31T23:59:59Z&sv=2022-11-02&sr=c&sig=..."

spark.conf.set(
    f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net",
    sas_token
)

Service Principal Authentication

Production environments should use service principal authentication:

tenant_id = "your-tenant-id"
client_id = "your-client-id"
client_secret = "your-client-secret"

spark.conf.set(f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account_name}.dfs.core.windows.net", 
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account_name}.dfs.core.windows.net", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account_name}.dfs.core.windows.net", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account_name}.dfs.core.windows.net", 
               f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")
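Rather than hardcoding these values as above, pull them from environment variables (or from Azure Key Vault via a Databricks secret scope); the variable names below are illustrative:

```python
import os

def require_env(name: str) -> str:
    """Fail fast with a clear message when a required credential is missing."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"missing required environment variable: {name}")
    return value

# In the job, read each credential once at startup:
#   tenant_id = require_env("AZURE_TENANT_ID")
#   client_id = require_env("AZURE_CLIENT_ID")
#   client_secret = require_env("AZURE_CLIENT_SECRET")
```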

Reading from Azure Blob Storage

Azure Blob Storage uses the wasbs:// protocol. Here's how to read a CSV file:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AzureBlobReader").getOrCreate()

# Configure authentication
spark.conf.set(
    "fs.azure.account.key.mystorageaccount.blob.core.windows.net",
    storage_account_key
)

# Read CSV
blob_path = "wasbs://mycontainer@mystorageaccount.blob.core.windows.net/data/sales.csv"
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(blob_path)

df.show(5)

Reading Parquet files with partition discovery:

parquet_path = "wasbs://mycontainer@mystorageaccount.blob.core.windows.net/data/partitioned_sales"
df = spark.read.parquet(parquet_path)

# Spark automatically discovers partitions like year=2024/month=01/
df.filter("year = 2024 AND month = 1").show()
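Partition discovery works because each directory name encodes a column=value pair. A small helper (hypothetical, not a Spark API) makes the layout Spark expects explicit:

```python
def partition_path(base: str, **columns) -> str:
    """Build a Hive-style partition path like base/year=2024/month=1."""
    parts = [f"{col}={val}" for col, val in columns.items()]
    return "/".join([base.rstrip("/")] + parts)

print(partition_path(
    "wasbs://mycontainer@mystorageaccount.blob.core.windows.net/data/partitioned_sales",
    year=2024, month=1,
))  # .../partitioned_sales/year=2024/month=1
```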

Reading from ADLS Gen2

ADLS Gen2 uses the abfss:// protocol and provides better performance for big data workloads:

# Using service principal authentication
adls_path = "abfss://mycontainer@mystorageaccount.dfs.core.windows.net/data/transactions.parquet"

df = spark.read.parquet(adls_path)

# Read with schema enforcement
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("customer_id", StringType(), False)
])

df = spark.read.schema(schema).parquet(adls_path)

Reading multiple file formats from a directory:

# Read all JSON files in a directory
json_path = "abfss://mycontainer@mystorageaccount.dfs.core.windows.net/logs/*.json"
logs_df = spark.read.json(json_path)

# Read with wildcard patterns
specific_logs = spark.read.json(
    "abfss://mycontainer@mystorageaccount.dfs.core.windows.net/logs/2024/01/*/*.json"
)
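In these Hadoop-style globs, each `*` matches a single path component. Python's `pathlib` follows similar component-wise rules, so it can be used to preview which paths a pattern would select (the paths below are illustrative):

```python
from pathlib import PurePosixPath

pattern = "logs/2024/01/*/*.json"
paths = [
    "logs/2024/01/15/app.json",  # matches: day wildcard + .json
    "logs/2024/02/15/app.json",  # no match: month differs
    "logs/2024/01/15/app.txt",   # no match: wrong extension
]
matches = [p for p in paths if PurePosixPath(p).match(pattern)]
print(matches)
```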

Writing to Azure Storage

Writing DataFrames to Blob Storage

# Write as Parquet with compression
output_path = "wasbs://output@mystorageaccount.blob.core.windows.net/processed/sales_summary"

df.write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet(output_path)

# Write partitioned data
df.write \
    .mode("append") \
    .partitionBy("year", "month") \
    .parquet(output_path)

Writing to ADLS Gen2

adls_output = "abfss://output@mystorageaccount.dfs.core.windows.net/curated/customer_metrics"

# Write with specific number of files
df.coalesce(10) \
    .write \
    .mode("overwrite") \
    .option("maxRecordsPerFile", 100000) \
    .parquet(adls_output)
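The `coalesce(10)` above is a fixed guess. A common rule of thumb is to target roughly 128 MB to 1 GB per Parquet file, which can be derived from an estimated dataset size; the helper below is a sketch, not a Spark API:

```python
import math

def target_file_count(total_bytes: int,
                      target_file_bytes: int = 256 * 1024 * 1024) -> int:
    """Number of output files for ~target_file_bytes per file (at least 1)."""
    return max(1, math.ceil(total_bytes / target_file_bytes))

# e.g. a ~5 GB dataset at 256 MB per file:
print(target_file_count(5 * 1024**3))  # prints 20
```

The result feeds directly into the write, e.g. `df.coalesce(target_file_count(estimated_bytes)).write.parquet(adls_output)`.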

Delta Lake Integration

Delta Lake provides ACID transactions on Azure storage:

delta_path = "abfss://delta@mystorageaccount.dfs.core.windows.net/tables/customer_events"

# Write Delta table
df.write \
    .format("delta") \
    .mode("overwrite") \
    .save(delta_path)

# Append with schema evolution
new_data.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save(delta_path)

# Read Delta table
delta_df = spark.read.format("delta").load(delta_path)

# Time travel
historical_df = spark.read \
    .format("delta") \
    .option("versionAsOf", 5) \
    .load(delta_path)

Performance Optimization

Partition Pruning

Structure data to minimize reads:

# Write with optimal partitioning
large_df.write \
    .partitionBy("country", "date") \
    .parquet("abfss://data@account.dfs.core.windows.net/sales")

# Read specific partitions only
filtered = spark.read \
    .parquet("abfss://data@account.dfs.core.windows.net/sales") \
    .filter("country = 'US' AND date >= '2024-01-01'")

File Size Management

Avoid the small-files problem by controlling the number of output partitions:

# Repartition before writing
df.repartition(50) \
    .write \
    .mode("overwrite") \
    .parquet(output_path)

# Adaptive repartitioning based on data size
# (note: count() triggers a full pass over the data)
num_partitions = max(1, df.count() // 1000000)  # target ~1M records per partition
df.repartition(num_partitions).write.parquet(output_path)

Caching for Iterative Workloads

# Cache frequently accessed data
df = spark.read.parquet("abfss://data@account.dfs.core.windows.net/reference_data")
df.cache()

# Perform multiple operations
result1 = df.filter("status = 'active'").count()
result2 = df.groupBy("category").count().collect()  # Uses cached data

Error Handling and Monitoring

Implement robust error handling:

from pyspark.sql.utils import AnalysisException

try:
    df = spark.read.parquet(adls_path)
except AnalysisException as e:
    if "Path does not exist" in str(e):
        print(f"Path not found: {adls_path}")
        df = spark.createDataFrame([], schema)
    else:
        raise
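Transient storage errors (throttling, network blips) are also common. A generic retry wrapper with exponential backoff, sketched here in plain Python, can wrap the read call:

```python
import time

def with_retries(fn, attempts: int = 3, base_delay: float = 1.0):
    """Call fn(), retrying with exponential backoff on any exception."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries: surface the original error
            time.sleep(base_delay * (2 ** attempt))

# Usage (assumes `spark` and `adls_path` from above):
# df = with_retries(lambda: spark.read.parquet(adls_path))
```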

# Monitor read/write operations
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Log execution plans
df.explain(mode="formatted")

This configuration enables production-grade Spark applications to efficiently process data in Azure storage while maintaining security and performance standards.
