Apache Spark - Read/Write from GCS

Apache Spark doesn't include GCS support out of the box. You need the Cloud Storage connector JAR that implements the Hadoop FileSystem interface for `gs://` URIs.

Key Insights

  • Apache Spark can read and write data from Google Cloud Storage using the GCS connector library, which implements the Hadoop FileSystem API for seamless integration with Spark’s native I/O operations.
  • Authentication to GCS requires either service account JSON keys or workload identity federation, with proper IAM permissions (roles/storage.objectViewer for reads, roles/storage.objectCreator for writes).
  • Performance optimization depends on choosing the right file format (Parquet for analytics, Avro for schemas, CSV for compatibility), configuring parallelism through partition counts, and leveraging predicate pushdown for columnar formats.

Setting Up the GCS Connector

Add the connector dependency to your build configuration:

// build.sbt
libraryDependencies += "com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop3-2.2.11"

For PySpark installations:

pip install pyspark
# Download the connector JAR
wget https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar

When initializing your Spark session, configure the GCS connector:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GCS Integration") \
    .config("spark.jars", "/path/to/gcs-connector-hadoop3-latest.jar") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .getOrCreate()
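If you'd rather not download and manage the JAR yourself, Spark can resolve the connector from Maven Central at startup via `spark.jars.packages`. A sketch, using the same coordinates as the sbt dependency above (pin whatever release matches your Hadoop version):

```python
from pyspark.sql import SparkSession

# Alternative setup: let Spark fetch the connector from Maven Central.
spark = (
    SparkSession.builder
    .appName("GCS via Maven package")
    .config("spark.jars.packages",
            "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.11")
    .config("spark.hadoop.fs.gs.impl",
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl",
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    .getOrCreate()
)
```

This trades a local file dependency for a network fetch on first launch, which is often simpler on ephemeral clusters.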

Authentication Configuration

The connector supports multiple authentication methods. Service account keys provide the most straightforward approach for development and testing.

spark = SparkSession.builder \
    .appName("GCS with Auth") \
    .config("spark.jars", "/path/to/gcs-connector-hadoop3-latest.jar") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/path/to/service-account-key.json") \
    .getOrCreate()

For production environments running on Google Cloud, use Application Default Credentials:

spark = SparkSession.builder \
    .appName("GCS with ADC") \
    .config("spark.jars", "/path/to/gcs-connector-hadoop3-latest.jar") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .getOrCreate()

# With no keyfile configured, the connector falls back to Application
# Default Credentials (e.g. the VM's metadata server).

Ensure your service account has appropriate IAM roles. Note that roles/storage.objectCreator cannot delete or overwrite existing objects, so jobs that use overwrite mode typically need roles/storage.objectAdmin instead:

# For reading
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
    --role="roles/storage.objectViewer"

# For writing
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
    --role="roles/storage.objectCreator"
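Project-wide bindings grant access to every bucket in the project. If you want tighter scope, the same roles can be bound on a single bucket (a sketch; assumes a recent gcloud with the `gcloud storage` command group):

```shell
# Scope read access to one bucket rather than the whole project
gcloud storage buckets add-iam-policy-binding gs://your-bucket \
    --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
    --role="roles/storage.objectViewer"
```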

Reading Data from GCS

Spark’s DataFrameReader API works seamlessly with GCS paths. The connector handles the protocol translation transparently.

Reading CSV files:

df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("gs://your-bucket/data/input.csv")

df.show(5)
df.printSchema()

Reading Parquet files with partition discovery:

df = spark.read \
    .parquet("gs://your-bucket/data/partitioned_data/")

# Spark automatically discovers partitions
df.select("year", "month", "revenue").show()

Reading JSON with schema specification for better performance:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("transaction_amount", DoubleType(), True),
    StructField("timestamp", StringType(), True),
    StructField("category", StringType(), True)
])

df = spark.read \
    .schema(schema) \
    .json("gs://your-bucket/transactions/*.json")

Reading multiple file formats from different buckets:

sales_df = spark.read.parquet("gs://sales-bucket/2024/")
customer_df = spark.read.csv("gs://customer-bucket/customers.csv", header=True)

joined_df = sales_df.join(customer_df, sales_df.customer_id == customer_df.id)

Writing Data to GCS

Writing operations support various formats and modes. The mode parameter controls behavior when data already exists.

Writing Parquet with partitioning:

df.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("gs://your-bucket/output/sales_data/")

Writing CSV with custom options:

df.write \
    .mode("append") \
    .option("header", "true") \
    .option("compression", "gzip") \
    .csv("gs://your-bucket/output/reports/")

Writing JSON as a single output file (coalesce(1) funnels the write through one task, so reserve this for small results):

df.coalesce(1) \
    .write \
    .mode("overwrite") \
    .json("gs://your-bucket/output/summary/")

Using different write modes:

# Overwrite existing data
df.write.mode("overwrite").parquet("gs://bucket/path/")

# Append to existing data
df.write.mode("append").parquet("gs://bucket/path/")

# Error if path exists
df.write.mode("errorifexists").parquet("gs://bucket/path/")

# Ignore write if path exists
df.write.mode("ignore").parquet("gs://bucket/path/")

Performance Optimization Strategies

Control parallelism through repartitioning before writes:

# Write with specific number of output files
df.repartition(10).write.parquet("gs://bucket/output/")

# Partition by column and control file count
df.repartition(5, "country").write \
    .partitionBy("country") \
    .parquet("gs://bucket/output/")
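A common rule of thumb is to size output files in the hundreds-of-megabytes range. A small sketch of picking the repartition count from an estimated dataset size (the helper name, the size estimate, and the 256 MB target are assumptions you'd tune):

```python
import math

def target_partitions(estimated_bytes: int,
                      target_file_bytes: int = 256 * 1024 * 1024) -> int:
    """Suggest a repartition count so each output file lands near
    target_file_bytes. Both inputs are estimates the caller supplies."""
    return max(1, math.ceil(estimated_bytes / target_file_bytes))

# e.g. ~10 GB of data at ~256 MB per file -> 40 partitions:
# df.repartition(target_partitions(10 * 1024**3)).write.parquet("gs://bucket/output/")
```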

Optimize Parquet compression for storage efficiency:

df.write \
    .option("compression", "snappy") \
    .mode("overwrite") \
    .parquet("gs://bucket/output/")

# Other compression options: gzip, lzo, brotli, lz4, zstd

Configure GCS connector for better throughput:

spark = SparkSession.builder \
    .config("spark.hadoop.fs.gs.block.size", "134217728") \
    .config("spark.hadoop.fs.gs.inputstream.buffer.size", "8388608") \
    .config("spark.hadoop.fs.gs.outputstream.buffer.size", "8388608") \
    .config("spark.hadoop.fs.gs.outputstream.upload.chunk.size", "67108864") \
    .getOrCreate()
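Those config values are plain byte counts (128 MB block size, 8 MB buffers, 64 MB upload chunks). A tiny helper, purely for readability and not part of the connector, makes the derivation explicit:

```python
def mib(n: int) -> str:
    """Express n mebibytes as the byte-count string Spark configs expect."""
    return str(n * 1024 * 1024)

# The values used above:
# mib(128) -> "134217728"  (fs.gs.block.size)
# mib(8)   -> "8388608"    (stream buffer sizes)
# mib(64)  -> "67108864"   (upload chunk size)
```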

Leverage predicate pushdown for columnar formats:

# Efficient: filters pushed to storage layer
df = spark.read.parquet("gs://bucket/large_dataset/")
filtered_df = df.filter(df.year == 2024).filter(df.revenue > 1000)

# Spark's Parquet reader uses row-group statistics to skip data, so only
# the relevant byte ranges are fetched from GCS

Handling Large Datasets

For incremental loads, dynamic partition overwrite mode replaces only the partitions present in the incoming data instead of truncating the entire output path:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write \
    .mode("overwrite") \
    .partitionBy("date") \
    .parquet("gs://bucket/incremental_data/")

Process many files incrementally:

# Process one file per iteration, appending results downstream
file_list = ["gs://bucket/file1.csv", "gs://bucket/file2.csv", "gs://bucket/file3.csv"]

for file_path in file_list:
    chunk_df = spark.read.csv(file_path, header=True)
    processed_df = chunk_df.filter(chunk_df.status == "active")
    processed_df.write.mode("append").parquet("gs://bucket/processed/")
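When the file list is long, grouping paths into fixed-size batches keeps each job small while cutting per-job overhead, since spark.read.csv also accepts a list of paths. A sketch of the batching itself (pure Python, independent of Spark; the batch size is an assumption):

```python
from typing import Iterator, List

def batches(paths: List[str], size: int) -> Iterator[List[str]]:
    """Yield successive batches of at most `size` paths."""
    for i in range(0, len(paths), size):
        yield paths[i:i + size]

# Hypothetical usage, reading each batch in a single call:
# for batch in batches(file_list, 2):
#     spark.read.csv(batch, header=True).write.mode("append").parquet("gs://bucket/processed/")
```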

Use broadcast joins for small dimension tables:

from pyspark.sql.functions import broadcast

large_fact = spark.read.parquet("gs://bucket/fact_sales/")
small_dim = spark.read.csv("gs://bucket/dim_product.csv", header=True)

result = large_fact.join(broadcast(small_dim), "product_id")

Error Handling and Monitoring

Implement retry logic for transient GCS failures:

import time

from py4j.protocol import Py4JJavaError

max_retries = 3
retry_count = 0

while retry_count < max_retries:
    try:
        df = spark.read.parquet("gs://bucket/data/")
        df.write.mode("overwrite").parquet("gs://bucket/output/")
        break
    except Py4JJavaError:
        # Transient storage errors surface as Java exceptions via Py4J;
        # an AnalysisException usually means a bad path, which retries won't fix.
        retry_count += 1
        if retry_count == max_retries:
            raise
        time.sleep(2 ** retry_count)
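The retry loop above can be packaged as a reusable decorator with exponential backoff (a sketch; the helper name is hypothetical, and which exception types you retry on depends on how failures surface in your environment):

```python
import functools
import time

def with_retries(max_retries: int = 3, base_delay: float = 1.0,
                 retry_on: tuple = (Exception,)):
    """Retry the wrapped function with exponential backoff."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except retry_on:
                    if attempt == max_retries:
                        raise
                    time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator

# Hypothetical usage:
# @with_retries(max_retries=3)
# def copy_data():
#     spark.read.parquet("gs://bucket/data/") \
#          .write.mode("overwrite").parquet("gs://bucket/output/")
```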

Monitor job progress and data statistics:

# Check record counts
input_count = df.count()
print(f"Processing {input_count} records")

# Verify output
output_df = spark.read.parquet("gs://bucket/output/")
output_count = output_df.count()
print(f"Written {output_count} records")

# Get partition information
spark.sql("DESCRIBE EXTENDED parquet.`gs://bucket/output/`").show()

This configuration provides production-ready GCS integration with Spark, handling authentication, performance tuning, and error scenarios effectively.
