Apache Spark - Read/Write from HDFS

Key Insights

  • Spark’s native HDFS integration requires no additional dependencies when deployed on Hadoop clusters, but local development needs proper Hadoop libraries and configuration files
  • Performance optimization depends on partition count, file format choice (Parquet outperforms CSV by 3-5x), and compression codecs that balance CPU overhead with I/O reduction
  • HDFS write operations support multiple save modes (overwrite, append, errorIfExists) with different atomicity guarantees and failure recovery characteristics

Configuring Spark for HDFS Access

Spark reads from and writes to HDFS through Hadoop’s FileSystem API. When running on a Hadoop cluster with YARN or Mesos, Spark automatically detects HDFS configuration from core-site.xml and hdfs-site.xml in the Hadoop configuration directory.

For local development or standalone clusters, explicitly set the HDFS NameNode URI:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HDFS-ReadWrite") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .getOrCreate()

# Alternatively, specify HDFS path directly
df = spark.read.csv("hdfs://namenode:9000/data/input.csv")

For Scala applications:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("HDFS-ReadWrite")
  .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000")
  .getOrCreate()

val df = spark.read
  .option("header", "true")
  .csv("hdfs://namenode:9000/data/input.csv")

When working with Kerberized HDFS clusters, configure authentication (Spark 3.x uses the spark.kerberos.* keys; the older spark.yarn.* equivalents are deprecated but still accepted):

spark = SparkSession.builder \
    .appName("HDFS-Kerberos") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .config("spark.kerberos.keytab", "/path/to/user.keytab") \
    .config("spark.kerberos.principal", "user@REALM") \
    .getOrCreate()

Reading Data from HDFS

Spark supports multiple file formats with varying performance characteristics. Parquet and ORC provide columnar storage with built-in compression and predicate pushdown.

# Read Parquet files (the schema is stored in file metadata, so no inference is needed)
df_parquet = spark.read.parquet("hdfs:///data/warehouse/transactions")

# Read CSV with explicit schema for better performance
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("user_id", IntegerType(), False),
    StructField("amount", DoubleType(), False),
    StructField("timestamp", StringType(), False)
])

df_csv = spark.read \
    .option("header", "true") \
    .option("delimiter", ",") \
    .option("mode", "DROPMALFORMED") \
    .schema(schema) \
    .csv("hdfs:///data/raw/transactions/*.csv")

Reading JSON data (the JSON reader infers its schema by scanning the input and unioning the fields it finds; the mergeSchema option seen in many examples applies to Parquet and ORC, not JSON):

# Fields missing in some files simply come back as null
df_json = spark.read \
    .json("hdfs:///data/logs/*/events.json")

# Read with multiline JSON objects
df_multiline = spark.read \
    .option("multiline", "true") \
    .json("hdfs:///data/api/responses/*.json")

For text files requiring custom parsing:

# Read raw text files
rdd_text = spark.sparkContext.textFile("hdfs:///data/logs/app.log")

# Apply custom parsing logic
parsed_rdd = rdd_text \
    .filter(lambda line: "ERROR" in line) \
    .map(lambda line: line.split("|"))

# Convert to DataFrame
from pyspark.sql import Row
df_logs = parsed_rdd.map(lambda x: Row(timestamp=x[0], level=x[1], message=x[2])).toDF()

Writing Data to HDFS

Spark provides multiple save modes controlling write behavior when destination paths exist:

# Overwrite existing data
df.write \
    .mode("overwrite") \
    .parquet("hdfs:///data/output/results")

# Append to existing data
df.write \
    .mode("append") \
    .parquet("hdfs:///data/output/results")

# Fail if path exists (default)
df.write \
    .mode("errorIfExists") \
    .parquet("hdfs:///data/output/results")

# Ignore if path exists
df.write \
    .mode("ignore") \
    .parquet("hdfs:///data/output/results")

Writing with partitioning improves query performance for downstream consumers:

# Partition by date and region
df.write \
    .partitionBy("date", "region") \
    .mode("overwrite") \
    .parquet("hdfs:///data/warehouse/sales")

# Results in directory structure:
# /data/warehouse/sales/date=2024-01-01/region=US/
# /data/warehouse/sales/date=2024-01-01/region=EU/

Controlling file output and compression:

# Write with specific number of output files
df.repartition(10).write \
    .mode("overwrite") \
    .parquet("hdfs:///data/output/results")

# Write with compression codec
df.write \
    .option("compression", "snappy") \
    .mode("overwrite") \
    .parquet("hdfs:///data/output/compressed")

# Available codecs: none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd
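The codec choice trades CPU time for bytes written. Snappy and zstd are not in the Python standard library, so as a rough stand-in, the same tradeoff can be felt with stdlib zlib at different effort levels (illustrative only; real Parquet codec behavior differs):

```python
import time
import zlib

# Repetitive CSV-like payload compresses well at every level
payload = b"transaction_id,user_id,amount\n" + b"tx-0001,42,19.99\n" * 50_000

for level in (1, 6, 9):  # fast ... thorough
    start = time.perf_counter()
    compressed = zlib.compress(payload, level)
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"level {level}: {len(compressed):>7} bytes in {elapsed_ms:.1f} ms")
```

The pattern carries over: snappy and lz4 sit at the fast end of this spectrum, gzip and zstd at higher levels trade CPU for smaller files.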

Writing CSV with custom options:

df.write \
    .option("header", "true") \
    .option("delimiter", "|") \
    .option("quote", "\"") \
    .option("escape", "\\") \
    .option("compression", "gzip") \
    .mode("overwrite") \
    .csv("hdfs:///data/export/transactions")

Performance Optimization Strategies

Partition count directly impacts parallelism and file size. Too few partitions underutilize cluster resources; too many create small files causing HDFS NameNode pressure.

# Check current partition count
print(f"Partitions: {df.rdd.getNumPartitions()}")

# Repartition for optimal parallelism (rule of thumb: 2-4x executor cores)
num_executors = 10
cores_per_executor = 4
target_partitions = num_executors * cores_per_executor * 3

df_optimized = df.repartition(target_partitions)

# Coalesce when reducing partitions (avoids full shuffle)
df_reduced = df.coalesce(50)
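Core count is one way to pick a partition count; target output file size is another. A small helper sketch (assuming the common 128 MB HDFS block size as the target; tune target_file_bytes for your cluster):

```python
def partitions_for_size(dataset_bytes, target_file_bytes=128 * 1024 * 1024):
    """Partition count that lands each output file near the target size."""
    return max(1, -(-dataset_bytes // target_file_bytes))  # ceiling division

# 10 GB of data at ~128 MB per file
print(partitions_for_size(10 * 1024**3))  # 80

# Then: df.repartition(partitions_for_size(estimated_bytes)).write...
```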

File format comparison for a 10GB dataset:

import time

# Benchmark different formats
formats = ["csv", "json", "parquet", "orc"]
for fmt in formats:
    start = time.time()
    if fmt == "csv":
        spark.read.format(fmt).option("header", "true").load(f"hdfs:///benchmark/data.{fmt}").count()
    else:
        spark.read.format(fmt).load(f"hdfs:///benchmark/data.{fmt}").count()
    print(f"{fmt}: {time.time() - start:.2f}s")

# Typical results:
# csv: 45.23s
# json: 38.67s
# parquet: 12.34s (with snappy compression)
# orc: 11.89s (with zlib compression)
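The gap above is mostly column projection: a row format must read every byte of every row, while a columnar format reads only the chunks for the selected columns. A back-of-envelope sketch (assuming equal on-disk width per column, which real data rarely has):

```python
def bytes_scanned(total_bytes, cols_needed, cols_total, columnar):
    # Row formats (CSV/JSON) read each row in full; columnar formats read
    # only the selected columns' chunks
    return total_bytes * cols_needed / cols_total if columnar else total_bytes

ten_gb = 10 * 1024**3
print(bytes_scanned(ten_gb, 3, 20, columnar=False) / 1024**3)  # 10.0 (GB scanned)
print(bytes_scanned(ten_gb, 3, 20, columnar=True) / 1024**3)   # 1.5 (GB scanned)
```

Predicate pushdown and per-column compression widen the gap further, which is what the next example demonstrates.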

Predicate pushdown with columnar formats:

# Only reads necessary columns and filters at storage level
df_filtered = spark.read \
    .parquet("hdfs:///data/warehouse/transactions") \
    .select("user_id", "amount", "date") \
    .filter("date >= '2024-01-01' AND amount > 1000")

# Check physical plan to verify pushdown
df_filtered.explain()

Handling Write Failures and Data Consistency

HDFS writes are not atomic at the directory level. Use staging directories and rename operations for atomic updates:

staging_path = "hdfs:///data/staging/transactions_temp"
final_path = "hdfs:///data/warehouse/transactions"

# Get a FileSystem handle up front so the cleanup path works even when the
# write itself fails
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
HPath = spark._jvm.org.apache.hadoop.fs.Path

staging = HPath(staging_path)
final = HPath(final_path)

try:
    df.write \
        .mode("overwrite") \
        .parquet(staging_path)

    # Remove old data, then rename; the rename itself is atomic in HDFS
    fs.delete(final, True)
    fs.rename(staging, final)

except Exception as e:
    print(f"Write failed: {e}")
    # Clean up the staging directory
    fs.delete(staging, True)

For incremental updates with deduplication:

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Read existing data
existing_df = spark.read.parquet("hdfs:///data/warehouse/users")

# Union with the new incremental batch (new_df); unionByName avoids silent
# column misalignment when the two DataFrames differ in column order
combined_df = existing_df.unionByName(new_df)

# Deduplicate keeping latest record
window_spec = Window.partitionBy("user_id").orderBy(col("updated_at").desc())
deduplicated_df = combined_df \
    .withColumn("row_num", row_number().over(window_spec)) \
    .filter(col("row_num") == 1) \
    .drop("row_num")

# Write back via a staging path: overwriting the same path that the job is
# still lazily reading from can fail or lose data, so land the result
# elsewhere and then swap directories using the rename pattern shown above
deduplicated_df.write \
    .mode("overwrite") \
    .parquet("hdfs:///data/staging/users_dedup")

Monitoring and Debugging HDFS Operations

Enable detailed logging for HDFS operations:

spark.sparkContext.setLogLevel("DEBUG")

# Monitor HDFS I/O metrics
spark.read.parquet("hdfs:///data/input").write.parquet("hdfs:///data/output")

# Access metrics through the Spark UI, or programmatically via internal APIs
# (note: _jsc and metricsSystem are not stable public interfaces)
metrics = spark.sparkContext._jsc.sc().env().metricsSystem()

Check HDFS file statistics:

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
path = spark._jvm.org.apache.hadoop.fs.Path("hdfs:///data/warehouse/transactions")

status = fs.getFileStatus(path)
print(f"Size: {status.getLen()} bytes")
print(f"Replication: {status.getReplication()}")
print(f"Block size: {status.getBlockSize()} bytes")

# List files recursively
files = fs.listFiles(path, True)
while files.hasNext():
    file_status = files.next()
    print(f"File: {file_status.getPath()}, Size: {file_status.getLen()}")
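Listing files this way is also a quick check for the small-file problem mentioned earlier. A rough model of NameNode pressure, assuming the often-cited figure of roughly 150 bytes of NameNode heap per file or block object (treat that constant as an estimate, not a specification):

```python
def namenode_objects(num_files, avg_file_bytes, block_bytes=128 * 1024 * 1024):
    """Approximate NameNode metadata objects: one per file plus one per block."""
    blocks_per_file = max(1, -(-avg_file_bytes // block_bytes))  # ceiling division
    return num_files * (1 + blocks_per_file)

# The same 1 TB stored as 1,000,000 x 1 MB files vs 8,000 x 128 MB files
print(namenode_objects(1_000_000, 1 * 1024**2))  # 2000000 objects
print(namenode_objects(8_000, 128 * 1024**2))    # 16000 objects
# Multiply by ~150 bytes/object for a ballpark heap estimate
```

The two layouts hold identical data, yet the small-file layout costs the NameNode over a hundred times more metadata, which is why the repartition/coalesce guidance above matters at write time.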
