PySpark - SQL ORDER BY with Examples
Key Insights
- PySpark's ORDER BY triggers a full shuffle operation across the cluster, making it expensive for large datasets; use sortWithinPartitions() when global ordering isn't required
- NULL values sort first by default in ascending order, but you can explicitly control placement using NULLS FIRST or NULLS LAST clauses
- The DataFrame API methods (orderBy(), sort()) and SQL syntax are functionally identical, but mixing both approaches in the same codebase reduces readability
Introduction to ORDER BY in PySpark
Sorting data is fundamental to analytics workflows, and PySpark provides multiple ways to order your data. The ORDER BY clause in PySpark SQL works similarly to traditional SQL databases, but with important distributed computing implications you need to understand.
PySpark offers two primary approaches: SQL syntax using spark.sql() with ORDER BY clauses, and DataFrame API methods like orderBy() and sort(). These are functionally equivalent—sort() is simply an alias for orderBy(). The choice between SQL and DataFrame API is largely stylistic, though the DataFrame API offers better type safety and IDE support.
The critical difference from single-machine databases is that sorting in PySpark requires coordinating data across multiple nodes. An ORDER BY operation forces a global sort, shuffling all data across the cluster to ensure complete ordering. This makes it one of the most expensive operations in PySpark.
Let’s create a sample dataset to work with throughout this article:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, asc, desc, lower, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
spark = SparkSession.builder.appName("OrderByExamples").getOrCreate()
data = [
("Alice", 95000, "Engineering", date(2020, 3, 15)),
("Bob", 87000, "Marketing", date(2019, 7, 22)),
("Charlie", 105000, "Engineering", date(2018, 1, 10)),
("Diana", 92000, "Sales", date(2021, 5, 3)),
("Eve", 78000, "Marketing", date(2020, 11, 8)),
("Frank", None, "Sales", date(2022, 2, 14)),
("Grace", 98000, "Engineering", date(2019, 9, 30))
]
schema = StructType([
StructField("name", StringType(), True),
StructField("salary", IntegerType(), True),
StructField("department", StringType(), True),
StructField("hire_date", DateType(), True)
])
df = spark.createDataFrame(data, schema)
df.show()
Basic ORDER BY Syntax
The simplest sorting operation orders data by a single column. By default, sorting is ascending (ASC), but you can explicitly specify descending order with DESC.
Using SQL syntax (the {df} placeholder binds the DataFrame as a temporary view; this style of parameter binding requires PySpark 3.3 or later):
# Ascending order (default)
spark.sql("""
SELECT * FROM {df}
ORDER BY salary
""", df=df).show()
# Explicit descending order
spark.sql("""
SELECT * FROM {df}
ORDER BY salary DESC
""", df=df).show()
The equivalent DataFrame API syntax:
# Ascending order
df.orderBy("salary").show()
df.orderBy(col("salary").asc()).show() # Explicit
# Descending order
df.orderBy(col("salary").desc()).show()
df.orderBy("salary", ascending=False).show() # Alternative syntax
I recommend the DataFrame API for most use cases. Column expressions are validated as you build the query and work well with IDE autocompletion, so mistakes surface earlier than they do inside SQL strings. Reserve SQL syntax for complex queries where SQL's declarative nature improves readability.
Multi-Column Sorting
Real-world scenarios often require sorting by multiple columns with different priorities. PySpark evaluates sort columns left-to-right, with the first column having the highest priority.
# SQL: Sort by department ascending, then salary descending
spark.sql("""
SELECT * FROM {df}
ORDER BY department ASC, salary DESC
""", df=df).show()
# DataFrame API equivalent
df.orderBy(col("department").asc(), col("salary").desc()).show()
# Alternative DataFrame syntax
df.orderBy("department", col("salary").desc()).show()
For three-level sorting, the pattern extends naturally:
# Sort by department, then hire_date, then name
df.orderBy(
col("department").asc(),
col("hire_date").desc(),
col("name").asc()
).show()
The order matters significantly. Sorting by (department, salary) produces different results than (salary, department). Always list your primary sort key first.
Sorting with NULL Values
NULL handling in sorting operations often catches developers off guard. By default, PySpark sorts NULL values first in ascending order and last in descending order. This follows Spark SQL’s behavior, which differs from some traditional databases.
# Default behavior: NULLs appear first in ASC
df.orderBy(col("salary").asc()).show()
# Default behavior: NULLs appear last in DESC
df.orderBy(col("salary").desc()).show()
You can explicitly control NULL positioning:
# Force NULLs to the end in ascending sort
spark.sql("""
SELECT * FROM {df}
ORDER BY salary ASC NULLS LAST
""", df=df).show()
# Force NULLs to the beginning in descending sort
spark.sql("""
SELECT * FROM {df}
ORDER BY salary DESC NULLS FIRST
""", df=df).show()
The DataFrame API supports the same control directly through Column methods: asc_nulls_first(), asc_nulls_last(), desc_nulls_first(), and desc_nulls_last() (available since Spark 2.4):
# Force NULLs to the end in ascending sort
df.orderBy(col("salary").asc_nulls_last()).show()
# Force NULLs to the beginning in descending sort
df.orderBy(col("salary").desc_nulls_first()).show()
Both APIs are equally capable here, so NULL placement is no reason to switch between them. If you are stuck on a pre-2.4 version, a helper column works as a fallback:
# Fallback: sort on a NULL flag first, then the value
df.withColumn("salary_null_flag", col("salary").isNull()) \
.orderBy("salary_null_flag", col("salary").desc()) \
.drop("salary_null_flag") \
.show()
Advanced Sorting Techniques
Sorting isn’t limited to raw column values. You can sort by expressions, transformations, and conditional logic.
Sorting by calculated values:
# Sort by salary with hypothetical 10% bonus
df.orderBy((col("salary") * 1.10).desc()).show()
# Sort by years since hire (most recent first)
from pyspark.sql.functions import datediff, current_date
df.orderBy(
datediff(current_date(), col("hire_date")).asc()
).show()
Case-insensitive string sorting:
# Case-sensitive (default) - uppercase sorts before lowercase
df.orderBy("name").show()
# Case-insensitive sorting
df.orderBy(lower(col("name"))).show()
Sorting with conditional logic:
# Sort: Engineering first, then others by department name
df.orderBy(
when(col("department") == "Engineering", 0).otherwise(1),
col("department")
).show()
This technique is powerful for custom sort orders that don’t follow alphabetical or numerical patterns.
Performance Considerations
Sorting is expensive in distributed systems. Every ORDER BY triggers a full shuffle operation, moving data across network boundaries to ensure global ordering. For large datasets, this becomes a bottleneck.
When you don’t need global ordering—for example, when sorting within groups for window functions—use sortWithinPartitions():
# Global sort: expensive, guarantees total order
df.orderBy("department", "salary").show()
# Partition-local sort: faster, no global order guarantee
df.sortWithinPartitions("department", "salary").show()
The difference becomes dramatic with large datasets:
# Compare execution plans
df.orderBy("salary").explain()
# Shows: Exchange rangepartitioning (full shuffle)
df.sortWithinPartitions("salary").explain()
# Shows: Sort (no shuffle)
Best practices for sorting performance:
- Avoid unnecessary global sorts: use sortWithinPartitions() when possible
- Sort after filtering: reduce data volume before sorting
- Limit results: combine with limit() when you only need the top N records
- Partition wisely: aim for reasonable partition sizes (roughly 100MB-200MB)
# Inefficient as written: sort, then filter
df.orderBy("salary").filter(col("salary") > 90000).show()
# Efficient: filter, then sort
df.filter(col("salary") > 90000).orderBy("salary").show()
# Even better when you only need the top N: limit the results
df.orderBy(col("salary").desc()).limit(10).show()
In fairness, Catalyst can often push the filter beneath the sort on its own, but writing the filter first states the intent explicitly rather than relying on the optimizer.
Common Pitfalls and Best Practices
Memory issues: Sorting requires materializing data in memory. If a single partition’s sorted data exceeds executor memory, you’ll see OOM errors. Increase partition count or executor memory.
Skewed data: If one partition is significantly larger than others, that partition becomes a bottleneck. Consider salting or custom partitioning strategies.
Unnecessary precision: Don’t use global ORDER BY when your downstream operations don’t require it. Many aggregations and window functions work fine with partition-local ordering.
Here’s a complete real-world example combining multiple techniques:
# Real-world scenario: Department salary report
# Requirements:
# - Group by department
# - Show top 3 earners per department
# - Handle NULLs appropriately
# - Sort departments alphabetically
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Window spec for ranking within departments; NULL salaries explicitly rank last
window_spec = Window.partitionBy("department").orderBy(col("salary").desc_nulls_last())
result = df \
.withColumn("rank", row_number().over(window_spec)) \
.filter(col("rank") <= 3) \
.orderBy(col("department").asc(), col("rank").asc()) \
.select("department", "name", "salary", "rank")
result.show()
The key takeaway: understand the cost of sorting in distributed systems. Use the right tool for your specific ordering requirements, and always consider whether you truly need global ordering or if partition-local sorting suffices. Your cluster will thank you.