PySpark - Moving Average with Window Function
Key Insights
- PySpark’s Window functions enable efficient moving average calculations on distributed datasets by defining frame boundaries with rowsBetween() or rangeBetween(), avoiding expensive self-joins or iterative operations.
- You can compute multiple moving averages (7-day, 30-day, 90-day) in a single pass over your data by defining different window specifications, significantly improving performance compared to sequential calculations.
- Row-based windows (rowsBetween) count physical rows regardless of values, while range-based windows (rangeBetween) consider actual value ranges. Choose row-based for time series with consistent intervals and range-based for irregular data.
Introduction to Moving Averages in Time Series Analysis
Moving averages smooth out short-term fluctuations in time series data, revealing underlying trends and patterns. Whether you’re analyzing stock prices, website traffic, IoT sensor readings, or sales metrics, moving averages help identify momentum shifts, detect anomalies, and generate trading signals.
For small datasets, pandas handles moving averages beautifully with .rolling(). But when you’re processing terabytes of financial tick data, billions of IoT measurements, or years of e-commerce transactions, PySpark becomes essential. PySpark distributes computations across clusters, processing data in parallel while maintaining the ordered nature required for time series calculations through Window functions.
The key advantage: PySpark Window functions let you express complex analytical operations without writing complicated self-joins. You define your window specification once, and Spark shuffles and sorts the data by the partition and ordering keys as part of an optimized execution plan.
PySpark Window Functions Basics
Window functions operate on a subset of rows related to the current row. Unlike group-by operations that collapse rows, window functions preserve individual rows while computing aggregate values across defined windows.
The Window class from pyspark.sql.window provides the specification. You define three critical aspects:
- Partitioning: Divides data into independent groups (like GROUP BY)
- Ordering: Establishes sequence within partitions (essential for time series)
- Frame specification: Defines which rows to include in calculations
Here’s the basic syntax:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col
spark = SparkSession.builder.appName("MovingAverage").getOrCreate()
# Define a window partitioned by product, ordered by date
window_spec = Window.partitionBy("product_id").orderBy("date")
# Or with a specific frame
window_spec_with_frame = (
Window.partitionBy("product_id")
.orderBy("date")
.rowsBetween(-6, 0) # Current row and 6 preceding rows
)
The partitionBy() ensures calculations stay within logical groups (like per stock symbol or per customer), while orderBy() establishes the sequence for window frame calculations.
Implementing Simple Moving Average (SMA)
A simple moving average calculates the mean of values within a sliding window. The rowsBetween() method defines frame boundaries relative to the current row.
Let’s calculate a 3-day moving average on stock prices:
from pyspark.sql.functions import avg, col
from pyspark.sql.window import Window
# Sample stock data
data = [
("AAPL", "2024-01-01", 150.0),
("AAPL", "2024-01-02", 152.0),
("AAPL", "2024-01-03", 151.0),
("AAPL", "2024-01-04", 153.0),
("AAPL", "2024-01-05", 155.0),
("AAPL", "2024-01-06", 154.0),
("AAPL", "2024-01-07", 156.0),
]
df = spark.createDataFrame(data, ["symbol", "date", "close_price"])
# Define 3-day moving average window
window_3day = (
Window.partitionBy("symbol")
.orderBy("date")
.rowsBetween(-2, 0) # Current row + 2 preceding = 3 days
)
# Calculate moving average
df_with_ma = df.withColumn("ma_3day", avg("close_price").over(window_3day))
df_with_ma.show()
Output shows the 3-day average. The first two rows average fewer data points because the frame shrinks at the start of the partition:
+------+----------+-----------+-------+
|symbol|      date|close_price|ma_3day|
+------+----------+-----------+-------+
|  AAPL|2024-01-01|      150.0|  150.0|
|  AAPL|2024-01-02|      152.0|  151.0|
|  AAPL|2024-01-03|      151.0|  151.0|
|  AAPL|2024-01-04|      153.0|  152.0|
|  AAPL|2024-01-05|      155.0|  153.0|
|  AAPL|2024-01-06|      154.0|  154.0|
|  AAPL|2024-01-07|      156.0|  155.0|
+------+----------+-----------+-------+
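On a sample this small, you can cross-check the averages without Spark. The following plain-Python sketch mirrors a rowsBetween(-2, 0) frame within a single partition; the helper name trailing_mean is illustrative and not part of PySpark. Spark's own output may differ in the last floating-point digits.

```python
# Closing prices from the AAPL sample, in date order
closes = [150.0, 152.0, 151.0, 153.0, 155.0, 154.0, 156.0]


def trailing_mean(values, preceding):
    """Mean over the current element and up to `preceding` earlier ones,
    mirroring a rowsBetween(-preceding, 0) frame within one partition."""
    result = []
    for i in range(len(values)):
        # The frame shrinks at the start, where fewer rows are available
        frame = values[max(0, i - preceding): i + 1]
        result.append(sum(frame) / len(frame))
    return result


ma_3day = trailing_mean(closes, preceding=2)
for close, ma in zip(closes, ma_3day):
    print(close, ma)
# ma_3day -> [150.0, 151.0, 151.0, 152.0, 153.0, 154.0, 155.0]
```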
For a 7-day rolling average on sales data:
# Sales data with multiple products
sales_data = [
("P001", "2024-01-01", 1000),
("P001", "2024-01-02", 1200),
("P001", "2024-01-03", 900),
# ... more data
]
sales_df = spark.createDataFrame(sales_data, ["product_id", "date", "revenue"])
window_7day = (
Window.partitionBy("product_id")
.orderBy("date")
.rowsBetween(-6, 0)
)
sales_with_ma = sales_df.withColumn(
"revenue_ma_7day",
avg("revenue").over(window_7day)
)
Advanced Window Configurations
Different analytical needs require different window frame types. PySpark offers flexible configurations beyond basic moving averages.
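Weighted Moving Average: Apply more weight to recent values by combining lagged prices with explicit weights:
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window
# 5-day weighted MA with linear weights: 5 for the current row down to 1 for the oldest
ordered = Window.partitionBy("symbol").orderBy("date")
weights = [5, 4, 3, 2, 1]
# Start with the current row, then add each lagged price times its weight
weighted_sum = col("close_price") * weights[0]
for offset, weight in enumerate(weights[1:], start=1):
    weighted_sum = weighted_sum + lag("close_price", offset).over(ordered) * weight
# lag() returns null near the start of a partition, so the first 4 rows stay null
df_weighted = df.withColumn("wma_5day", weighted_sum / float(sum(weights)))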
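The arithmetic behind a linearly weighted moving average is easy to verify by hand. This plain-Python sketch divides the weighted sum of the last n values by the sum of the weights. The function name, the choice of weights 5 through 1, and the decision to return None until a full window exists are all assumptions for illustration.

```python
def weighted_moving_average(values, weights):
    """weights[0] applies to the current value, weights[1] to the previous one,
    and so on. Positions without a full window return None."""
    n = len(weights)
    total_weight = sum(weights)
    out = []
    for i in range(len(values)):
        if i < n - 1:
            out.append(None)  # not enough history yet
            continue
        acc = sum(w * values[i - k] for k, w in enumerate(weights))
        out.append(acc / total_weight)
    return out


closes = [150.0, 152.0, 151.0, 153.0, 155.0, 154.0, 156.0]
wma_5day = weighted_moving_average(closes, weights=[5, 4, 3, 2, 1])
# First full window (index 4):
# (5*155 + 4*153 + 3*151 + 2*152 + 1*150) / 15 = 2294 / 15
```

Because recent prices carry more weight, the weighted average reacts to a new trend sooner than a simple average over the same five rows.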
Expanding Window (Cumulative Average): Calculate average from the start of the partition to the current row:
window_expanding = (
Window.partitionBy("symbol")
.orderBy("date")
.rowsBetween(Window.unboundedPreceding, 0)
)
df_cumulative = df.withColumn(
"cumulative_avg",
avg("close_price").over(window_expanding)
)
Centered Moving Average: Include rows before and after the current row for smoother trends:
# 5-day centered MA: 2 before, current, 2 after
window_centered = (
Window.partitionBy("symbol")
.orderBy("date")
.rowsBetween(-2, 2)
)
df_centered = df.withColumn(
"centered_ma_5day",
avg("close_price").over(window_centered)
)
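Range-based Windows: Use actual value ranges instead of row counts for irregular time series. rangeBetween() with numeric offsets needs a numeric ordering column, so convert the date to epoch seconds first (casting a date string directly to long yields null):
# Window covering 7 calendar days: the current day plus the 6 days before it
window_range = (
    Window.partitionBy("symbol")
    .orderBy(col("date").cast("timestamp").cast("long"))  # Date string -> epoch seconds
    .rangeBetween(-6 * 24 * 60 * 60, 0)  # 6 days back, in seconds
)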
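The difference between row-based and range-based frames only shows up when the data has gaps. The following plain-Python sketch models both behaviors on hypothetical readings with a missing week; the function names and sample values are illustrative, not PySpark APIs.

```python
from datetime import date

# Hypothetical readings with a gap between Jan 3 and Jan 10
readings = [
    (date(2024, 1, 1), 10.0),
    (date(2024, 1, 2), 20.0),
    (date(2024, 1, 3), 30.0),
    (date(2024, 1, 10), 40.0),
]


def rows_frame_mean(data, preceding):
    """Average over the current row and up to `preceding` physical rows before it."""
    out = []
    for i in range(len(data)):
        frame = [value for _, value in data[max(0, i - preceding): i + 1]]
        out.append(sum(frame) / len(frame))
    return out


def range_frame_mean(data, days_back):
    """Average over rows dated within `days_back` days before the current row."""
    out = []
    for current_day, _ in data:
        frame = [
            value for day, value in data
            if 0 <= (current_day - day).days <= days_back
        ]
        out.append(sum(frame) / len(frame))
    return out


by_rows = rows_frame_mean(readings, preceding=2)   # [10.0, 15.0, 20.0, 30.0]
by_range = range_frame_mean(readings, days_back=6)  # [10.0, 15.0, 20.0, 40.0]
```

On Jan 10, the row-based frame still reaches back to Jan 2 and Jan 3, while the range-based frame sees only the Jan 10 reading because nothing else falls within the preceding six days.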
Multiple Moving Averages and Performance Optimization
Real-world analysis often requires multiple moving averages simultaneously. Because the windows below share the same partitioning and ordering and differ only in their frames, Spark can typically evaluate them with a single shuffle and sort instead of one per average.
from pyspark.sql.functions import avg
# Define multiple windows
window_7day = Window.partitionBy("symbol").orderBy("date").rowsBetween(-6, 0)
window_30day = Window.partitionBy("symbol").orderBy("date").rowsBetween(-29, 0)
window_90day = Window.partitionBy("symbol").orderBy("date").rowsBetween(-89, 0)
# Calculate all moving averages in one transformation
df_multi_ma = (
df.withColumn("ma_7day", avg("close_price").over(window_7day))
.withColumn("ma_30day", avg("close_price").over(window_30day))
.withColumn("ma_90day", avg("close_price").over(window_90day))
)
df_multi_ma.show()
Handling Null Values and Edge Cases: avg() skips nulls, so count the non-null values in each frame and keep the average only when enough data points are present:
from pyspark.sql.functions import avg, col, count, lit, when
# Count non-null values in window
window_spec = Window.partitionBy("symbol").orderBy("date").rowsBetween(-6, 0)
df_robust = (
df.withColumn("ma_7day", avg("close_price").over(window_spec))
.withColumn("data_points", count("close_price").over(window_spec))
.withColumn(
"ma_7day_adjusted",
when(col("data_points") >= 5, col("ma_7day"))
.otherwise(lit(None)) # Only show MA if sufficient data
)
)
Performance Optimization: A window partitioned by symbol already shuffles the data by that key, so an explicit repartition() moves the shuffle rather than removing it. It pays off mainly when several later steps reuse the same partitioning:
# Repartition by the window's partition key so later window steps can reuse the layout
df_optimized = (
    df.repartition("symbol")
    .withColumn("ma_7day", avg("close_price").over(window_7day))
)
# Persist the computed result only if several downstream actions reuse it
df_optimized.persist()
Real-World Use Case: Financial Data Analysis
Let’s build a complete pipeline analyzing stock data with a moving average crossover strategy, a common technique where a short-term MA crossing above a long-term MA signals a potential buy and a cross below signals a potential sell.
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, when, lag
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("StockAnalysis").getOrCreate()
# Load stock data (assume CSV with symbol, date, close columns)
stock_df = spark.read.csv("stock_prices.csv", header=True, inferSchema=True)
# Define windows for short-term (50-day) and long-term (200-day) MAs
window_50 = Window.partitionBy("symbol").orderBy("date").rowsBetween(-49, 0)
window_200 = Window.partitionBy("symbol").orderBy("date").rowsBetween(-199, 0)
# Calculate moving averages
df_with_mas = (
stock_df
.withColumn("ma_50", avg("close").over(window_50))
.withColumn("ma_200", avg("close").over(window_200))
)
# Detect crossover signals
window_for_lag = Window.partitionBy("symbol").orderBy("date")
df_with_signals = (
df_with_mas
.withColumn("ma_50_prev", lag("ma_50", 1).over(window_for_lag))
.withColumn("ma_200_prev", lag("ma_200", 1).over(window_for_lag))
.withColumn(
"signal",
when(
(col("ma_50") > col("ma_200")) &
(col("ma_50_prev") <= col("ma_200_prev")),
"BUY"
)
.when(
(col("ma_50") < col("ma_200")) &
(col("ma_50_prev") >= col("ma_200_prev")),
"SELL"
)
.otherwise("HOLD")
)
)
# Filter for actionable signals
signals = df_with_signals.filter(col("signal").isin(["BUY", "SELL"]))
signals.select("symbol", "date", "close", "ma_50", "ma_200", "signal").show()
# Save results
df_with_signals.write.parquet("stock_analysis_results.parquet", mode="overwrite")
This pipeline processes potentially millions of stock records across thousands of symbols, calculating moving averages and generating trading signals in a distributed, scalable manner.
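The crossover rule itself is simple enough to check on a handful of values. This plain-Python sketch applies the same comparison as the when() chain to two hypothetical MA series; the function name and sample numbers are illustrative. The first position has no previous row, just as lag() returns null there in Spark, so it falls through to HOLD.

```python
def crossover_signals(short_ma, long_ma):
    """Label each position BUY, SELL, or HOLD by comparing it with the previous one."""
    signals = []
    for i in range(len(short_ma)):
        if i == 0:
            signals.append("HOLD")  # no previous row to compare against
            continue
        prev_short, prev_long = short_ma[i - 1], long_ma[i - 1]
        cur_short, cur_long = short_ma[i], long_ma[i]
        if cur_short > cur_long and prev_short <= prev_long:
            signals.append("BUY")   # short MA crossed above long MA
        elif cur_short < cur_long and prev_short >= prev_long:
            signals.append("SELL")  # short MA crossed below long MA
        else:
            signals.append("HOLD")
    return signals


# Hypothetical series: the short MA rises through a flat long MA, then falls back
short_ma = [9.0, 10.0, 11.0, 10.5, 9.5]
long_ma = [10.0, 10.0, 10.0, 10.0, 10.0]
signals = crossover_signals(short_ma, long_ma)
# signals -> ["HOLD", "HOLD", "BUY", "HOLD", "SELL"]
```

Note that a signal fires only on the row where the relationship flips, not on every row where the short MA sits above or below the long one.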
Conclusion & Best Practices
PySpark Window functions provide powerful, efficient tools for calculating moving averages on large-scale time series data. Key takeaways:
When to use PySpark over pandas: Choose PySpark when your data exceeds single-machine memory (typically >10GB), when you need distributed processing, or when integrating with existing Spark pipelines. For exploratory analysis on smaller datasets, pandas’ .rolling() is simpler and faster.
Performance considerations: Partition windows by your grouping key so each group is processed independently. Use rowsBetween() for consistent time intervals and rangeBetween() for irregular data. Persist DataFrames that several downstream computations reuse. Watch for skew: all rows for one partition key are processed by a single task, so if one symbol has 10x more data than others, consider bucketing strategies.
Handle edge cases explicitly: Decide how to treat the beginning of your time series where insufficient data exists for the full window. Use conditional logic to filter or flag these rows based on your analytical requirements.
Combine operations efficiently: Define moving averages over windows that share the same partitioning and ordering so Spark can compute them together, rather than building separate DataFrames and joining the results. Use cache() or persist() judiciously when reusing intermediate results.
Moving averages are just the beginning—PySpark Window functions support ranking, lead/lag operations, and custom aggregations. Master these fundamentals, and you’ll handle sophisticated time series analysis at any scale.