Apache Spark - Read/Write from S3

Spark uses the Hadoop S3A filesystem implementation to interact with S3. You need the correct dependencies and AWS credentials configured before reading or writing data.

Key Insights

  • Spark’s S3A connector provides native S3 integration with configurable credentials, connection pooling, and multipart upload support for production workloads
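  • Partition pruning and predicate pushdown with Parquet on S3 can substantially reduce data transfer and request costs, because Spark reads only the matching partitions and row groups instead of the full dataset; the savings depend on how selective the filters are
  • S3 has provided strong read-after-write consistency since December 2020, so S3Guard is no longer needed; retry logic is still worthwhile for throttling and transient network errors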

Configuring Spark for S3 Access

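Add these dependencies to your build.sbt (or the equivalent entries in pom.xml). The hadoop-aws version must match the Hadoop version your Spark distribution was built against, and aws-java-sdk-bundle must match the version that hadoop-aws release was built with: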

// build.sbt
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.5.0",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.4",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.262"
)

Configure your SparkSession with S3 credentials:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("S3ReadWrite")
  .config("spark.hadoop.fs.s3a.access.key", "YOUR_ACCESS_KEY")
  .config("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_KEY")
  .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
  .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .getOrCreate()

For production environments, use IAM roles instead of hardcoded credentials:

val spark = SparkSession.builder()
  .appName("S3ReadWrite")
  .config("spark.hadoop.fs.s3a.aws.credentials.provider", 
    "com.amazonaws.auth.InstanceProfileCredentialsProvider")
  .getOrCreate()
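
To keep keys out of source code during development, one option is to derive the S3A settings from environment variables and fall back to the instance profile otherwise. The sketch below is a minimal illustration: the `S3ACredentials` object and its `fromEnv` method are hypothetical names, not part of Spark or Hadoop, and it assumes the conventional `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` variable names.

```scala
object S3ACredentials {
  private val InstanceProfile = "com.amazonaws.auth.InstanceProfileCredentialsProvider"

  // Hypothetical helper: returns Spark config entries for S3A credentials.
  // Uses static keys when both variables are present in `env`,
  // otherwise selects the instance profile provider (IAM role).
  def fromEnv(env: Map[String, String]): Map[String, String] =
    (env.get("AWS_ACCESS_KEY_ID"), env.get("AWS_SECRET_ACCESS_KEY")) match {
      case (Some(access), Some(secret)) =>
        Map(
          "spark.hadoop.fs.s3a.access.key" -> access,
          "spark.hadoop.fs.s3a.secret.key" -> secret
        )
      case _ =>
        Map("spark.hadoop.fs.s3a.aws.credentials.provider" -> InstanceProfile)
    }
}
```

The resulting map can be folded into the builder, for example `S3ACredentials.fromEnv(sys.env).foldLeft(SparkSession.builder().appName("S3ReadWrite")) { case (b, (k, v)) => b.config(k, v) }.getOrCreate()`.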

Reading Data from S3

Reading from S3 uses the same DataFrameReader API as reading from a local filesystem; only the s3a:// URI scheme differs.

// Read CSV
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("s3a://my-bucket/data/users.csv")

// Read Parquet
val parquetDF = spark.read
  .parquet("s3a://my-bucket/data/events/")

// Read JSON
val jsonDF = spark.read
  .json("s3a://my-bucket/logs/*.json")

// Read with explicit schema
import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("user_id", LongType, nullable = false),
  StructField("event_type", StringType, nullable = false),
  StructField("timestamp", TimestampType, nullable = false),
  StructField("properties", MapType(StringType, StringType), nullable = true)
))

val eventsDF = spark.read
  .schema(schema)
  .json("s3a://my-bucket/events/")

For large datasets, use partition pruning to minimize data transfer:

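// Data partitioned by date: s3a://my-bucket/events/year=2024/month=01/day=15/
import spark.implicits._ // enables the $"column" syntax

val recentEvents = spark.read
  .parquet("s3a://my-bucket/events/")
  .filter($"year" === 2024 && $"month" === 1) // partition pruning: only matching directories are read
  .filter($"event_type" === "purchase")       // predicate pushdown: Parquet row groups can be skipped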
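
One way to check that pruning is actually applied is to inspect the physical plan. The sketch below is an assumption-laden illustration rather than part of the original pipeline: it uses a local temporary directory in place of an s3a:// path, a tiny made-up dataset, and a hypothetical `PruningCheck` object. For Parquet sources, the scan node in the plan lists partition predicates under `PartitionFilters` and pushed-down column predicates under `PushedFilters`.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object PruningCheck {
  // Builds a small partitioned dataset, applies the filters, and returns the physical plan text
  def physicalPlan(): String = {
    val spark = SparkSession.builder().appName("PruningCheck").master("local[*]").getOrCreate()
    import spark.implicits._

    // Local temp directory stands in for an s3a:// path in this sketch
    val dir = Files.createTempDirectory("events").resolve("data").toString

    Seq((1L, "purchase", 2024, 1), (2L, "view", 2023, 12))
      .toDF("user_id", "event_type", "year", "month")
      .write.partitionBy("year", "month").parquet(dir)

    val recent = spark.read.parquet(dir)
      .filter($"year" === 2024 && $"month" === 1)
      .filter($"event_type" === "purchase")

    // The same text is printed by recent.explain()
    val plan = recent.queryExecution.executedPlan.toString
    spark.stop()
    plan
  }
}
```

If the partition columns do not appear under `PartitionFilters`, the filter is probably being applied after the scan, for example because it is wrapped in a UDF or refers to a non-partition column.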

Writing Data to S3

Spark supports multiple write modes and formats when writing to S3.

// Basic write
df.write
  .mode("overwrite")
  .parquet("s3a://my-bucket/output/users/")

// Partitioned write
df.write
  .partitionBy("country", "signup_date")
  .mode("append")
  .parquet("s3a://my-bucket/output/users_partitioned/")

// Write with compression
df.write
  .option("compression", "snappy")
  .mode("overwrite")
  .parquet("s3a://my-bucket/output/compressed/")

// Write CSV with options (Spark writes a directory of part files, not a single file)
df.write
  .option("header", "true")
  .option("delimiter", "|")
  .mode("overwrite")
  .csv("s3a://my-bucket/output/export/")

Control file size and parallelism with repartitioning:

// Write fewer, larger files
df.coalesce(10)
  .write
  .mode("overwrite")
  .parquet("s3a://my-bucket/output/consolidated/")

// Distribute evenly across partitions
df.repartition(100)
  .write
  .mode("overwrite")
  .parquet("s3a://my-bucket/output/distributed/")

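// Repartition by the partition column so each country is written by a single task,
// which avoids many small files in every partition directory
import org.apache.spark.sql.functions.col

df.repartition(col("country"))
  .write
  .partitionBy("country")
  .mode("overwrite")
  .parquet("s3a://my-bucket/output/by_country/")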
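
Choosing the argument to `coalesce` or `repartition` is easier with a rough size target. The sketch below is plain arithmetic, not a Spark API: `OutputSizing.numFiles` is a hypothetical helper, and the 128 MiB default is an assumed target file size, not a value taken from this article.

```scala
object OutputSizing {
  private val MiB = 1024L * 1024L

  // Hypothetical helper: number of output files needed so that each file is
  // roughly `targetFileBytes` (ceiling division, never fewer than one file).
  def numFiles(totalBytes: Long, targetFileBytes: Long = 128 * MiB): Int = {
    require(totalBytes >= 0 && targetFileBytes > 0, "totalBytes must be >= 0 and targetFileBytes > 0")
    math.max(1L, (totalBytes + targetFileBytes - 1) / targetFileBytes).toInt
  }
}
```

For an estimated 10 GiB of output, `df.repartition(OutputSizing.numFiles(10L * 1024 * 1024 * 1024))` would request 80 partitions. The estimate should refer to the compressed on-disk size, which is usually much smaller than the in-memory size of the DataFrame.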

Performance Optimization

Configure S3A settings for better throughput and reliability:

val spark = SparkSession.builder()
  .appName("OptimizedS3")
  .config("spark.hadoop.fs.s3a.connection.maximum", "100")
  .config("spark.hadoop.fs.s3a.threads.max", "256")
  .config("spark.hadoop.fs.s3a.connection.establish.timeout", "5000")
  .config("spark.hadoop.fs.s3a.connection.timeout", "200000")
  .config("spark.hadoop.fs.s3a.multipart.size", "104857600") // 100MB
  .config("spark.hadoop.fs.s3a.fast.upload", "true")
  .config("spark.hadoop.fs.s3a.fast.upload.buffer", "disk")
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
  .getOrCreate()
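
The multipart part size also bounds how large a single object can be, because S3 allows at most 10,000 parts per multipart upload. The sketch below only works through that arithmetic; `MultipartMath` is a hypothetical helper and not part of the S3A connector.

```scala
object MultipartMath {
  // S3 limit on the number of parts in one multipart upload
  val MaxParts: Long = 10000L

  // Largest object that fits in one multipart upload at the given part size
  def maxObjectBytes(partSizeBytes: Long): Long = partSizeBytes * MaxParts

  // Number of parts needed to upload an object of `objectBytes` (ceiling division)
  def partsNeeded(objectBytes: Long, partSizeBytes: Long): Long = {
    require(partSizeBytes > 0, "partSizeBytes must be positive")
    (objectBytes + partSizeBytes - 1) / partSizeBytes
  }
}
```

With a part size of 104857600 bytes (100 MiB), a single upload tops out at 10,000 × 100 MiB, a little under 1 TiB. Larger parts raise that ceiling but also increase the buffer space each in-flight upload needs.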

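Use the S3A committers to avoid slow rename-based commits. The commit protocol is a session-level setting, not a per-write option, and its classes ship in the spark-hadoop-cloud module, which must be on the classpath:

// Requires "org.apache.spark" %% "spark-hadoop-cloud" at the same version as Spark
val spark = SparkSession.builder()
  .appName("S3ACommitter")
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()

df.write
  .format("parquet")
  .mode("overwrite")
  .save("s3a://my-bucket/output/optimized/")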

Handling Partitioned Data

Work efficiently with partitioned datasets on S3:

// Read specific partitions
val specificPartition = spark.read
  .parquet("s3a://my-bucket/events/year=2024/month=01/")

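// Dynamic partition overwrite: replace only the partitions present in df.
// In the default "static" mode, overwrite deletes everything under the target path first.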
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write
  .partitionBy("year", "month", "day")
  .mode("overwrite")
  .parquet("s3a://my-bucket/events/")

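// List partitions programmatically
import org.apache.hadoop.fs.{FileSystem, Path}

val path = new Path("s3a://my-bucket/events/")
// Resolve the filesystem from the path itself; FileSystem.get(conf) returns the
// default filesystem (for example HDFS or local) and rejects an s3a:// path
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
val partitions = fs.listStatus(path)
  .filter(_.isDirectory)
  .map(_.getPath.getName)

partitions.foreach(println)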
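
Directory names returned by a listing follow the Hive-style `column=value` convention, so they can be turned back into column/value pairs. The sketch below is a minimal, Spark-independent illustration; `PartitionNames.parse` is a hypothetical helper, and it does not unescape special characters that Spark may percent-encode in partition values.

```scala
object PartitionNames {
  // Parses a Hive-style relative path such as "year=2024/month=01/day=15"
  // into ordered (column, value) pairs. Segments without a "column=" prefix
  // (for example "_SUCCESS") are skipped.
  def parse(relativePath: String): Seq[(String, String)] =
    relativePath.split('/').toSeq.flatMap { segment =>
      segment.split("=", 2) match {
        case Array(k, v) if k.nonEmpty => Some(k -> v)
        case _                         => None
      }
    }
}
```

Values are kept as strings, so `month=01` yields `"01"`; convert them explicitly if numeric comparison is needed.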

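Tune partition discovery for large datasets. When the number of input paths exceeds the threshold (default 32), Spark lists files with a distributed job instead of sequentially on the driver; the parallelism setting caps the number of tasks in that listing job: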

spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "1000")
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "50")

val largePartitionedData = spark.read
  .parquet("s3a://my-bucket/massive-dataset/")

Error Handling and Retry Logic

Implement robust error handling for S3 operations:

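import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.DataFrame

// Tries the read up to maxRetries times with exponential backoff (1s, 2s, 4s, ...).
// spark.read.parquet is lazy, so this guards only file listing and schema
// inference; failures during actions such as show() surface later.
def readWithRetry(path: String, maxRetries: Int = 3): Try[DataFrame] = {
  require(maxRetries >= 1, "maxRetries must be at least 1")

  @tailrec
  def attempt(n: Int): Try[DataFrame] =
    Try(spark.read.parquet(path)) match {
      case Failure(_) if n < maxRetries =>
        Thread.sleep(1000L * (1L << (n - 1))) // sleep only when another attempt follows
        attempt(n + 1)
      case result => result // success, or the last failure
    }

  attempt(1)
}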

readWithRetry("s3a://my-bucket/data/") match {
  case Success(df) => df.show()
  case Failure(e) => println(s"Failed to read after retries: ${e.getMessage}")
}
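
When retries from many jobs or executors can coincide, a capped exponential delay with random jitter is a common refinement of the sleep in a retry loop. The sketch below is one possible shape for it, not a Spark or S3A API; the `Backoff` object, its method names, and the 1 second base and 30 second cap are all assumptions chosen for illustration.

```scala
import scala.util.Random

object Backoff {
  // Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped at maxMillis
  def exponentialMillis(attempt: Int, baseMillis: Long = 1000L, maxMillis: Long = 30000L): Long = {
    require(attempt >= 1, "attempt is 1-based")
    val shift = math.min(attempt - 1, 20) // bound the shift so the multiplication cannot overflow for small bases
    math.min(maxMillis, baseMillis * (1L << shift))
  }

  // "Full jitter": a random delay between 0 and exponentialMillis(attempt), inclusive,
  // which spreads out clients that failed at the same moment
  def withJitter(attempt: Int, rng: Random = new Random()): Long =
    (rng.nextDouble() * (exponentialMillis(attempt) + 1)).toLong
}
```

A retry loop would call `Thread.sleep(Backoff.withJitter(n))` before attempt `n + 1`. Passing a seeded `Random` makes the delays reproducible in tests.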

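Configure retry behavior at the S3A level. Set these when building the session; S3A filesystem instances are cached, so changes made after the first access to a bucket do not take effect:

val spark = SparkSession.builder()
  .appName("S3ARetry")
  .config("spark.hadoop.fs.s3a.retry.limit", "5")        // S3A-level retries of failed requests
  .config("spark.hadoop.fs.s3a.retry.interval", "500ms") // initial delay between those retries
  .config("spark.hadoop.fs.s3a.attempts.maximum", "10")  // retries inside the AWS SDK client
  .getOrCreate()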

Monitoring and Debugging

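Inspect S3A statistics and enable debug logging:

// S3A collects storage statistics per filesystem instance; these are the driver-side counters
import java.net.URI
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem

val hadoopConf = spark.sparkContext.hadoopConfiguration
val fs = FileSystem.get(new URI("s3a://my-bucket/"), hadoopConf)
val stats = fs.getStorageStatistics

// Print each counter (request counts, bytes transferred, and so on)
stats.getLongStatistics.asScala.foreach(s => println(s"${s.getName} = ${s.getValue}"))

// Spark 3.3+ uses Log4j 2; add to conf/log4j2.properties:
// logger.s3a.name = org.apache.hadoop.fs.s3a
// logger.s3a.level = debug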

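Track bytes read, a rough proxy for S3 costs, by accumulating task input metrics with a SparkListener:

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val bytesRead = new AtomicLong(0L)

// Register before running jobs; counts input bytes from all sources, not only S3
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    Option(taskEnd.taskMetrics).foreach(m => bytesRead.addAndGet(m.inputMetrics.bytesRead))
})

// ... run jobs, then report
println(s"Total bytes read: ${bytesRead.get / (1024 * 1024)} MB")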

These patterns provide a production-ready foundation for S3 integration in Spark applications. The key is balancing parallelism, file sizes, and network configuration for your specific workload characteristics and cost constraints.
