PySpark - Unpivot DataFrame (Columns to Rows)
Key Insights
- PySpark offers three main approaches for unpivoting DataFrames: stack() for basic transformations, melt() (PySpark 3.4+) for pandas-like syntax with better readability, and SQL expressions for dynamic column selection
- Unpivoting converts wide-format data (metrics spread across columns) into long-format data (metrics stacked as rows), essential for analytics tools and normalized data models
- The melt() function provides the most intuitive API, with explicit control over ID columns, variable names, and value names, making it the preferred choice for most use cases
Introduction & Use Case
Unpivoting transforms wide-format data into long-format data by converting column headers into row values. This operation is the inverse of pivoting and is fundamental when preparing data for analytics, visualization tools, or normalized database schemas.
Consider sales data where each month is a separate column. Analytics tools and reporting frameworks typically expect this data in long format—one row per month per product. Here’s what this transformation looks like:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
spark = SparkSession.builder.appName("UnpivotExample").getOrCreate()
# Wide format (before unpivoting)
wide_df = spark.createDataFrame([
("Product_A", 100, 150, 200),
("Product_B", 80, 120, 160)
], ["product", "jan", "feb", "mar"])
wide_df.show()
# +----------+---+---+---+
# | product|jan|feb|mar|
# +----------+---+---+---+
# | Product_A|100|150|200|
# | Product_B| 80|120|160|
# +----------+---+---+---+
# Long format (after unpivoting) - we'll implement this next
# +----------+-----+-----+
# | product|month|sales|
# +----------+-----+-----+
# | Product_A| jan| 100|
# | Product_A| feb| 150|
# | Product_A| mar| 200|
# | Product_B| jan| 80|
# | Product_B| feb| 120|
# | Product_B| mar| 160|
# +----------+-----+-----+
Understanding the Problem
Wide-format data is common in spreadsheets and reporting outputs where humans prefer seeing related metrics side-by-side. However, this format creates problems for data processing:
- Analytics incompatibility: Most BI tools and SQL queries work better with normalized, long-format data
- Scalability issues: Adding new time periods requires schema changes
- Aggregation complexity: Calculating metrics across time periods becomes cumbersome
The unpivot operation solves these issues by restructuring data into a format where each observation occupies a single row. Let’s create a realistic example:
# Sales data with quarterly results as columns
sales_df = spark.createDataFrame([
("CUST001", "North", 15000, 18000, 21000, 19000),
("CUST002", "South", 12000, 14000, 16000, 15000),
("CUST003", "East", 20000, 22000, 25000, 23000)
], ["customer_id", "region", "Q1_2024", "Q2_2024", "Q3_2024", "Q4_2024"])
sales_df.show()
# +-----------+------+-------+-------+-------+-------+
# |customer_id|region|Q1_2024|Q2_2024|Q3_2024|Q4_2024|
# +-----------+------+-------+-------+-------+-------+
# | CUST001| North| 15000| 18000| 21000| 19000|
# | CUST002| South| 12000| 14000| 16000| 15000|
# | CUST003| East| 20000| 22000| 25000| 23000|
# +-----------+------+-------+-------+-------+-------+
Method 1: Using stack() Function
The stack() function is PySpark’s traditional approach to unpivoting. It works by specifying the number of column pairs to stack and explicitly listing each column name and its corresponding value.
from pyspark.sql.functions import expr, col
# Using stack() to unpivot quarterly columns
unpivoted_stack = sales_df.select(
"customer_id",
"region",
expr("stack(4, 'Q1_2024', Q1_2024, 'Q2_2024', Q2_2024, 'Q3_2024', Q3_2024, 'Q4_2024', Q4_2024) as (quarter, sales)")
)
unpivoted_stack.show()
# +-----------+------+-------+-----+
# |customer_id|region|quarter|sales|
# +-----------+------+-------+-----+
# | CUST001| North|Q1_2024|15000|
# | CUST001| North|Q2_2024|18000|
# | CUST001| North|Q3_2024|21000|
# | CUST001| North|Q4_2024|19000|
# | CUST002| South|Q1_2024|12000|
# | CUST002| South|Q2_2024|14000|
# | CUST002| South|Q3_2024|16000|
# | CUST002| South|Q4_2024|15000|
# | CUST003| East|Q1_2024|20000|
# | CUST003| East|Q2_2024|22000|
# | CUST003| East|Q3_2024|25000|
# | CUST003| East|Q4_2024|23000|
# +-----------+------+-------+-----+
The stack() function takes a row count followed by groups of arguments: the first argument specifies how many rows each input row expands into, and each group here pairs the column name as a string literal with the column reference. While functional, this approach is verbose and error-prone for many columns.
Method 2: Using melt() Function (PySpark 3.4+)
PySpark 3.4 introduced the melt() function, providing a pandas-like API that’s significantly more intuitive. This is now the recommended approach for unpivoting operations.
# Using melt() - cleaner and more readable
unpivoted_melt = sales_df.melt(
ids=["customer_id", "region"],
values=["Q1_2024", "Q2_2024", "Q3_2024", "Q4_2024"],
variableColumnName="quarter",
valueColumnName="sales"
)
unpivoted_melt.show()
# +-----------+------+-------+-----+
# |customer_id|region|quarter|sales|
# +-----------+------+-------+-----+
# | CUST001| North|Q1_2024|15000|
# | CUST001| North|Q2_2024|18000|
# | CUST001| North|Q3_2024|21000|
# | CUST001| North|Q4_2024|19000|
# | CUST002| South|Q1_2024|12000|
# | CUST002| South|Q2_2024|14000|
# | CUST002| South|Q3_2024|16000|
# | CUST002| South|Q4_2024|15000|
# | CUST003| East|Q1_2024|20000|
# | CUST003| East|Q2_2024|22000|
# | CUST003| East|Q3_2024|25000|
# | CUST003| East|Q4_2024|23000|
# +-----------+------+-------+-----+
The melt() parameters are straightforward:
- ids: Columns to preserve as identifiers (not unpivoted)
- values: Columns to unpivot
- variableColumnName: Name for the new column containing former column names
- valueColumnName: Name for the new column containing the values
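Conceptually, melt() expands each input row into one output row per value column. Here is a minimal pure-Python sketch of that row expansion, using plain dicts instead of DataFrames purely for illustration (melt_rows is a hypothetical helper, not part of PySpark):

```python
def melt_rows(rows, ids, values, var_name="variable", value_name="value"):
    """Expand each wide-format record into one long-format record per value column."""
    out = []
    for row in rows:
        for v in values:
            rec = {k: row[k] for k in ids}  # keep identifier columns
            rec[var_name] = v               # former column name
            rec[value_name] = row[v]        # former column value
            out.append(rec)
    return out

wide = [{"product": "Product_A", "jan": 100, "feb": 150}]
print(melt_rows(wide, ids=["product"], values=["jan", "feb"],
                var_name="month", value_name="sales"))
# [{'product': 'Product_A', 'month': 'jan', 'sales': 100},
#  {'product': 'Product_A', 'month': 'feb', 'sales': 150}]
```

This is exactly the shape of the output tables shown above: one row per (id, value column) combination.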
Method 3: Using SQL Expressions with expr() and stack()
When dealing with many columns or columns following naming patterns, dynamically generating the stack() expression is more maintainable than hardcoding column names.
# Dynamic unpivoting using list comprehension
columns_to_unpivot = ["Q1_2024", "Q2_2024", "Q3_2024", "Q4_2024"]
id_columns = ["customer_id", "region"]
# Build the stack expression dynamically
stack_expr = f"stack({len(columns_to_unpivot)}, "
stack_expr += ", ".join([f"'{c}', `{c}`" for c in columns_to_unpivot])
stack_expr += ") as (quarter, sales)"
unpivoted_dynamic = sales_df.select(
*id_columns,
expr(stack_expr)
)
unpivoted_dynamic.show()
This approach shines when you need to select columns programmatically:
# Unpivot all columns matching a pattern
all_columns = sales_df.columns
quarter_columns = [c for c in all_columns if c.startswith("Q")]
id_cols = [c for c in all_columns if c not in quarter_columns]
stack_expr = f"stack({len(quarter_columns)}, "
stack_expr += ", ".join([f"'{c}', `{c}`" for c in quarter_columns])
stack_expr += ") as (quarter, sales)"
result = sales_df.select(*id_cols, expr(stack_expr))
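If you build stack expressions in more than one place, it can be worth wrapping the string construction in a small helper. This is a sketch of such a wrapper (build_stack_expr is a hypothetical name, not a PySpark function); note that it is pure string manipulation and does not touch Spark at all:

```python
def build_stack_expr(columns, var_name, value_name):
    """Build a stack() SQL expression that unpivots the given columns."""
    # Each column contributes a ('name', `name`) pair; backticks guard
    # column names containing special characters.
    pairs = ", ".join(f"'{c}', `{c}`" for c in columns)
    return f"stack({len(columns)}, {pairs}) as ({var_name}, {value_name})"

print(build_stack_expr(["Q1_2024", "Q2_2024"], "quarter", "sales"))
# stack(2, 'Q1_2024', `Q1_2024`, 'Q2_2024', `Q2_2024`) as (quarter, sales)
```

The resulting string plugs straight into expr(), e.g. sales_df.select(*id_cols, expr(build_stack_expr(quarter_columns, "quarter", "sales"))).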
Handling Complex Scenarios
Real-world unpivoting often involves edge cases that require careful handling.
Multiple value columns: Unpivot multiple metrics simultaneously:
# DataFrame with both sales and costs per quarter
complex_df = spark.createDataFrame([
("CUST001", 15000, 12000, 18000, 14000),
("CUST002", 20000, 16000, 22000, 17000)
], ["customer_id", "Q1_sales", "Q1_cost", "Q2_sales", "Q2_cost"])
# Unpivot both metrics
unpivoted_complex = complex_df.select(
"customer_id",
expr("""
stack(2,
'Q1', Q1_sales, Q1_cost,
'Q2', Q2_sales, Q2_cost
) as (quarter, sales, cost)
""")
)
unpivoted_complex.show()
# +-----------+-------+-----+-----+
# |customer_id|quarter|sales| cost|
# +-----------+-------+-----+-----+
# | CUST001| Q1|15000|12000|
# | CUST001| Q2|18000|14000|
# | CUST002| Q1|20000|16000|
# | CUST002| Q2|22000|17000|
# +-----------+-------+-----+-----+
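The multi-metric expression above can also be generated programmatically when wide columns follow a period_metric naming pattern (Q1_sales, Q1_cost, and so on). A sketch of that string construction, assuming exactly that naming convention (build_multi_stack_expr is a hypothetical helper):

```python
def build_multi_stack_expr(periods, metrics, var_name="quarter"):
    """Build a stack() expression unpivoting several metrics per period.

    Assumes wide columns are named like Q1_sales, Q1_cost, etc.
    """
    # One group per period: the period label, then one column per metric.
    groups = ", ".join(
        f"'{p}', " + ", ".join(f"`{p}_{m}`" for m in metrics)
        for p in periods
    )
    names = ", ".join([var_name] + list(metrics))
    return f"stack({len(periods)}, {groups}) as ({names})"

print(build_multi_stack_expr(["Q1", "Q2"], ["sales", "cost"]))
# stack(2, 'Q1', `Q1_sales`, `Q1_cost`, 'Q2', `Q2_sales`, `Q2_cost`) as (quarter, sales, cost)
```

Passing the result to expr() reproduces the hardcoded expression used in the example above, but scales to any number of quarters and metrics.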
Handling nulls: Unpivot operations preserve null values, but you can filter them:
# DataFrame with missing values
df_with_nulls = spark.createDataFrame([
("CUST001", 15000, None, 21000),
("CUST002", None, 14000, 16000)
], ["customer_id", "Q1", "Q2", "Q3"])
unpivoted_nulls = df_with_nulls.melt(
ids=["customer_id"],
values=["Q1", "Q2", "Q3"],
variableColumnName="quarter",
valueColumnName="sales"
).filter(col("sales").isNotNull())
unpivoted_nulls.show()
Performance Considerations & Best Practices
Choose your unpivoting method based on these criteria:
Use melt() when:
- Running PySpark 3.4 or later
- Code readability is a priority
- Working with straightforward unpivot scenarios
Use dynamic stack() when:
- Dealing with many columns following patterns
- Need to programmatically determine columns to unpivot
- Working with older PySpark versions
Performance tips:
- Partition before unpivoting: If your wide dataset is large, partition it appropriately before unpivoting to distribute the workload.
# Repartition before unpivoting large datasets
large_df = sales_df.repartition(10, "region")
result = large_df.melt(
    ids=["customer_id", "region"],
    values=quarter_columns,
    variableColumnName="quarter",
    valueColumnName="sales"
)
- Filter early: Apply filters before unpivoting to reduce data volume.
- Cache strategically: If you're performing multiple operations on the unpivoted result, cache it:
unpivoted = sales_df.melt(
    ids=["customer_id"],
    values=quarter_columns,
    variableColumnName="quarter",
    valueColumnName="sales"
)
unpivoted.cache()
# Perform multiple operations
- Avoid repeated unpivots: Unpivoting is computationally expensive. Design your pipeline to unpivot once and reuse the result.
The choice between methods ultimately depends on your PySpark version and use case complexity. For modern PySpark deployments, melt() offers the best balance of readability and functionality. For dynamic scenarios or legacy systems, the SQL expression approach with stack() provides necessary flexibility. Understanding all three methods ensures you can handle any unpivoting requirement efficiently.