Apache Spark - Spark UI - Understanding the Interface
Key Insights
- The Spark UI is your primary diagnostic tool for understanding job execution, identifying bottlenecks, and optimizing performance—learn to read it fluently before tuning configurations blindly.
- Stage boundaries occur at shuffle operations (wide transformations), and understanding this relationship helps you predict and minimize data movement across your cluster.
- The SQL tab’s query plans reveal exactly how Spark executes your DataFrame operations, making it essential for debugging slow queries and validating optimization strategies.
Introduction to Spark UI
The Spark UI is the window into your application’s soul. Every transformation, every shuffle, every memory spike—it’s all there if you know where to look. Too many engineers treat Spark as a black box, throwing more executors at problems that better partitioning would solve. The UI exists to prevent this waste.
By default, the Spark UI runs on port 4040 during application execution. If that port is occupied, Spark increments to 4041, 4042, and so on. For completed applications, the History Server provides the same interface for post-mortem analysis.
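The fallback behavior is easy to model. The sketch below is a simplified pure-Python illustration (not Spark's actual code) of how a preferred port plus the retry budget from spark.port.maxRetries (default 16) determines where the UI finally binds:

```python
def resolve_ui_port(preferred, occupied, max_retries=16):
    """Simplified model of Spark's port fallback: try the preferred port,
    then preferred+1, preferred+2, ... up to max_retries offsets
    (governed by spark.port.maxRetries)."""
    for offset in range(max_retries + 1):
        candidate = preferred + offset
        if candidate not in occupied:
            return candidate
    raise RuntimeError(f"No free port in {preferred}-{preferred + max_retries}")

# Two other Spark apps already hold 4040 and 4041, so the UI lands on 4042
print(resolve_ui_port(4040, {4040, 4041}))
```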
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkUIDemo") \
    .config("spark.ui.port", "4050") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "/tmp/spark-events") \
    .config("spark.history.fs.logDirectory", "/tmp/spark-events") \
    .getOrCreate()

# Access UI at http://localhost:4050 while application runs
print(f"Spark UI available at: {spark.sparkContext.uiWebUrl}")
The eventLog configurations enable the History Server to replay completed jobs. Without these, your debugging options disappear the moment your application terminates.
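If you prefer setting this cluster-wide rather than per application, the same keys can live in spark-defaults.conf (the paths here are illustrative) and the History Server is started with the script shipped in Spark's sbin directory, serving on port 18080 by default:

```properties
# conf/spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               /tmp/spark-events
spark.history.fs.logDirectory    /tmp/spark-events

# Then start the History Server (default UI port 18080):
#   $SPARK_HOME/sbin/start-history-server.sh
```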
Jobs Tab - Tracking Application Progress
The Jobs tab displays every Spark job triggered by your application. Here’s the critical insight: jobs are created by actions, not transformations. Each count(), collect(), write(), or show() spawns a new job.
from pyspark.sql.functions import col, sum as spark_sum

# Create sample data
data = [(i, f"product_{i % 100}", i * 10.5) for i in range(1000000)]
df = spark.createDataFrame(data, ["id", "product", "amount"])

# Job 1: count action
total_records = df.count()
print(f"Total records: {total_records}")

# Job 2: filter and count
high_value = df.filter(col("amount") > 5000).count()
print(f"High value records: {high_value}")

# Job 3: aggregation and collect
product_totals = df.groupBy("product") \
    .agg(spark_sum("amount").alias("total")) \
    .orderBy(col("total").desc()) \
    .limit(10) \
    .collect()

# Job 4: write action
df.write.mode("overwrite").parquet("/tmp/output")
In the Jobs tab, you’ll see four distinct jobs. Each shows its status (succeeded, failed, running), duration, and the number of stages. Click any job to see its DAG visualization—a directed acyclic graph showing the execution plan.
The DAG visualization reveals transformation dependencies. When you see a job with many stages, that's your signal to investigate shuffle operations: more stages typically mean more data movement across the network.
Stages Tab - Understanding Task Execution
Stages represent units of work that can execute without shuffling data. Spark creates stage boundaries at wide transformations—operations requiring data redistribution like groupByKey, reduceByKey, join, and repartition.
from pyspark.sql.functions import count, avg

# Create two DataFrames for join demonstration
orders = spark.createDataFrame([
    (1, "customer_1", 100.0),
    (2, "customer_2", 200.0),
    (3, "customer_1", 150.0),
    (4, "customer_3", 300.0),
], ["order_id", "customer_id", "amount"])

customers = spark.createDataFrame([
    ("customer_1", "Alice", "NY"),
    ("customer_2", "Bob", "CA"),
    ("customer_3", "Charlie", "TX"),
], ["customer_id", "name", "state"])

# This query creates multiple stages due to shuffle operations
result = orders \
    .groupBy("customer_id") \
    .agg(
        count("order_id").alias("order_count"),
        avg("amount").alias("avg_amount")
    ) \
    .join(customers, "customer_id") \
    .groupBy("state") \
    .agg(spark_sum("order_count").alias("total_orders"))

result.show()
This query triggers three shuffles: one for the first groupBy, one for the join (unless Spark broadcasts the small customers table, which eliminates that shuffle), and one for the final groupBy on state. Expect the initial scan stage plus roughly one stage per shuffle in the Stages tab.
In the Stages tab, examine these metrics:
- Shuffle Read/Write: Data moved between stages. High values indicate expensive operations.
- Task Duration Distribution: The summary statistics show min, median, and max task times. Large gaps suggest data skew.
- Locality Level: PROCESS_LOCAL is fastest, NODE_LOCAL is acceptable, ANY means data traveled across nodes.
Click into any stage to see individual task metrics. The task table reveals stragglers—tasks taking significantly longer than peers, usually indicating skewed partitions.
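The same straggler test can be applied programmatically to task durations exported from the event logs or the UI's REST API. This is a plain-Python heuristic sketch; the 4x-median threshold is an assumption for illustration, not a Spark default:

```python
from statistics import median

def find_stragglers(task_durations_ms, ratio=4.0):
    """Flag tasks whose duration exceeds `ratio` times the median duration,
    a common symptom of skewed partitions."""
    med = median(task_durations_ms)
    return [(i, d) for i, d in enumerate(task_durations_ms) if d > ratio * med]

# Four healthy tasks and one dominated by a hot key
durations = [2100, 1900, 2300, 2000, 295000]
print(find_stragglers(durations))  # -> [(4, 295000)]
```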
Storage Tab - Monitoring Cached Data
When you cache DataFrames or RDDs, the Storage tab shows what’s actually persisted in memory and disk. This visibility is crucial for understanding memory pressure.
from pyspark import StorageLevel

# Create and cache a DataFrame
large_df = spark.range(0, 10000000) \
    .withColumn("value", col("id") * 2) \
    .withColumn("category", (col("id") % 100).cast("string"))

# MEMORY_AND_DISK is the default for DataFrame.cache()
large_df.cache()

# Force materialization - cache is lazy
large_df.count()

# Alternative: explicit persistence level. Note that PySpark's StorageLevel
# has no MEMORY_ONLY_SER variant (Python data is always stored serialized),
# so MEMORY_ONLY is the closest equivalent here.
another_df = large_df.filter(col("value") > 1000000)
another_df.persist(StorageLevel.MEMORY_ONLY)
another_df.count()

# Check storage in UI, then unpersist when done
large_df.unpersist()
another_df.unpersist()
The Storage tab displays:
- RDD Name: The DataFrame or RDD identifier
- Storage Level: MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY, etc.
- Cached Partitions: How many partitions are actually cached
- Size in Memory/Disk: Actual storage consumption
- Fraction Cached: Percentage successfully cached (less than 100% indicates memory pressure)
If you see partial caching, your executors lack sufficient memory. Either increase spark.executor.memory or use a serialized storage level to reduce footprint.
Executors Tab - Resource Utilization
The Executors tab provides a health dashboard for your cluster resources. Each row represents an executor (plus the driver), showing real-time metrics.
# Executor resource configuration (typically passed via spark-submit or cluster config)
spark = SparkSession.builder \
    .appName("ExecutorDemo") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memoryOverhead", "1g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()
Key metrics to monitor:
- Storage Memory: Memory used for caching. If maxed out, cached data spills or evicts.
- Disk Used: Non-zero values indicate shuffle spill—data couldn’t fit in memory during shuffle operations.
- Task Time: Total time spent executing tasks. Imbalanced values across executors suggest uneven data distribution.
- GC Time: Garbage collection overhead. If this exceeds 10% of task time, increase memory or reduce object creation.
- Shuffle Read/Write: Data transferred during shuffles. High values per executor indicate that executor processed skewed partitions.
Watch for executors with significantly higher GC time or disk usage—these are your bottlenecks.
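The two memory fractions above translate into concrete pool sizes. Here is a back-of-the-envelope sketch of Spark's unified memory model in plain Python; the 300 MB reserved constant comes from Spark's internals, and the result is an approximation, not an exact accounting:

```python
RESERVED_MB = 300  # fixed reservation Spark subtracts from the heap

def unified_memory_pools(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate the unified (execution + storage) region and the storage
    sub-pool from an executor's heap size and the two fraction settings."""
    usable = (executor_memory_mb - RESERVED_MB) * memory_fraction
    storage = usable * storage_fraction
    execution = usable - storage
    return usable, storage, execution

# 4 GB executor with the fractions configured above
usable, storage, execution = unified_memory_pools(4096)
print(f"unified={usable:.0f} MB, storage={storage:.0f} MB, execution={execution:.0f} MB")
```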
SQL Tab - Query Execution Plans
For DataFrame and SQL operations, the SQL tab provides the most detailed execution information. Each query shows its physical plan with actual runtime metrics.
from pyspark.sql.functions import broadcast

# Create tables for complex query
sales = spark.createDataFrame([
    (1, 101, "2024-01-15", 500.0),
    (2, 102, "2024-01-16", 750.0),
    (3, 101, "2024-01-17", 300.0),
    (4, 103, "2024-01-18", 1200.0),
], ["sale_id", "product_id", "sale_date", "amount"])

products = spark.createDataFrame([
    (101, "Widget", "Electronics"),
    (102, "Gadget", "Electronics"),
    (103, "Tool", "Hardware"),
], ["product_id", "name", "category"])

# Complex query with join and aggregation
query_result = sales \
    .join(broadcast(products), "product_id") \
    .groupBy("category") \
    .agg(
        spark_sum("amount").alias("total_sales"),
        count("sale_id").alias("num_sales")
    ) \
    .filter(col("total_sales") > 100)

query_result.explain(mode="extended")
query_result.show()
The SQL tab shows a visual DAG of the physical plan. Click any node to see:
- Number of output rows: Actual vs estimated helps identify statistics issues
- Data size: Bytes processed at each step
- Time: Duration of each operation
Look for Exchange nodes—these represent shuffles. BroadcastExchange is cheaper than ShuffleExchange. If you see SortMergeJoin where BroadcastHashJoin would work, your small table might need an explicit broadcast() hint.
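That planner choice hinges on spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB. The sketch below is a deliberately simplified pure-Python model of the decision, ignoring hints, statistics estimation, and the other join strategies Spark considers:

```python
# Default value of spark.sql.autoBroadcastJoinThreshold (10 MB)
AUTO_BROADCAST_THRESHOLD = 10 * 1024 * 1024

def pick_join_strategy(smaller_table_bytes, threshold=AUTO_BROADCAST_THRESHOLD):
    """Rough model: broadcast the smaller side if its estimated size fits
    under the threshold, otherwise fall back to a sort-merge join."""
    if smaller_table_bytes <= threshold:
        return "BroadcastHashJoin"
    return "SortMergeJoin"

print(pick_join_strategy(2 * 1024 * 1024))    # small dimension table
print(pick_join_strategy(500 * 1024 * 1024))  # large fact table
```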
Practical Debugging Tips
The Spark UI tells you what’s wrong. Here’s how to interpret common symptoms:
Data Skew: In the Stages tab, task duration shows massive variance (e.g., median 2 seconds, max 5 minutes). Fix with salting or isolation of skewed keys.
from pyspark.sql.functions import concat, lit, rand, explode, array

# Before: skewed join on customer_id where one customer has millions of records
# (large_orders and customers are assumed to exist from earlier processing)
skewed_result = large_orders.join(customers, "customer_id")

# After: salt the skewed key with a random suffix in [0, 10)
salted_orders = large_orders.withColumn(
    "salted_key",
    concat(col("customer_id"), lit("_"), (rand() * 10).cast("int"))
)

# Explode the small table so it carries every possible salted key
salted_customers = customers.withColumn(
    "salt",
    explode(array([lit(i) for i in range(10)]))
).withColumn(
    "salted_key",
    concat(col("customer_id"), lit("_"), col("salt"))
)

balanced_result = salted_orders.join(salted_customers, "salted_key")
Insufficient Parallelism: Few tasks processing large amounts of data. Increase partitions with repartition() or adjust spark.sql.shuffle.partitions.
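When picking a partition count, one common rule of thumb is to target roughly 128 MB per partition. The target size here is a convention, not a Spark setting, so treat this sketch as a starting point rather than a formula:

```python
import math

TARGET_PARTITION_BYTES = 128 * 1024 * 1024  # ~128 MB per partition, a common rule of thumb

def suggest_partitions(total_bytes, min_partitions=1):
    """Suggest a partition count so each partition holds roughly 128 MB."""
    return max(min_partitions, math.ceil(total_bytes / TARGET_PARTITION_BYTES))

# A 10 GB shuffle spread over a handful of fat partitions starves the cluster
print(suggest_partitions(10 * 1024**3))  # -> 80
```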
Excessive Shuffling: Multiple Exchange nodes in SQL plans. Restructure queries to minimize wide transformations, use broadcast joins for small tables.
Memory Pressure: High GC time, disk spill in Executors tab. Increase executor memory, use serialized caching, or reduce partition sizes.
The Spark UI doesn’t lie. When performance problems arise, open it first, hypothesize second, and tune third. This discipline separates engineers who optimize effectively from those who guess.