Spark Scala - Submit Spark Application (spark-submit)
Key Insights
- Package your Spark application as an uber JAR with dependencies marked as “provided” to avoid conflicts with cluster-managed Spark libraries
- Choose deploy mode based on your use case: client mode for interactive debugging, cluster mode for production jobs where the driver shouldn’t depend on the submitting machine
- Master the argument order in spark-submit—Spark options must come before the JAR path, and application arguments must come after
Introduction to spark-submit
spark-submit is the unified interface for deploying Spark applications to any supported cluster manager. Whether you’re running on YARN, Kubernetes, Spark’s standalone cluster, or Mesos (deprecated as of Spark 3.2), this single command handles the complexity of distributing your application code, managing dependencies, and configuring resources.
Understanding spark-submit thoroughly separates developers who can run Spark locally from engineers who can deploy production workloads. The command abstracts away cluster-specific details while exposing fine-grained control over execution parameters. Get it wrong, and your jobs fail with cryptic errors. Get it right, and you have reproducible, scalable deployments.
Building a Spark Application for Submission
Before submitting anything, you need a properly packaged application. The critical concept here is the “uber JAR” or “fat JAR”—a single JAR file containing your application code and all its dependencies, except for Spark itself.
Here’s a minimal build.sbt configuration:
name := "spark-wordcount"
version := "1.0.0"
scalaVersion := "2.12.18"

val sparkVersion = "3.5.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "com.typesafe" % "config" % "1.4.2"
)

assembly / assemblyMergeStrategy := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case "reference.conf" => MergeStrategy.concat
  case x => MergeStrategy.first
}
The % "provided" scope is essential. It tells sbt that Spark libraries will be available at runtime (provided by the cluster), so they shouldn’t be bundled into your JAR. Including them would bloat your artifact and cause version conflicts.
Add the sbt-assembly plugin to project/plugins.sbt:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")
Now, here’s a straightforward WordCount application:
package com.example.spark

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: WordCount <input-path> <output-path>")
      System.exit(1)
    }

    val inputPath = args(0)
    val outputPath = args(1)

    val spark = SparkSession.builder()
      .appName("WordCount")
      .getOrCreate()

    import spark.implicits._

    val wordCounts = spark.read.textFile(inputPath)
      .flatMap(line => line.split("\\s+"))
      .filter(word => word.nonEmpty)
      .groupByKey(identity)
      .count()

    wordCounts.write.mode("overwrite").csv(outputPath)

    spark.stop()
  }
}
Build the uber JAR with:
sbt clean assembly
This produces target/scala-2.12/spark-wordcount-assembly-1.0.0.jar.
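The artifact name follows sbt-assembly’s default pattern of `<name>-assembly-<version>.jar` under the Scala binary version directory. A small hypothetical helper (assemblyJarPath is not part of sbt; it just makes the convention explicit) shows how the path is derived from the build settings above:

```scala
// Hypothetical helper: reproduces sbt-assembly's default output path,
// target/scala-<scalaBinaryVersion>/<name>-assembly-<version>.jar
def assemblyJarPath(name: String, version: String, scalaBinaryVersion: String): String =
  s"target/scala-$scalaBinaryVersion/$name-assembly-$version.jar"
```

For the build above, `assemblyJarPath("spark-wordcount", "1.0.0", "2.12")` yields exactly the path shown.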
spark-submit Command Syntax and Structure
The command follows a strict structure:
spark-submit [spark-options] <application-jar> [application-arguments]
Order matters. Spark options must precede the JAR path. Application arguments must follow it. Mixing them up causes silent failures or unexpected behavior.
A basic template:
spark-submit \
--class com.example.spark.WordCount \
--master yarn \
--deploy-mode cluster \
/path/to/spark-wordcount-assembly-1.0.0.jar \
hdfs:///input/data.txt \
hdfs:///output/wordcount
Everything after the JAR path (hdfs:///input/data.txt and hdfs:///output/wordcount) gets passed to your main method as the args array.
Essential Configuration Options
Here are the flags you’ll use constantly:
--master: Specifies the cluster manager URL
- local[*] - Local mode with all cores
- yarn - YARN cluster
- k8s://https://<host>:<port> - Kubernetes
- spark://<host>:7077 - Standalone cluster
--deploy-mode: Where the driver runs (client or cluster)
--class: Fully qualified main class name
--executor-memory: Memory per executor (e.g., 4g, 8g)
--num-executors: Number of executor instances (YARN/Kubernetes)
--driver-memory: Memory for the driver process
--conf: Arbitrary Spark configuration properties
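To make the ordering rule concrete, here is a sketch of a helper that assembles a spark-submit invocation so the options, JAR, and application arguments always land in the right positions. SubmitCommand is a hypothetical illustration, not part of Spark itself:

```scala
// Hypothetical sketch: build a spark-submit argument list where Spark
// options always precede the JAR and application arguments always follow it.
object SubmitCommand {
  def build(
      sparkOptions: Seq[(String, String)], // e.g. "--master" -> "yarn"
      confs: Map[String, String],          // e.g. "spark.sql.shuffle.partitions" -> "200"
      jarPath: String,
      appArgs: Seq[String]
  ): Seq[String] = {
    val opts = sparkOptions.flatMap { case (k, v) => Seq(k, v) }
    val cfgs = confs.toSeq.sorted.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
    Seq("spark-submit") ++ opts ++ cfgs ++ Seq(jarPath) ++ appArgs
  }
}
```

Wrapping the command in a builder like this (for example, inside a deployment script) makes it impossible to accidentally place an application argument before the JAR path.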
A production-ready example:
spark-submit \
--class com.example.spark.WordCount \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--executor-memory 4g \
--num-executors 10 \
--executor-cores 2 \
--conf spark.sql.shuffle.partitions=200 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
/path/to/spark-wordcount-assembly-1.0.0.jar \
hdfs:///input/data.txt \
hdfs:///output/wordcount
The --conf flag accepts any property from Spark’s configuration documentation. Use it for tuning shuffle behavior, serialization, speculation, and hundreds of other parameters.
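Conceptually, each --conf value is just a key=value pair split on the first equals sign. A simplified sketch (ConfParser is illustrative only; Spark’s real launcher does far more validation) shows why a value may itself contain = characters, as in the extraJavaOptions example above:

```scala
// Simplified illustration of how `--conf key=value` strings become
// configuration properties. Splitting on the FIRST '=' only means the
// value may itself contain '=' (e.g. -Dconfig.file=config.properties).
object ConfParser {
  def parse(confArgs: Seq[String]): Map[String, String] =
    confArgs.map { kv =>
      kv.split("=", 2) match {
        case Array(k, v) => k -> v
        case _           => throw new IllegalArgumentException(s"Malformed --conf value: $kv")
      }
    }.toMap
}
```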
Deploy Modes: Client vs. Cluster
This distinction trips up many developers. The deploy mode determines where your driver process runs.
Client mode (--deploy-mode client): The driver runs on the machine where you execute spark-submit. Executors connect back to this machine. Use this for:
- Interactive development and debugging
- Spark shells and notebooks
- Jobs where you need to see stdout/stderr immediately
Cluster mode (--deploy-mode cluster): The driver runs on a worker node inside the cluster. The submitting machine can disconnect after submission. Use this for:
- Production batch jobs
- Scheduled jobs via orchestrators
- Situations where the submitting machine shouldn’t be a single point of failure
Here’s the same job in both modes:
# Client mode - driver runs locally, logs stream to terminal
spark-submit \
--class com.example.spark.WordCount \
--master yarn \
--deploy-mode client \
--executor-memory 4g \
--num-executors 5 \
/path/to/spark-wordcount-assembly-1.0.0.jar \
hdfs:///input/data.txt \
hdfs:///output/wordcount-client
# Cluster mode - driver runs on cluster, command returns immediately
spark-submit \
--class com.example.spark.WordCount \
--master yarn \
--deploy-mode cluster \
--executor-memory 4g \
--num-executors 5 \
/path/to/spark-wordcount-assembly-1.0.0.jar \
hdfs:///input/data.txt \
hdfs:///output/wordcount-cluster
In cluster mode, you’ll need to retrieve logs through the cluster manager’s UI or CLI (yarn logs -applicationId <app-id>).
Passing Application Arguments and Dependencies
Your application often needs external libraries beyond what’s in the uber JAR. Use these flags:
--jars: Comma-separated list of additional JARs to distribute
--packages: Maven coordinates for dependencies (downloads automatically)
--files: Files to distribute to each executor’s working directory
--py-files: Python files for PySpark applications
spark-submit \
--class com.example.spark.DataProcessor \
--master yarn \
--deploy-mode cluster \
--jars /path/to/custom-udf.jar,/path/to/jdbc-driver.jar \
--packages org.apache.spark:spark-avro_2.12:3.5.0,io.delta:delta-spark_2.12:3.0.0 \
--files /path/to/config.properties,/path/to/lookup-table.csv \
--conf spark.driver.extraJavaOptions=-Dconfig.file=config.properties \
/path/to/data-processor-assembly-1.0.0.jar \
--input hdfs:///raw/events \
--output hdfs:///processed/events \
--date 2024-01-15
The --packages flag is particularly useful—it resolves transitive dependencies from Maven Central automatically. No need to manually download JARs.
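Each entry in --packages is a standard Maven coordinate of the form group:artifact:version. A minimal sketch of that shape (Coordinate and parseCoordinate are hypothetical helpers, not a Spark API) can be useful when generating submit commands programmatically:

```scala
// Sketch: a Maven coordinate as consumed by --packages is group:artifact:version.
final case class Coordinate(group: String, artifact: String, version: String)

def parseCoordinate(s: String): Option[Coordinate] = s.split(":") match {
  case Array(g, a, v) => Some(Coordinate(g, a, v))
  case _              => None // malformed coordinate
}
```

Note that the artifact name carries the Scala binary version suffix (spark-avro_2.12), which must match the Scala version your cluster’s Spark build uses.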
For application arguments, I recommend using a flag-based format (like --input, --output above) rather than positional arguments. Libraries like scopt or decline make parsing straightforward:
case class Config(input: String = "", output: String = "", date: String = "")

val parser = new scopt.OptionParser[Config]("DataProcessor") {
  opt[String]("input").required().action((x, c) => c.copy(input = x))
  opt[String]("output").required().action((x, c) => c.copy(output = x))
  opt[String]("date").required().action((x, c) => c.copy(date = x))
}

parser.parse(args, Config()) match {
  case Some(config) => // run the job with the validated config
  case None         => sys.exit(1) // scopt has already printed the error
}
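If you would rather avoid a dependency, a minimal flag parser can be hand-rolled in plain Scala. This is only a sketch, far less robust than scopt or decline (no help text, no error messages), reusing the same Config case class:

```scala
// Minimal hand-rolled parser for `--key value` pairs; a sketch only.
// Here --input and --output are required, --date is optional.
case class Config(input: String = "", output: String = "", date: String = "")

def parseArgs(args: Array[String]): Option[Config] = {
  @annotation.tailrec
  def loop(rest: List[String], c: Config): Option[Config] = rest match {
    case "--input"  :: v :: tail => loop(tail, c.copy(input = v))
    case "--output" :: v :: tail => loop(tail, c.copy(output = v))
    case "--date"   :: v :: tail => loop(tail, c.copy(date = v))
    case Nil                     => Some(c)
    case _                       => None // unknown flag or missing value
  }
  loop(args.toList, Config()).filter(c => c.input.nonEmpty && c.output.nonEmpty)
}
```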
Troubleshooting Common Submission Issues
ClassNotFoundException for your main class
Your --class doesn’t match the actual package path, or the class isn’t in the JAR. Verify with:
jar tf your-app.jar | grep YourMainClass
NoSuchMethodError or ClassNotFoundException for dependencies
Version conflict between your bundled dependencies and cluster-provided ones. Check that Spark dependencies are marked provided. As a last resort, try spark.driver.userClassPathFirst=true (and its spark.executor.userClassPathFirst counterpart), or shade the conflicting packages in your assembly.
Java heap space / OutOfMemoryError on driver
Increase --driver-memory. On YARN, note that spark.yarn.am.memory applies only in client mode; in cluster mode the driver runs inside the application master, so spark.driver.memory governs it.
Container killed by YARN for exceeding memory limits
Your executors use more physical memory than their YARN allocation. Increase --executor-memory, or raise spark.executor.memoryOverhead to account for off-heap usage (Netty buffers, native libraries), which lives outside the JVM heap.
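It helps to work through the arithmetic here. In Spark 3.x the default executor memory overhead is max(384 MiB, 10% of executor memory), so YARN reserves more per container than --executor-memory alone. A sketch of the calculation (ContainerSize is illustrative; Spark’s YARN allocator does the equivalent internally):

```scala
// Sketch of how the YARN container request is derived from --executor-memory.
// Default overhead in Spark 3.x is max(384 MiB, 10% of executor memory);
// setting spark.executor.memoryOverhead replaces that default.
object ContainerSize {
  val MinOverheadMiB = 384L

  def containerRequestMiB(executorMemoryMiB: Long,
                          overrideOverheadMiB: Option[Long] = None): Long = {
    val overhead = overrideOverheadMiB.getOrElse(
      math.max(MinOverheadMiB, (executorMemoryMiB * 0.10).toLong))
    executorMemoryMiB + overhead
  }
}
```

So an executor launched with --executor-memory 4g actually requests roughly 4.4 GiB from YARN; if the cluster’s maximum container size is exactly 4 GiB, the request is rejected or the container is later killed.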
Connection refused to master
Network connectivity issue. Verify the master URL, check firewalls, and ensure the cluster is running.
Application takes forever to start
Resource contention. Check cluster utilization. Your requested resources might exceed available capacity.
Debug systematically: start with a minimal configuration in client mode, verify basic connectivity, then add complexity. Check logs at every layer—spark-submit output, driver logs, executor logs, and cluster manager logs. The error you see is often a symptom, not the root cause.