Spark SQL - Window Functions Tutorial

Key Insights

  • Window functions in Spark SQL enable complex analytical queries by computing values across row sets related to the current row, without collapsing rows like GROUP BY operations
  • The OVER clause defines the window specification through PARTITION BY for grouping, ORDER BY for sequencing, and frame specifications for defining the exact row range to consider
  • Ranking, aggregate, and analytic window functions solve real-world problems like running totals, moving averages, and row deduplication more efficiently than self-joins or multiple passes over data

Understanding Window Functions

Window functions perform calculations across a set of rows that are related to the current row. Unlike aggregate functions with GROUP BY that collapse multiple rows into one, window functions maintain all rows while adding computed columns based on the window specification.

The basic syntax structure:

function_name(expression) OVER (
  [PARTITION BY partition_expression]
  [ORDER BY sort_expression]
  [frame_specification]
)
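
To make the contrast with GROUP BY concrete, here is a minimal plain-Python sketch (no Spark involved). The three toy rows are hypothetical and exist only for illustration; the point is that the grouped result has one row per department, while the windowed result keeps every input row and adds a column.

```python
from collections import defaultdict

# Hypothetical toy rows: (name, department, revenue)
rows = [
    ("Alice", "Sales", 5000),
    ("Bob", "Sales", 4500),
    ("Charlie", "Engineering", 7000),
]

# Total revenue per department; both approaches compute this aggregate.
totals = defaultdict(int)
for _, dept, revenue in rows:
    totals[dept] += revenue

# GROUP BY style: one output row per department.
grouped = sorted(totals.items())

# Window style: every input row survives and gains a dept_total column,
# as with SUM(revenue) OVER (PARTITION BY department).
windowed = [(name, dept, revenue, totals[dept]) for name, dept, revenue in rows]

print(grouped)
print(windowed)
```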

Let’s create a sample dataset to work with throughout this tutorial:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("WindowFunctions").getOrCreate()

data = [
    ("Alice", "Sales", "2024-01", 5000),
    ("Alice", "Sales", "2024-02", 5500),
    ("Alice", "Sales", "2024-03", 6000),
    ("Bob", "Sales", "2024-01", 4500),
    ("Bob", "Sales", "2024-02", 4800),
    ("Bob", "Sales", "2024-03", 5200),
    ("Charlie", "Engineering", "2024-01", 7000),
    ("Charlie", "Engineering", "2024-02", 7200),
    ("Charlie", "Engineering", "2024-03", 7500),
    ("Diana", "Engineering", "2024-01", 6800),
    ("Diana", "Engineering", "2024-02", 7000),
    ("Diana", "Engineering", "2024-03", 7300)
]

df = spark.createDataFrame(data, ["name", "department", "month", "revenue"])
df.createOrReplaceTempView("sales_data")

Ranking Functions

Ranking functions assign ranks to rows within a partition. The three primary ranking functions are ROW_NUMBER, RANK, and DENSE_RANK.

# Using PySpark DataFrame API
window_spec = Window.partitionBy("department").orderBy(F.desc("revenue"))

df_ranked = df.withColumn("row_num", F.row_number().over(window_spec)) \
              .withColumn("rank", F.rank().over(window_spec)) \
              .withColumn("dense_rank", F.dense_rank().over(window_spec))

df_ranked.show()

Using SQL:

SELECT 
  name,
  department,
  month,
  revenue,
  ROW_NUMBER() OVER (PARTITION BY department ORDER BY revenue DESC) as row_num,
  RANK() OVER (PARTITION BY department ORDER BY revenue DESC) as rank,
  DENSE_RANK() OVER (PARTITION BY department ORDER BY revenue DESC) as dense_rank
FROM sales_data
ORDER BY department, revenue DESC;

The differences matter when handling ties:

  • ROW_NUMBER assigns unique sequential numbers (1, 2, 3, 4); tied rows are numbered in an arbitrary order unless the ORDER BY includes a tiebreaker column
  • RANK creates gaps after ties (1, 2, 2, 4)
  • DENSE_RANK maintains sequential numbering despite ties (1, 2, 2, 3)
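
The three numbering schemes above can be reproduced with a short plain-Python sketch. This is an illustration of the semantics, not how Spark computes them; the `rank_rows` helper and the `scores` list are hypothetical.

```python
def rank_rows(values):
    """Return (value, row_number, rank, dense_rank) tuples, ordered descending."""
    ordered = sorted(values, reverse=True)
    result = []
    rank = dense = 0
    previous = object()  # sentinel that compares unequal to every real value
    for position, value in enumerate(ordered, start=1):
        if value != previous:
            rank = position  # RANK jumps to the row's position, leaving gaps
            dense += 1       # DENSE_RANK advances by one per distinct value
            previous = value
        # ROW_NUMBER is simply the position in the sorted order
        result.append((value, position, rank, dense))
    return result

scores = [90, 80, 80, 70]  # hypothetical values with one tie
ranked = rank_rows(scores)
for row in ranked:
    print(row)
```

With one tie in second place, the row numbers come out as 1, 2, 3, 4, the ranks as 1, 2, 2, 4, and the dense ranks as 1, 2, 2, 3, matching the list above.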

Aggregate Window Functions

Apply aggregate functions over a window without collapsing rows. Common functions include SUM, AVG, MIN, MAX, and COUNT.

# Running total and department averages
window_running = Window.partitionBy("name").orderBy("month").rowsBetween(Window.unboundedPreceding, Window.currentRow)
window_dept = Window.partitionBy("department")

df_agg = df.withColumn("running_total", F.sum("revenue").over(window_running)) \
           .withColumn("dept_avg", F.avg("revenue").over(window_dept)) \
           .withColumn("revenue_vs_avg", F.col("revenue") - F.col("dept_avg"))

df_agg.show()

SQL equivalent:

SELECT 
  name,
  department,
  month,
  revenue,
  SUM(revenue) OVER (PARTITION BY name ORDER BY month 
                     ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total,
  AVG(revenue) OVER (PARTITION BY department) as dept_avg,
  revenue - AVG(revenue) OVER (PARTITION BY department) as revenue_vs_avg
FROM sales_data
ORDER BY name, month;
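
As a sanity check on what these columns should contain, the following plain-Python sketch recomputes them for Alice from the sample data. It only mirrors the arithmetic; it is not a substitute for running the query.

```python
from itertools import accumulate

alice = [5000, 5500, 6000]                    # Alice's revenue, ordered by month
sales = [5000, 5500, 6000, 4500, 4800, 5200]  # every row in the Sales department

# Running total: sum of all rows from the partition start to the current row.
running_total = list(accumulate(alice))

# Department average: one value shared by every row in the partition.
dept_avg = sum(sales) / len(sales)

# Difference between each row and its department average, rounded for display.
revenue_vs_avg = [round(r - dept_avg, 2) for r in alice]

print(running_total)
print(round(dept_avg, 2))
print(revenue_vs_avg)
```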

Frame Specifications

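Frame specifications define which rows within the partition to include in the calculation. Two frame types exist: ROWS, which counts physical rows relative to the current row, and RANGE, which selects rows by their ORDER BY value, so rows that tie with the current row always enter the frame together. When a window has an ORDER BY but no explicit frame, Spark defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.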

# Moving average over 3-month window
window_moving = Window.partitionBy("name").orderBy("month").rowsBetween(-2, 0)

df_moving = df.withColumn("moving_avg_3m", F.avg("revenue").over(window_moving))

df_moving.show()
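
One detail worth knowing is that a frame is clipped at the partition edges, so the first rows of each partition average over fewer than three values. The plain-Python sketch below illustrates this for Alice's rows; `moving_avg` is a hypothetical helper written for this example, not a Spark function.

```python
def moving_avg(values, preceding, following=0):
    """Average over ROWS BETWEEN `preceding` PRECEDING AND `following` FOLLOWING."""
    out = []
    for i in range(len(values)):
        start = max(0, i - preceding)              # clipped at the partition start
        end = min(len(values), i + following + 1)  # clipped at the partition end
        frame = values[start:end]
        out.append(sum(frame) / len(frame))
    return out

alice = [5000, 5500, 6000]  # Alice's revenue, ordered by month

trailing = moving_avg(alice, preceding=2)               # like rowsBetween(-2, 0)
centered = moving_avg(alice, preceding=1, following=1)  # like rowsBetween(-1, 1)

print(trailing)
print(centered)
```

The first trailing value is just the first month's revenue, because no earlier rows exist in the partition.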

Frame boundaries:

  • UNBOUNDED PRECEDING: From partition start
  • UNBOUNDED FOLLOWING: To partition end
  • CURRENT ROW: The current row
  • n PRECEDING: n rows before current
  • n FOLLOWING: n rows after current

SQL equivalent, with a centered average added:

SELECT
  name,
  month,
  revenue,
  AVG(revenue) OVER (PARTITION BY name ORDER BY month 
                     ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as moving_avg_3m,
  AVG(revenue) OVER (PARTITION BY name ORDER BY month 
                     ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as centered_avg
FROM sales_data
ORDER BY name, month;
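
ROWS and RANGE frames give different results only when several rows share the same ORDER BY value. The plain-Python sketch below shows the difference for a running sum; the `(order_key, value)` pairs and both helper functions are hypothetical and only model the semantics.

```python
def running_sum_rows(pairs):
    """ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: counts physical rows."""
    total, out = 0, []
    for _, value in pairs:
        total += value
        out.append(total)
    return out

def running_sum_range(pairs):
    """RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: includes every row
    whose ordering key is less than or equal to the current row's key."""
    return [sum(v for k, v in pairs if k <= key) for key, _ in pairs]

# Already sorted by key; two rows share the key "2024-02".
pairs = [("2024-01", 10), ("2024-02", 20), ("2024-02", 30), ("2024-03", 40)]

rows_result = running_sum_rows(pairs)
range_result = running_sum_range(pairs)

print(rows_result)
print(range_result)
```

Under RANGE, both "2024-02" rows receive the same total because each one's frame includes the other. In the sample data every person has one row per month, so the two frame types agree there.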

Analytic Functions

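LAG and LEAD access data from previous or subsequent rows without self-joins, returning NULL where no such row exists in the partition. FIRST_VALUE and LAST_VALUE retrieve boundary values within the window frame. Because the default frame of an ordered window ends at the current row, LAST_VALUE returns the value of the current row (or of its last peer) unless the frame is extended to UNBOUNDED FOLLOWING.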

window_ordered = Window.partitionBy("name").orderBy("month")

df_analytic = df.withColumn("prev_revenue", F.lag("revenue", 1).over(window_ordered)) \
                .withColumn("next_revenue", F.lead("revenue", 1).over(window_ordered)) \
                .withColumn("revenue_change", F.col("revenue") - F.lag("revenue", 1).over(window_ordered)) \
                .withColumn("first_month_revenue", F.first("revenue").over(window_ordered)) \
                .withColumn("growth_from_first", 
                           ((F.col("revenue") - F.col("first_month_revenue")) / F.col("first_month_revenue") * 100))

df_analytic.show()

SQL approach:

SELECT 
  name,
  month,
  revenue,
  LAG(revenue, 1) OVER (PARTITION BY name ORDER BY month) as prev_revenue,
  LEAD(revenue, 1) OVER (PARTITION BY name ORDER BY month) as next_revenue,
  revenue - LAG(revenue, 1) OVER (PARTITION BY name ORDER BY month) as revenue_change,
  FIRST_VALUE(revenue) OVER (PARTITION BY name ORDER BY month 
                             ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as first_month_revenue
FROM sales_data
ORDER BY name, month;
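
To see what these columns hold for a single partition, the following plain-Python sketch recomputes them for Alice's three rows. The `lag` and `lead` helpers are hypothetical stand-ins that use `None` where Spark would return NULL.

```python
def lag(values, offset=1):
    """Value `offset` rows earlier, or None when no such row exists."""
    return [values[i - offset] if i - offset >= 0 else None
            for i in range(len(values))]

def lead(values, offset=1):
    """Value `offset` rows later, or None when no such row exists."""
    return [values[i + offset] if i + offset < len(values) else None
            for i in range(len(values))]

alice = [5000, 5500, 6000]  # Alice's revenue, ordered by month

prev_revenue = lag(alice)
next_revenue = lead(alice)

# The change is undefined (None) for the first row, just as NULL propagates in SQL.
revenue_change = [None if p is None else v - p
                  for v, p in zip(alice, prev_revenue)]

first = alice[0]
growth_from_first = [round((v - first) / first * 100, 2) for v in alice]

print(prev_revenue, next_revenue)
print(revenue_change)
print(growth_from_first)
```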

Deduplication with Window Functions

A common deduplication pattern numbers the rows within each group of duplicates according to a chosen criterion and keeps only the first row of each group:

# Add duplicate records for demonstration
duplicate_data = data + [
    ("Alice", "Sales", "2024-02", 5400),  # Duplicate with different revenue
    ("Bob", "Sales", "2024-01", 4600)
]

df_dup = spark.createDataFrame(duplicate_data, ["name", "department", "month", "revenue"])

# Keep record with highest revenue for each person-month combination
window_dedup = Window.partitionBy("name", "month").orderBy(F.desc("revenue"))

df_deduped = df_dup.withColumn("rn", F.row_number().over(window_dedup)) \
                   .filter(F.col("rn") == 1) \
                   .drop("rn")

df_deduped.show()
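
The same keep-the-best-row logic, sketched in plain Python for the two duplicated person-month pairs. `keep_highest` is a hypothetical helper; it shows the intended outcome rather than how Spark executes the query.

```python
def keep_highest(rows):
    """Keep one row per (name, month), preferring the highest revenue."""
    best = {}
    for name, department, month, revenue in rows:
        key = (name, month)
        # Replace the stored row only when this one has strictly higher revenue.
        if key not in best or revenue > best[key][3]:
            best[key] = (name, department, month, revenue)
    return list(best.values())

rows = [
    ("Alice", "Sales", "2024-02", 5500),
    ("Alice", "Sales", "2024-02", 5400),
    ("Bob", "Sales", "2024-01", 4500),
    ("Bob", "Sales", "2024-01", 4600),
]

deduped = keep_highest(rows)
for row in deduped:
    print(row)
```

If two duplicates had the same revenue, ROW_NUMBER would pick one of them arbitrarily, so adding a tiebreaker column to the ORDER BY is a reasonable precaution when the result must be reproducible.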

Percentile and Distribution Functions

NTILE divides rows into buckets. PERCENT_RANK and CUME_DIST calculate relative positions.

window_all = Window.partitionBy("department").orderBy("revenue")

df_dist = df.withColumn("quartile", F.ntile(4).over(window_all)) \
            .withColumn("percent_rank", F.percent_rank().over(window_all)) \
            .withColumn("cume_dist", F.cume_dist().over(window_all))

df_dist.show()

SQL equivalent:

SELECT
  name,
  department,
  revenue,
  NTILE(4) OVER (PARTITION BY department ORDER BY revenue) as quartile,
  PERCENT_RANK() OVER (PARTITION BY department ORDER BY revenue) as percent_rank,
  CUME_DIST() OVER (PARTITION BY department ORDER BY revenue) as cume_dist
FROM sales_data
ORDER BY department, revenue;
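
The standard definitions behind these three functions fit in a few lines of plain Python. The sketch below applies them to the Engineering revenues from the sample data; the helper functions are hypothetical and written only to show the formulas.

```python
def ntile(n_rows, buckets):
    """Bucket number for each of n_rows ordered rows; earlier buckets absorb the remainder."""
    base, extra = divmod(n_rows, buckets)
    out = []
    for b in range(1, buckets + 1):
        size = base + (1 if b <= extra else 0)
        out.extend([b] * size)
    return out

def percent_rank(sorted_values):
    """(rank - 1) / (rows - 1), where tied values share the lowest rank."""
    n = len(sorted_values)
    if n == 1:
        return [0.0]
    # index() finds the first occurrence, which equals rank - 1 in a sorted list
    return [sorted_values.index(v) / (n - 1) for v in sorted_values]

def cume_dist(sorted_values):
    """Fraction of rows with a value less than or equal to the current value."""
    n = len(sorted_values)
    return [sum(1 for x in sorted_values if x <= v) / n for v in sorted_values]

engineering = sorted([7000, 7200, 7500, 6800, 7000, 7300])

quartile = ntile(len(engineering), 4)
pct_rank = percent_rank(engineering)
cum_dist = cume_dist(engineering)

for row in zip(engineering, quartile, pct_rank, cum_dist):
    print(row)
```

Note that NTILE assigns buckets by position, so the two rows with revenue 7000 land in different quartiles, while PERCENT_RANK and CUME_DIST give tied rows the same value.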

Performance Considerations

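A window with PARTITION BY requires a shuffle to bring each partition's rows together, followed by a sort within each partition. A window with no PARTITION BY moves every row to a single partition, which rarely scales. Some ways to limit the cost:

Partitioning Strategy: Align window partitions with DataFrame partitioning to minimize shuffles. Note that repartition is itself a shuffle, so this pays off mainly when the repartitioned DataFrame feeds several operations keyed on the same column; for a single window it only moves the shuffle earlier.

# Repartition once on the window's partition key, then reuse the result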
df_optimized = df.repartition("department")

window_spec = Window.partitionBy("department").orderBy("month")
df_result = df_optimized.withColumn("running_total", F.sum("revenue").over(window_spec))

Reuse Window Specifications: Define once and reuse across multiple columns.

window = Window.partitionBy("name").orderBy("month")

df_efficient = df.withColumn("running_total", F.sum("revenue").over(window)) \
                 .withColumn("running_avg", F.avg("revenue").over(window)) \
                 .withColumn("row_num", F.row_number().over(window))

Limit Frame Size: Use bounded frames instead of unbounded when possible.

# Trailing 6-month average: 5 preceding rows plus the current row
window_bounded = Window.partitionBy("name").orderBy("month").rowsBetween(-5, 0)
df_bounded = df.withColumn("last_6m_avg", F.avg("revenue").over(window_bounded))

Window functions eliminate complex self-joins and multiple aggregation passes, replacing them with single-pass operations that express intent clearly. Master these patterns to write more maintainable and performant Spark SQL code.
