Spark Scala - SparkSession Configuration

Key Insights

  • SparkSession is the single entry point for all Spark functionality since Spark 2.0, consolidating SparkContext, SQLContext, and HiveContext into one unified API that simplifies application initialization.
  • Configuration order matters: spark-defaults.conf is overridden by spark-submit flags, which are overridden by programmatic .config() calls—understanding this precedence prevents frustrating debugging sessions.
  • Most configuration mistakes happen after SparkSession creation; many settings are immutable once the session starts, so establish your configuration strategy before calling .getOrCreate().

Introduction to SparkSession

Before Spark 2.0, developers juggled multiple entry points: SparkContext for core RDD operations, SQLContext for DataFrames, and HiveContext for Hive integration. This fragmentation created confusion and boilerplate code. SparkSession unified these interfaces into a single, coherent API.

SparkSession serves as your application’s gateway to Spark functionality. It manages the underlying SparkContext, provides DataFrame and Dataset APIs, handles catalog operations, and optionally integrates with Hive. Every Spark application starts by configuring and creating a SparkSession—get this wrong, and you’ll fight performance issues throughout your application’s lifecycle.

The configuration you apply to SparkSession determines resource allocation, execution behavior, and integration capabilities. This isn’t just boilerplate setup; it’s architectural decision-making that affects every query and transformation your application performs.

Creating a Basic SparkSession

SparkSession uses the builder pattern, providing a fluent API for configuration. At minimum, you need an application name and a master URL (though the master is typically set externally in production).

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MySparkApplication")
  .master("local[*]")
  .getOrCreate()

// Access the underlying SparkContext if needed
val sc = spark.sparkContext

// Your application logic here
import spark.implicits._

val data = Seq(("Alice", 34), ("Bob", 45), ("Charlie", 29))
val df = data.toDF("name", "age")
df.show()

// Clean up
spark.stop()

The getOrCreate() method returns the active SparkSession if one already exists, or creates a new one. When it returns an existing session, the builder's mutable options are applied to that session, but static options (such as executor memory) do not take effect; Spark only logs a warning. This behavior is useful in notebooks and testing scenarios but can mask configuration issues in production. Be explicit about when you expect a new session versus reusing an existing one.

Setting master("local[*]") runs Spark locally using all available cores. Never hardcode this in production code; instead, let your cluster manager or spark-submit command specify the master.
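
In production, the master and resources typically arrive via the submit command instead; a sketch (the class name, jar, and resource values are illustrative):

```shell
# The application code calls SparkSession.builder().appName(...).getOrCreate()
# with no .master(); the cluster manager is chosen here instead.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 10 \
  --executor-memory 4g \
  --class com.example.MySparkApplication \
  my-app-assembly.jar
```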

Core Configuration Options

Real applications require tuning beyond the basics. Memory allocation, parallelism, and serialization settings directly impact performance.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("TunedSparkApplication")
  .config("spark.executor.memory", "4g")
  .config("spark.executor.cores", "4")
  .config("spark.driver.memory", "2g")
  .config("spark.sql.shuffle.partitions", "200")
  .config("spark.default.parallelism", "100")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.max", "1024m")
  .config("spark.sql.broadcastTimeout", "600")
  .getOrCreate()

Let me break down these configurations:

Memory settings (spark.executor.memory, spark.driver.memory) control heap allocation. Executors do the heavy lifting—give them more memory. The driver coordinates tasks and collects results; size it based on your collect operations and broadcast variables.

Parallelism settings (spark.sql.shuffle.partitions, spark.default.parallelism) determine how Spark distributes work. The default 200 shuffle partitions works for moderate datasets but causes excessive overhead for small data and insufficient parallelism for large clusters. Match these to your data size and cluster capacity.
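
A rough way to pick these numbers is to target a fixed amount of shuffle data per partition; here is a plain-Scala sketch assuming the common rule of thumb of roughly 128 MB per partition (the helper name and target value are ours, not Spark settings):

```scala
// Heuristic: aim for ~128 MB of shuffle data per partition.
// inputBytes is your estimate of the data volume entering the shuffle.
def suggestShufflePartitions(inputBytes: Long,
                             targetPartitionBytes: Long = 128L << 20): Int =
  math.max(1, math.ceil(inputBytes.toDouble / targetPartitionBytes).toInt)

// ~100 GB of shuffle input suggests 800 partitions
val partitions = suggestShufflePartitions(100L << 30)
```

You would then feed the result into .config("spark.sql.shuffle.partitions", partitions.toString) before creating the session.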

Serialization (spark.serializer) affects data transfer between executors. Kryo serialization is faster and more compact than Java serialization. Register your custom classes with Kryo for optimal performance.
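
Registration can go through configuration on the builder; a sketch (the class names under com.example are hypothetical):

```scala
import org.apache.spark.sql.SparkSession

// Kryo with explicit class registration. registrationRequired makes Spark
// fail fast on unregistered classes instead of silently writing full class
// names into the serialized data, which bloats it.
val spark = SparkSession.builder()
  .appName("KryoTunedApp")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrationRequired", "true")
  .config("spark.kryo.classesToRegister",                // comma-separated FQCNs
          "com.example.Event,com.example.UserProfile")   // hypothetical classes
  .getOrCreate()
```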

Environment-Specific Configurations

Production applications run across different environments: local development, staging clusters, and production deployments. Hardcoding configurations creates maintenance nightmares.

import org.apache.spark.sql.SparkSession

object SparkSessionFactory {
  
  def create(environment: String): SparkSession = {
    val builder = SparkSession.builder()
      .appName(s"MyApp-$environment")
    
    environment.toLowerCase match {
      case "local" =>
        builder
          .master("local[4]")
          .config("spark.sql.shuffle.partitions", "8")
          .config("spark.driver.memory", "2g")
          
      case "yarn" =>
        builder
          .master("yarn")
          .config("spark.submit.deployMode", "cluster")
          .config("spark.executor.instances", "10")
          .config("spark.executor.memory", "8g")
          .config("spark.executor.cores", "4")
          .config("spark.yarn.queue", "production")
          .config("spark.sql.shuffle.partitions", "400")
          
      case "kubernetes" =>
        builder
          .master("k8s://https://kubernetes-api-server:443")
          .config("spark.kubernetes.container.image", "my-spark-image:latest")
          .config("spark.kubernetes.namespace", "spark-jobs")
          .config("spark.executor.instances", "5")
          .config("spark.executor.memory", "4g")
          
      case _ =>
        throw new IllegalArgumentException(s"Unknown environment: $environment")
    }
    
    builder.getOrCreate()
  }
}

// Usage
val env = sys.env.getOrElse("SPARK_ENV", "local")
val spark = SparkSessionFactory.create(env)

For externalized configuration, Spark reads spark-defaults.conf from the SPARK_HOME/conf directory. This file contains key-value pairs:

# spark-defaults.conf
spark.executor.memory=4g
spark.sql.shuffle.partitions=200
spark.serializer=org.apache.spark.serializer.KryoSerializer

Configuration precedence flows from lowest to highest priority: spark-defaults.conf → spark-submit flags → programmatic .config() calls. Use spark-defaults.conf for cluster-wide defaults, spark-submit for job-specific overrides, and programmatic configuration sparingly for truly application-specific settings.
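
To see the precedence in action, suppose spark-defaults.conf sets spark.executor.memory=4g; the flag below wins over the file, and a programmatic .config("spark.executor.memory", ...) call in the application would win over both (class name and jar are illustrative):

```shell
# --conf overrides spark-defaults.conf; programmatic .config() overrides --conf.
spark-submit \
  --conf spark.executor.memory=8g \
  --class com.example.MyApp \
  my-app.jar
```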

Hive and Catalog Integration

Many organizations store metadata in Hive Metastore, enabling table discovery and schema management across tools. SparkSession integrates seamlessly with Hive when properly configured.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("HiveIntegratedApp")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .config("hive.metastore.uris", "thrift://metastore-host:9083")
  .config("spark.sql.catalogImplementation", "hive")
  .enableHiveSupport()
  .getOrCreate()

// Now you can query Hive tables directly
spark.sql("SHOW DATABASES").show()
spark.sql("USE analytics")
spark.sql("SELECT * FROM user_events LIMIT 10").show()

// Create managed tables that persist to Hive
spark.sql("""
  CREATE TABLE IF NOT EXISTS processed_events (
    event_id STRING,
    user_id STRING,
    event_timestamp TIMESTAMP,
    event_type STRING,
    event_date DATE
  )
  USING PARQUET
  PARTITIONED BY (event_date)
""")

The enableHiveSupport() call activates Hive SerDes, UDFs, and the Hive metastore catalog. Without it, Spark uses an in-memory catalog that doesn’t persist between sessions.

Ensure your Spark distribution includes Hive dependencies. The pre-built Spark packages from Apache include Hive support, but custom builds might not.
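
If you manage your own build, the Hive module must be on the classpath; an sbt sketch (the Spark version shown is an assumption, match it to your cluster):

```scala
// build.sbt fragment: spark-hive brings in Hive SerDes and metastore support.
// "provided" assumes the cluster supplies the Spark jars at runtime.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "3.5.0" % "provided",
  "org.apache.spark" %% "spark-hive" % "3.5.0" % "provided"
)
```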

Advanced Configuration Patterns

Production workloads benefit from dynamic resource allocation and adaptive query execution. These features let Spark respond to actual workload characteristics rather than static configurations.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ProductionSparkApp")
  // Dynamic allocation - scale executors based on workload
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "100")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .config("spark.shuffle.service.enabled", "true")
  
  // Adaptive Query Execution (Spark 3.0+)
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
  .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
  
  // S3 configuration for cloud deployments
  .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .config("spark.hadoop.fs.s3a.aws.credentials.provider", 
          "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
  .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
  .config("spark.hadoop.fs.s3a.fast.upload", "true")
  .config("spark.hadoop.fs.s3a.path.style.access", "true")
  
  // Speculation for stragglers
  .config("spark.speculation", "true")
  .config("spark.speculation.multiplier", "1.5")
  .config("spark.speculation.quantile", "0.9")
  
  .getOrCreate()

Dynamic allocation adjusts executor count based on pending tasks. It requires the external shuffle service to preserve shuffle data when executors are removed. This prevents over-provisioning during quiet periods and scales up during intensive operations.

Adaptive Query Execution (AQE) optimizes query plans at runtime based on actual data statistics. It handles partition coalescing (reducing small partitions), skew join optimization (splitting hot partitions), and runtime filter pushdown. Enable AQE in Spark 3.0+ unless you have specific reasons not to—it’s remarkably effective.

Best Practices and Troubleshooting

Configuration mistakes waste hours of debugging time. Follow these practices to avoid common pitfalls.

First, inspect your active configuration. This utility function helps during development and troubleshooting:

import org.apache.spark.sql.SparkSession

object SparkConfigUtils {
  
  def logActiveConfiguration(spark: SparkSession): Unit = {
    println("=" * 60)
    println("ACTIVE SPARK CONFIGURATION")
    println("=" * 60)
    
    val configs = spark.conf.getAll
    val sortedConfigs = configs.toSeq.sortBy(_._1)
    
    val categories = Map(
      "spark.sql" -> "SQL Settings",
      "spark.executor" -> "Executor Settings",
      "spark.driver" -> "Driver Settings",
      "spark.shuffle" -> "Shuffle Settings",
      "spark.dynamicAllocation" -> "Dynamic Allocation",
      "spark.hadoop" -> "Hadoop/Cloud Settings"
    )
    
    categories.foreach { case (prefix, label) =>
      val matching = sortedConfigs.filter(_._1.startsWith(prefix))
      if (matching.nonEmpty) {
        println(s"\n$label:")
        matching.foreach { case (key, value) =>
          val displayValue = if (key.toLowerCase.contains("password") || 
                                  key.toLowerCase.contains("secret")) "****" else value
          println(s"  $key = $displayValue")
        }
      }
    }
    println("=" * 60)
  }
  
  def validateConfiguration(spark: SparkSession): Seq[String] = {
    val warnings = scala.collection.mutable.ListBuffer[String]()
    
    val shufflePartitions = spark.conf.get("spark.sql.shuffle.partitions", "200").toInt
    if (shufflePartitions == 200) {
      warnings += "Using default shuffle partitions (200). Consider tuning for your data size."
    }
    
    if (spark.conf.getOption("spark.serializer").isEmpty) {
      warnings += "Using default Java serializer. Consider Kryo for better performance."
    }
    
    val aqeEnabled = spark.conf.get("spark.sql.adaptive.enabled", "false").toBoolean
    val majorVersion = spark.version.split("\\.").head.toInt
    if (!aqeEnabled && majorVersion >= 3) {
      warnings += "Adaptive Query Execution is disabled. Enable it for automatic optimization."
    }
    
    warnings.toSeq
  }
}

// Usage
val spark = SparkSession.builder()
  .appName("ConfigDemo")
  .master("local[*]")
  .getOrCreate()

SparkConfigUtils.logActiveConfiguration(spark)
SparkConfigUtils.validateConfiguration(spark).foreach(w => println(s"WARNING: $w"))

Key pitfalls to avoid:

  1. Setting configs after session creation: Many configurations are immutable once SparkSession starts. Always configure before calling getOrCreate().

  2. Ignoring configuration precedence: When your programmatic config doesn’t take effect, check if spark-submit or spark-defaults.conf is overriding it.

  3. Copying configurations blindly: What works for one workload fails for another. Understand what each setting does before applying it.

  4. Forgetting to stop the session: In applications (not notebooks), always call spark.stop() to release resources cleanly.

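Pitfall 4 can be enforced structurally with a loan-pattern helper that releases the session even when the job throws; a minimal sketch (withStoppable is our name, not a Spark API):

```scala
// Loan pattern: acquire a resource, run the body, always release it.
// For Spark: withStoppable(SparkSession.builder().getOrCreate())(_.stop()) { spark => ... }
def withStoppable[R, A](acquire: => R)(stop: R => Unit)(body: R => A): A = {
  val resource = acquire
  try body(resource)
  finally stop(resource) // runs on success and on failure alike
}
```
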
SparkSession configuration isn’t glamorous work, but it’s foundational. Invest time understanding these options, and your Spark applications will reward you with reliable, performant execution.
