PySpark - First and Last Value in Window


Key Insights

  • Window functions in PySpark let you access first and last values within ordered partitions without collapsing rows, essential for time-series analysis and tracking state changes across groups
  • The ignorenulls parameter in first() and last() determines whether null values are skipped or treated as valid results, fundamentally changing your output when dealing with incomplete data
  • Window frame specifications like rowsBetween() dramatically alter which values qualify as “first” or “last” by defining custom boundaries instead of using entire partitions

Introduction to Window Functions in PySpark

Window functions are one of PySpark’s most powerful features for analytical queries. Unlike standard aggregations that collapse multiple rows into a single result, window functions compute values across a set of rows while preserving the individual row structure. This makes them invaluable for ranking, running totals, and accessing relative row values.

The first() and last() functions retrieve the first or last value within the current row's window frame, which is either the whole partition or a subset of it, depending on the window specification. These functions become particularly useful when you need to track initial states, final outcomes, or compare current values against boundary conditions in your dataset.

Every window function requires a window specification that defines how to partition and order your data:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import first, last, col

spark = SparkSession.builder.appName("WindowExample").getOrCreate()

# Sample data
data = [
    ("user1", "2024-01-01", 100),
    ("user1", "2024-01-05", 150),
    ("user1", "2024-01-10", 200),
    ("user2", "2024-01-02", 50),
    ("user2", "2024-01-08", 75)
]

df = spark.createDataFrame(data, ["user_id", "date", "amount"])

# Define window specification
windowSpec = Window.partitionBy("user_id").orderBy("date")

# Apply first and last functions
result = df.withColumn("first_amount", first("amount").over(windowSpec)) \
           .withColumn("last_amount", last("amount").over(windowSpec))

result.show()

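This creates a separate window for each user, ordered by date. first_amount holds the user's earliest amount on every row. Note that last_amount does not hold the user's final amount here: because the window has an orderBy and no explicit frame, the frame ends at the current row, so last() simply returns the current row's amount. The Window Frame Specifications and Common Pitfalls sections show how to extend the frame to the whole partition.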

Understanding first() and last() Functions

The first() and last() functions have a deceptively simple syntax but important nuances. Both accept a column name and an optional ignorenulls parameter that defaults to False.

When ignorenulls=False, the functions return the literal first or last value in the window, even if it’s null. When ignorenulls=True, they skip null values and return the first or last non-null value. This distinction is critical when working with real-world data containing gaps.

from pyspark.sql.functions import first, last

# Data with nulls
data_with_nulls = [
    ("A", 1, None),
    ("A", 2, 100),
    ("A", 3, 200),
    ("A", 4, None),
    ("B", 1, 50),
    ("B", 2, None),
    ("B", 3, 75)
]

df_nulls = spark.createDataFrame(data_with_nulls, ["group", "seq", "value"])

windowSpec = Window.partitionBy("group").orderBy("seq")

# Compare null handling
result = df_nulls.withColumn("first_default", first("value").over(windowSpec)) \
                 .withColumn("first_ignore_null", first("value", ignorenulls=True).over(windowSpec)) \
                 .withColumn("last_default", last("value").over(windowSpec)) \
                 .withColumn("last_ignore_null", last("value", ignorenulls=True).over(windowSpec))

result.show()

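For group “A”, first_default returns null on every row (the actual first value). first_ignore_null returns null at seq 1, where the frame contains only the null row, and 100 from seq 2 onward (the first non-null value seen so far). Because the default frame ends at the current row, last_default echoes the current row's value, while last_ignore_null carries the most recent non-null value forward (200 at seq 4). Understanding this behavior prevents subtle bugs in production pipelines.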
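To make the null handling concrete without a Spark cluster, here is a minimal plain-Python model of the semantics for group "A". It is a sketch, not Spark code: first_value and last_value are hypothetical helpers, and the frame is assumed to run from the partition start to the current row, as it does for an ordered window with no explicit frame.

```python
from typing import Optional, Sequence


def first_value(frame: Sequence[Optional[int]], ignorenulls: bool = False) -> Optional[int]:
    """Plain-Python stand-in for first(): first element, optionally skipping None."""
    for value in frame:
        if value is not None or not ignorenulls:
            return value
    return None  # empty frame, or only nulls with ignorenulls=True


def last_value(frame: Sequence[Optional[int]], ignorenulls: bool = False) -> Optional[int]:
    """Plain-Python stand-in for last(): same rule, scanning from the end."""
    return first_value(list(reversed(frame)), ignorenulls)


# Group "A" from the example above, already sorted by seq (1..4).
values = [None, 100, 200, None]

# Assumption: each row's frame is the partition start up to the row itself.
# (seq values are unique here, so the default RANGE frame behaves like a ROWS frame.)
rows = []
for i in range(len(values)):
    frame = values[: i + 1]
    rows.append((
        first_value(frame),
        first_value(frame, ignorenulls=True),
        last_value(frame),
        last_value(frame, ignorenulls=True),
    ))

print("seq (first_default, first_ignore_null, last_default, last_ignore_null)")
for seq, row in enumerate(rows, start=1):
    print(seq, row)
# 1 (None, None, None, None)
# 2 (None, 100, 100, 100)
# 3 (None, 100, 200, 200)
# 4 (None, 100, None, 200)
```

Comparing these tuples with the output of result.show() for group "A" is a quick way to confirm how the frame and ignorenulls interact.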

Practical Use Cases

Customer Purchase Analysis

Tracking customer behavior over time is a common analytics task. You often need to compare current behavior against initial or most recent activity:

# Customer purchase history
purchases = [
    ("C001", "2024-01-15", "Electronics", 500),
    ("C001", "2024-02-20", "Books", 30),
    ("C001", "2024-03-10", "Electronics", 800),
    ("C002", "2024-01-20", "Clothing", 100),
    ("C002", "2024-02-25", "Clothing", 150),
]

df_purchases = spark.createDataFrame(purchases, 
                                     ["customer_id", "purchase_date", "category", "amount"])

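# Frame spans the whole partition so last() returns the customer's final purchase
windowSpec = Window.partitionBy("customer_id").orderBy("purchase_date") \
                   .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)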

customer_analysis = df_purchases.withColumn("first_category", first("category").over(windowSpec)) \
                                .withColumn("first_amount", first("amount").over(windowSpec)) \
                                .withColumn("last_amount", last("amount").over(windowSpec)) \
                                .withColumn("amount_change", col("amount") - first("amount").over(windowSpec))

customer_analysis.show(truncate=False)

This analysis reveals whether customers increased spending over time and if they stayed within their initial product category.

Time-Series Stock Data

Financial analysis frequently requires comparing current prices against opening or closing values:

# Stock prices
stock_data = [
    ("AAPL", "2024-01-02", 180.50, 185.20),
    ("AAPL", "2024-01-03", 185.00, 183.75),
    ("AAPL", "2024-01-04", 183.80, 186.50),
    ("GOOGL", "2024-01-02", 140.20, 142.10),
    ("GOOGL", "2024-01-03", 142.00, 141.50),
]

df_stocks = spark.createDataFrame(stock_data, 
                                  ["ticker", "date", "open_price", "close_price"])

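# Frame spans the whole partition so last() returns the final close of the period
windowSpec = Window.partitionBy("ticker").orderBy("date") \
                   .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)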

stock_analysis = df_stocks.withColumn("period_open", first("open_price").over(windowSpec)) \
                          .withColumn("period_close", last("close_price").over(windowSpec)) \
                          .withColumn("period_return", 
                                    (last("close_price").over(windowSpec) - first("open_price").over(windowSpec)) 
                                    / first("open_price").over(windowSpec) * 100)

stock_analysis.show()

This calculates the total return across the period by comparing the final closing price against the initial opening price.
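As a sanity check on the formula, the expected period returns for the sample data can be worked out by hand. This is a small plain-Python sketch; period_return is a hypothetical helper, and it assumes the window frame spans the entire partition so that the last close is the final row's close rather than the current row's.

```python
def period_return(first_open: float, last_close: float) -> float:
    """Percentage change from the period's first open to its last close."""
    return (last_close - first_open) / first_open * 100


aapl = period_return(180.50, 186.50)    # open on 2024-01-02, close on 2024-01-04
googl = period_return(140.20, 141.50)   # open on 2024-01-02, close on 2024-01-03

print(f"AAPL:  {aapl:.2f}%")   # AAPL:  3.32%
print(f"GOOGL: {googl:.2f}%")  # GOOGL: 0.93%
```

If period_return in the Spark output varies from row to row within a ticker, the frame is ending at the current row and the column is a running return, not the period return.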

User Session Tracking

Understanding session boundaries helps analyze user engagement:

# User activity logs
sessions = [
    ("user1", "2024-01-01 09:00:00", "login"),
    ("user1", "2024-01-01 09:15:00", "view_page"),
    ("user1", "2024-01-01 09:30:00", "logout"),
    ("user1", "2024-01-01 14:00:00", "login"),
    ("user1", "2024-01-01 14:45:00", "logout"),
]

df_sessions = spark.createDataFrame(sessions, ["user_id", "timestamp", "action"])

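# Frame spans the whole partition so last() returns the user's final event.
# Note: partitioning by user_id alone treats all of a user's events as one span,
# so both login/logout pairs share the same start and end.
windowSpec = Window.partitionBy("user_id").orderBy("timestamp") \
                   .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)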

session_analysis = df_sessions.withColumn("session_start", first("timestamp").over(windowSpec)) \
                              .withColumn("session_end", last("timestamp").over(windowSpec)) \
                              .withColumn("first_action", first("action").over(windowSpec)) \
                              .withColumn("last_action", last("action").over(windowSpec))

session_analysis.show(truncate=False)

Window Frame Specifications

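When a window has an orderBy and no explicit frame, Spark uses a frame that runs from the partition’s start to the current row (including any rows that tie with the current row on the ordering column). Without an orderBy, the frame is the entire partition. You can customize this with rowsBetween() or rangeBetween() to create sliding windows or look-ahead scenarios.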

from pyspark.sql.window import Window

# Sales data
sales = [
    ("Store1", 1, 100),
    ("Store1", 2, 150),
    ("Store1", 3, 120),
    ("Store1", 4, 180),
    ("Store1", 5, 200),
]

df_sales = spark.createDataFrame(sales, ["store", "week", "revenue"])

# Unbounded window (entire partition)
unbounded_window = Window.partitionBy("store").orderBy("week") \
                         .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Sliding window (current row and 2 preceding)
sliding_window = Window.partitionBy("store").orderBy("week") \
                       .rowsBetween(-2, Window.currentRow)

result = df_sales.withColumn("overall_first", first("revenue").over(unbounded_window)) \
                 .withColumn("overall_last", last("revenue").over(unbounded_window)) \
                 .withColumn("window_first", first("revenue").over(sliding_window)) \
                 .withColumn("window_last", last("revenue").over(sliding_window))

result.show()

The unbounded window always shows 100 and 200 as first and last values. The sliding window shows different values as the 3-row frame moves through the data. For week 3, the window includes weeks 1-3, so window_first is 100 and window_last is 120.
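The frame arithmetic can be traced row by row with a small plain-Python model. This is only a sketch of the rowsBetween() semantics, not Spark code: rows_between and first_last are hypothetical helpers, offsets are relative to the current row, and None stands in for an unbounded boundary.

```python
from typing import List, Optional, Tuple


def rows_between(values: List[int], i: int,
                 start: Optional[int], end: Optional[int]) -> List[int]:
    """Rows in the frame of row i; offsets are relative to i, None means unbounded."""
    lo = 0 if start is None else max(0, i + start)
    hi = len(values) - 1 if end is None else min(len(values) - 1, i + end)
    if hi < lo:
        return []  # frame lies entirely outside the partition
    return values[lo : hi + 1]


def first_last(frame: List[int]) -> Tuple[Optional[int], Optional[int]]:
    """(first, last) of a frame, or (None, None) if the frame is empty."""
    return (frame[0], frame[-1]) if frame else (None, None)


revenue = [100, 150, 120, 180, 200]  # Store1, weeks 1..5

# Counterpart of rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
unbounded = [first_last(rows_between(revenue, i, None, None)) for i in range(len(revenue))]

# Counterpart of rowsBetween(-2, Window.currentRow)
sliding = [first_last(rows_between(revenue, i, -2, 0)) for i in range(len(revenue))]

for week, (u, s) in enumerate(zip(unbounded, sliding), start=1):
    print(week, u, s)
# 1 (100, 200) (100, 100)
# 2 (100, 200) (100, 150)
# 3 (100, 200) (100, 120)
# 4 (100, 200) (150, 180)
# 5 (100, 200) (120, 200)
```

The first two weeks have fewer than three rows available, so the sliding frame is clipped at the partition start rather than padded.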

Performance Considerations and Best Practices

Window functions trigger shuffles to co-locate partition data, making partitioning strategy critical for performance. Choose partition keys that distribute data evenly and align with your analytical needs.

# Poor partitioning - skewed data
poor_window = Window.partitionBy("country").orderBy("timestamp")  # If one country dominates

# Finer key - more balanced, but first()/last() are then computed per (country, region)
better_window = Window.partitionBy("country", "region").orderBy("timestamp")

# Caching pays off when the same DataFrame feeds several window computations or actions
df_cached = df.cache()
result = df_cached.withColumn("first_val",
                              first("amount").over(Window.partitionBy("user_id").orderBy("date")))

When you only need to compare adjacent rows, lag() and lead() are more efficient than first() and last():

from pyspark.sql.functions import lag

# Define the window explicitly so the snippet does not depend on an earlier windowSpec
user_window = Window.partitionBy("user_id").orderBy("date")

# If you just need the previous row value
efficient = df.withColumn("previous_amount", lag("amount", 1).over(user_window))

# Same result via a one-row frame, but less direct for this use case
inefficient = df.withColumn("previous_amount",
                           first("amount").over(user_window.rowsBetween(-1, -1)))

Common Pitfalls and Troubleshooting

The most frequent mistake is omitting the ordering, which produces results that look valid but depend on whatever order the rows happen to arrive in:

# WRONG - unordered window gives arbitrary results
wrong_window = Window.partitionBy("user_id")  # Missing orderBy!
df.withColumn("first_amount", first("amount").over(wrong_window))

# CORRECT - explicit ordering
correct_window = Window.partitionBy("user_id").orderBy("date")
df.withColumn("first_amount", first("amount").over(correct_window))

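Another common issue is forgetting that, once a window has an orderBy, the default frame ends at the current row, so last() returns the current row's value rather than the partition's final value:

# Unexpected: last() only sees rows up to the current one, so it echoes the current amount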
partial_window = Window.partitionBy("user_id").orderBy("date")
df.withColumn("last_amount", last("amount").over(partial_window))

# To get the actual last value in the partition, use unbounded following
full_window = Window.partitionBy("user_id").orderBy("date") \
                    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("last_amount", last("amount").over(full_window))

Always validate your window specifications with small test datasets before running on production data. Check the frame boundaries (calling explain() on the resulting DataFrame shows the window frame in the query plan) and verify that the first/last values match your expectations. Understanding these functions deeply will make your PySpark analytical queries significantly more powerful and maintainable.
