PySpark - Cumulative Sum in DataFrame

Cumulative sum operations are fundamental to data analysis, appearing everywhere from financial running balances to time-series trend analysis and inventory tracking. While pandas handles cumulative...

Key Insights

  • PySpark window functions enable cumulative sum calculations on distributed DataFrames through Window.orderBy() and sum() operations, essential for running totals in large-scale data processing
  • Partitioning with Window.partitionBy() allows group-wise cumulative sums while maintaining proper ordering within each partition, critical for category-based analytics and time-series analysis
  • Performance depends heavily on proper partitioning strategy and understanding frame specifications—poor window definitions can trigger expensive shuffle operations across your cluster

Introduction

Cumulative sum operations are fundamental to data analysis, appearing everywhere from financial running balances to time-series trend analysis and inventory tracking. While pandas handles cumulative sums trivially with df['column'].cumsum(), PySpark requires a different approach using window functions to distribute computation across a cluster.

The shift from pandas to PySpark becomes necessary when your data exceeds single-machine memory or when processing time becomes prohibitive. A cumulative sum operation on a billion-row dataset isn’t feasible in pandas, but PySpark handles it by distributing the workload across multiple executors.

Here’s the conceptual difference:

# Pandas approach - works great for small data
import pandas as pd
df_pandas = pd.DataFrame({'value': [10, 20, 30, 40]})
df_pandas['cumulative'] = df_pandas['value'].cumsum()

# PySpark approach - designed for distributed processing
from pyspark.sql import Window
from pyspark.sql.functions import sum as _sum

df_spark = spark.createDataFrame([(10,), (20,), (30,), (40,)], ['value'])
window_spec = Window.orderBy('value').rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_spark = df_spark.withColumn('cumulative', _sum('value').over(window_spec))

The PySpark version is more verbose, but it scales horizontally across your cluster.

Understanding Window Functions in PySpark

Window functions operate on a group of rows (a “window”) and return a single value for each row based on that window. Unlike groupBy() which collapses rows, window functions maintain the original row count while performing calculations across related rows.

The Window specification defines three key aspects:

  1. Partitioning: Which rows belong together (optional)
  2. Ordering: How rows are sorted within each partition (required for cumulative operations)
  3. Frame: Which subset of the partition to include in each calculation (optional, defaults to unbounded)

Here’s the basic setup:

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import sum as _sum, col

spark = SparkSession.builder.appName("CumulativeSum").getOrCreate()

# Basic window specification - order by a column
window_spec = Window.orderBy("date")

# Window with explicit frame (from start to current row)
window_spec_explicit = Window.orderBy("date").rowsBetween(
    Window.unboundedPreceding, 
    Window.currentRow
)

The ordering is critical for cumulative sums because it determines the sequence in which values accumulate. Without proper ordering, your cumulative sum becomes meaningless.

Basic Cumulative Sum Implementation

Let’s build a practical example with daily sales data where we want a running total:

from pyspark.sql.functions import sum as _sum
from datetime import datetime, timedelta

# Create sample data
data = [
    (datetime(2024, 1, 1), 100),
    (datetime(2024, 1, 2), 150),
    (datetime(2024, 1, 3), 200),
    (datetime(2024, 1, 4), 120),
    (datetime(2024, 1, 5), 180)
]

df = spark.createDataFrame(data, ["date", "daily_sales"])

# Define window specification ordered by date
window_spec = Window.orderBy("date")

# Calculate cumulative sum
df_with_cumsum = df.withColumn(
    "cumulative_sales",
    _sum("daily_sales").over(window_spec)
)

df_with_cumsum.show()

Output:

+----------+------------+----------------+
|      date|daily_sales|cumulative_sales|
+----------+------------+----------------+
|2024-01-01|         100|             100|
|2024-01-02|         150|             250|
|2024-01-03|         200|             450|
|2024-01-04|         120|             570|
|2024-01-05|         180|             750|
+----------+------------+----------------+

Note that we import sum as _sum to avoid conflicts with Python’s built-in sum() function. The .over(window_spec) method applies the window specification to the aggregation function.

Partitioned Cumulative Sum

Real-world scenarios often require cumulative sums within groups—sales by region, transactions by account, or metrics by category. The partitionBy() method creates separate cumulative calculations for each group:

# Sales data with multiple regions
data_regions = [
    ("North", datetime(2024, 1, 1), 100),
    ("North", datetime(2024, 1, 2), 150),
    ("North", datetime(2024, 1, 3), 200),
    ("South", datetime(2024, 1, 1), 80),
    ("South", datetime(2024, 1, 2), 120),
    ("South", datetime(2024, 1, 3), 90),
]

df_regions = spark.createDataFrame(
    data_regions, 
    ["region", "date", "sales"]
)

# Window partitioned by region, ordered by date
window_by_region = Window.partitionBy("region").orderBy("date")

df_regions_cumsum = df_regions.withColumn(
    "cumulative_sales",
    _sum("sales").over(window_by_region)
)

df_regions_cumsum.orderBy("region", "date").show()

Output:

+------+----------+-----+----------------+
|region|      date|sales|cumulative_sales|
+------+----------+-----+----------------+
| North|2024-01-01|  100|             100|
| North|2024-01-02|  150|             250|
| North|2024-01-03|  200|             450|
| South|2024-01-01|   80|              80|
| South|2024-01-02|  120|             200|
| South|2024-01-03|   90|             290|
+------+----------+-----+----------------+

Each region maintains its own cumulative sum, resetting at partition boundaries. You can partition by multiple columns for finer granularity:

# Partition by region AND product category
window_multi_partition = Window.partitionBy("region", "category").orderBy("date")

df_multi = df_multi.withColumn(
    "cumulative_sales",
    _sum("sales").over(window_multi_partition)
)

Advanced Scenarios

Custom Window Frames

The default frame for cumulative sum is rowsBetween(Window.unboundedPreceding, Window.currentRow), but you can customize this for different calculations:

# Rolling 3-day sum (not cumulative, but related concept)
rolling_window = Window.orderBy("date").rowsBetween(-2, 0)

df_rolling = df.withColumn(
    "rolling_3day_sales",
    _sum("sales").over(rolling_window)
)

# Range-based window (time-based rather than row-based)
# Sum of sales within 7 days
range_window = Window.orderBy(col("date").cast("long")).rangeBetween(-7*86400, 0)

df_range = df.withColumn(
    "last_7days_sales",
    _sum("sales").over(range_window)
)

Handling Null Values

Null values in your data can disrupt cumulative sums. Handle them explicitly:

from pyspark.sql.functions import coalesce, lit

# Replace nulls with 0 before cumulative sum
df_clean = df.withColumn(
    "sales_clean",
    coalesce(col("sales"), lit(0))
)

window_spec = Window.orderBy("date")
df_clean = df_clean.withColumn(
    "cumulative_sales",
    _sum("sales_clean").over(window_spec)
)

Multiple Cumulative Columns

Calculate multiple cumulative metrics in a single pass:

from pyspark.sql.functions import avg as _avg, count as _count

window_spec = Window.partitionBy("region").orderBy("date")

df_multi_metrics = df.withColumn(
    "cumulative_sales", _sum("sales").over(window_spec)
).withColumn(
    "cumulative_avg", _avg("sales").over(window_spec)
).withColumn(
    "cumulative_count", _count("sales").over(window_spec)
)

Performance Considerations and Best Practices

Window functions can be expensive because they require data shuffling and sorting. Here are strategies to optimize performance:

1. Partition Wisely

Choose partition keys that distribute data evenly. Skewed partitions create bottlenecks:

# Good: balanced partitions
window_good = Window.partitionBy("region").orderBy("date")

# Problematic: if 90% of data is in one category
window_skewed = Window.partitionBy("rare_category").orderBy("date")

2. Cache Strategically

If you’re performing multiple window operations on the same DataFrame, cache it:

df_prepared = df.filter(col("date") >= "2024-01-01")
df_prepared.cache()

# Multiple window operations on cached data
window1 = Window.partitionBy("region").orderBy("date")
window2 = Window.partitionBy("product").orderBy("date")

df_result = df_prepared.withColumn(
    "cumsum_region", _sum("sales").over(window1)
).withColumn(
    "cumsum_product", _sum("sales").over(window2)
)

3. Limit Data Early

Filter before applying window functions to reduce shuffle volume:

# Filter first, then calculate cumulative sum
df_filtered = df.filter(col("date") >= "2024-01-01")
window_spec = Window.orderBy("date")
df_result = df_filtered.withColumn(
    "cumulative_sales", _sum("sales").over(window_spec)
)

4. Use Appropriate Frame Specifications

Explicitly define frames to avoid confusion and potential performance issues:

# Explicit is better than implicit
window_explicit = Window.orderBy("date").rowsBetween(
    Window.unboundedPreceding, 
    Window.currentRow
)

5. Monitor Partition Sizes

Check partition distribution to identify skew:

from pyspark.sql.functions import spark_partition_id

df.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count().show()

Cumulative sum operations are powerful tools in PySpark for large-scale data analysis. Master window functions, understand partitioning implications, and optimize your queries based on data characteristics. The verbosity compared to pandas pays off when you’re processing terabytes of data across a distributed cluster.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.