Spark SQL - Window Functions Tutorial
Key Insights
- Window functions in Spark SQL enable complex analytical queries by computing values across row sets related to the current row, without collapsing rows like GROUP BY operations
- The OVER clause defines the window specification through PARTITION BY for grouping, ORDER BY for sequencing, and frame specifications for defining the exact row range to consider
- Ranking, aggregate, and analytic window functions solve real-world problems like running totals, moving averages, and row deduplication more efficiently than self-joins or multiple passes over data
Understanding Window Functions
Window functions perform calculations across a set of rows that are related to the current row. Unlike aggregate functions with GROUP BY that collapse multiple rows into one, window functions maintain all rows while adding computed columns based on the window specification.
The basic syntax structure:
function_name(expression) OVER (
    [PARTITION BY partition_expression]
    [ORDER BY sort_expression]
    [frame_specification]
)
Let’s create a sample dataset to work with throughout this tutorial:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("WindowFunctions").getOrCreate()
data = [
    ("Alice", "Sales", "2024-01", 5000),
    ("Alice", "Sales", "2024-02", 5500),
    ("Alice", "Sales", "2024-03", 6000),
    ("Bob", "Sales", "2024-01", 4500),
    ("Bob", "Sales", "2024-02", 4800),
    ("Bob", "Sales", "2024-03", 5200),
    ("Charlie", "Engineering", "2024-01", 7000),
    ("Charlie", "Engineering", "2024-02", 7200),
    ("Charlie", "Engineering", "2024-03", 7500),
    ("Diana", "Engineering", "2024-01", 6800),
    ("Diana", "Engineering", "2024-02", 7000),
    ("Diana", "Engineering", "2024-03", 7300)
]
df = spark.createDataFrame(data, ["name", "department", "month", "revenue"])
df.createOrReplaceTempView("sales_data")
Ranking Functions
Ranking functions assign ranks to rows within a partition. The three primary ranking functions are ROW_NUMBER, RANK, and DENSE_RANK.
# Using PySpark DataFrame API
window_spec = Window.partitionBy("department").orderBy(F.desc("revenue"))
df_ranked = df.withColumn("row_num", F.row_number().over(window_spec)) \
    .withColumn("rank", F.rank().over(window_spec)) \
    .withColumn("dense_rank", F.dense_rank().over(window_spec))
df_ranked.show()
Using SQL:
SELECT
name,
department,
month,
revenue,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY revenue DESC) as row_num,
RANK() OVER (PARTITION BY department ORDER BY revenue DESC) as rank,
DENSE_RANK() OVER (PARTITION BY department ORDER BY revenue DESC) as dense_rank
FROM sales_data
ORDER BY department, revenue DESC;
The differences matter when handling ties:
- ROW_NUMBER assigns unique sequential numbers (1, 2, 3, 4)
- RANK creates gaps after ties (1, 2, 2, 4)
- DENSE_RANK maintains sequential numbering despite ties (1, 2, 2, 3)
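The tie semantics above can be sketched in plain Python (no Spark needed), using one partition already sorted descending with a tied value:

```python
# Plain-Python sketch of how the three ranking functions number a
# descending revenue column that contains a tie (5200 appears twice).
revenues = [6000, 5200, 5200, 4500]  # one partition, sorted DESC

# ROW_NUMBER: unique sequential numbers regardless of ties.
row_number = list(range(1, len(revenues) + 1))

# RANK: ties share a rank; the next distinct value skips ahead by the tie count.
rank = [1 + sum(1 for other in revenues if other > r) for r in revenues]

# DENSE_RANK: ties share a rank; numbering stays sequential afterwards.
distinct_desc = sorted(set(revenues), reverse=True)
dense_rank = [distinct_desc.index(r) + 1 for r in revenues]

print(row_number)  # [1, 2, 3, 4]
print(rank)        # [1, 2, 2, 4]
print(dense_rank)  # [1, 2, 2, 3]
```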
Aggregate Window Functions
Aggregate functions such as SUM, AVG, MIN, MAX, and COUNT can be applied over a window to add computed columns without collapsing rows.
# Running total and department averages
window_running = Window.partitionBy("name").orderBy("month").rowsBetween(Window.unboundedPreceding, Window.currentRow)
window_dept = Window.partitionBy("department")
df_agg = df.withColumn("running_total", F.sum("revenue").over(window_running)) \
    .withColumn("dept_avg", F.avg("revenue").over(window_dept)) \
    .withColumn("revenue_vs_avg", F.col("revenue") - F.col("dept_avg"))
df_agg.show()
SQL equivalent:
SELECT
name,
department,
month,
revenue,
SUM(revenue) OVER (PARTITION BY name ORDER BY month
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total,
AVG(revenue) OVER (PARTITION BY department) as dept_avg,
revenue - AVG(revenue) OVER (PARTITION BY department) as revenue_vs_avg
FROM sales_data
ORDER BY name, month;
Frame Specifications
Frame specifications define which rows within the partition are included in the calculation. Two frame types exist: ROWS, which counts physical rows, and RANGE, which operates on ORDER BY values so that peer rows with equal values enter the frame together.
# Moving average over 3-month window
window_moving = Window.partitionBy("name").orderBy("month").rowsBetween(-2, 0)
df_moving = df.withColumn("moving_avg_3m", F.avg("revenue").over(window_moving))
df_moving.show()
Frame boundaries:
- UNBOUNDED PRECEDING: from partition start
- UNBOUNDED FOLLOWING: to partition end
- CURRENT ROW: the current row
- n PRECEDING: n rows before current
- n FOLLOWING: n rows after current
SELECT
name,
month,
revenue,
AVG(revenue) OVER (PARTITION BY name ORDER BY month
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as moving_avg_3m,
AVG(revenue) OVER (PARTITION BY name ORDER BY month
ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as centered_avg
FROM sales_data
ORDER BY name, month;
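The ROWS/RANGE distinction can be sketched in plain Python (no Spark needed); the data here is illustrative, with a duplicated ORDER BY key so the two peer rows are visible:

```python
# Plain-Python sketch of ROWS vs RANGE framing semantics.
# Each tuple is (order_key, revenue); the two rows with key 2 are "peers"
# under RANGE because they share the same ORDER BY value.
rows = [(1, 100), (2, 200), (2, 300), (3, 400)]  # sorted by order_key

# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW counts physical rows:
rows_running = [sum(r for _, r in rows[: i + 1]) for i in range(len(rows))]

# RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW includes every row whose
# order key is <= the current row's key, so peers contribute together:
range_running = [sum(r for k, r in rows if k <= key) for key, _ in rows]

print(rows_running)   # [100, 300, 600, 1000]
print(range_running)  # [100, 600, 600, 1000] - both peers get the same total
```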
Analytic Functions
LAG and LEAD access data from previous or subsequent rows without self-joins. FIRST_VALUE and LAST_VALUE retrieve boundary values within the window. Beware that when ORDER BY is present without an explicit frame, the default frame ends at the current row, so LAST_VALUE returns the current row's value; extend the frame to UNBOUNDED FOLLOWING to get the partition's true last value.
window_ordered = Window.partitionBy("name").orderBy("month")
df_analytic = df.withColumn("prev_revenue", F.lag("revenue", 1).over(window_ordered)) \
    .withColumn("next_revenue", F.lead("revenue", 1).over(window_ordered)) \
    .withColumn("revenue_change", F.col("revenue") - F.lag("revenue", 1).over(window_ordered)) \
    .withColumn("first_month_revenue", F.first("revenue").over(window_ordered)) \
    .withColumn("growth_from_first",
        (F.col("revenue") - F.col("first_month_revenue")) / F.col("first_month_revenue") * 100)
df_analytic.show()
SQL approach:
SELECT
name,
month,
revenue,
LAG(revenue, 1) OVER (PARTITION BY name ORDER BY month) as prev_revenue,
LEAD(revenue, 1) OVER (PARTITION BY name ORDER BY month) as next_revenue,
revenue - LAG(revenue, 1) OVER (PARTITION BY name ORDER BY month) as revenue_change,
FIRST_VALUE(revenue) OVER (PARTITION BY name ORDER BY month
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as first_revenue
FROM sales_data
ORDER BY name, month;
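LAST_VALUE deserves a side-by-side sketch: with ORDER BY and no explicit frame, the default frame stops at the current row, so the "last" value is just the current one. The query below (same sales_data view) contrasts the default frame with an explicit unbounded frame:

```sql
SELECT
    name,
    month,
    revenue,
    -- Default frame ends at CURRENT ROW, so this returns the current row's value
    LAST_VALUE(revenue) OVER (PARTITION BY name ORDER BY month) as last_default,
    -- Extending the frame to the partition end returns the true last value
    LAST_VALUE(revenue) OVER (PARTITION BY name ORDER BY month
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as last_revenue
FROM sales_data
ORDER BY name, month;
```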
Deduplication with Window Functions
A common pattern uses ROW_NUMBER to remove duplicates while keeping one record per group, chosen by an ordering criterion:
# Add duplicate records for demonstration
duplicate_data = data + [
    ("Alice", "Sales", "2024-02", 5400),  # Duplicate with different revenue
    ("Bob", "Sales", "2024-01", 4600)
]
df_dup = spark.createDataFrame(duplicate_data, ["name", "department", "month", "revenue"])
# Keep record with highest revenue for each person-month combination
window_dedup = Window.partitionBy("name", "month").orderBy(F.desc("revenue"))
df_deduped = df_dup.withColumn("rn", F.row_number().over(window_dedup)) \
    .filter(F.col("rn") == 1) \
    .drop("rn")
df_deduped.show()
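For parity with the other sections, the same pattern can be sketched in SQL; this assumes the duplicated DataFrame has been registered as a view first (e.g. df_dup.createOrReplaceTempView("sales_dup"), a view name introduced here for illustration). The ROW_NUMBER must live in a subquery because window functions cannot appear directly in WHERE:

```sql
SELECT name, department, month, revenue
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY name, month ORDER BY revenue DESC) as rn
    FROM sales_dup
) ranked
WHERE rn = 1;
```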
Percentile and Distribution Functions
NTILE(n) divides the ordered rows of a partition into n roughly equal buckets. PERCENT_RANK computes (rank - 1) / (partition rows - 1), and CUME_DIST computes the fraction of rows whose value is less than or equal to the current row's.
window_all = Window.partitionBy("department").orderBy("revenue")
df_dist = df.withColumn("quartile", F.ntile(4).over(window_all)) \
    .withColumn("percent_rank", F.percent_rank().over(window_all)) \
    .withColumn("cume_dist", F.cume_dist().over(window_all))
df_dist.show()
SELECT
name,
department,
revenue,
NTILE(4) OVER (PARTITION BY department ORDER BY revenue) as quartile,
PERCENT_RANK() OVER (PARTITION BY department ORDER BY revenue) as percent_rank,
CUME_DIST() OVER (PARTITION BY department ORDER BY revenue) as cumulative_dist
FROM sales_data
ORDER BY department, revenue;
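The two distribution formulas can be sketched in plain Python (no Spark needed) over one illustrative partition, to make the arithmetic concrete:

```python
# Plain-Python sketch of the formulas behind PERCENT_RANK and CUME_DIST
# for a single partition sorted ascending (values are illustrative).
values = [4500, 4800, 5000, 5200, 5500, 6000]
n = len(values)

def percent_rank(v):
    # PERCENT_RANK = (rank - 1) / (n - 1), where rank uses RANK() tie semantics.
    rank = 1 + sum(1 for other in values if other < v)
    return (rank - 1) / (n - 1)

def cume_dist(v):
    # CUME_DIST = (number of rows <= current value) / n
    return sum(1 for other in values if other <= v) / n

print(percent_rank(4500))  # 0.0  (lowest value)
print(percent_rank(6000))  # 1.0  (highest value)
print(cume_dist(6000))     # 1.0  (all rows are <= the maximum)
```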
Performance Considerations
Window functions trigger wide transformations requiring shuffles. Optimize by:
Partitioning Strategy: Align window partitions with DataFrame partitioning to minimize shuffles.
# Repartition before window operations
df_optimized = df.repartition("department")
window_spec = Window.partitionBy("department").orderBy("month")
df_result = df_optimized.withColumn("running_total", F.sum("revenue").over(window_spec))
Reuse Window Specifications: Define once and reuse across multiple columns.
window = Window.partitionBy("name").orderBy("month")
df_efficient = df.withColumn("running_total", F.sum("revenue").over(window)) \
    .withColumn("running_avg", F.avg("revenue").over(window)) \
    .withColumn("row_num", F.row_number().over(window))
Limit Frame Size: Use bounded frames instead of unbounded when possible.
# More efficient for large datasets
window_bounded = Window.partitionBy("name").orderBy("month").rowsBetween(-6, 0)
df_bounded = df.withColumn("last_6m_avg", F.avg("revenue").over(window_bounded))
Window functions eliminate complex self-joins and multiple aggregation passes, replacing them with single-pass operations that express intent clearly. Master these patterns to write more maintainable and performant Spark SQL code.