PySpark - Drop Duplicate Rows (dropDuplicates)
Key Insights
- PySpark’s dropDuplicates() method removes duplicate rows based on all columns by default, but accepts a subset parameter to target specific columns for deduplication
- The method triggers a shuffle operation that can be expensive on large datasets, so specify only the relevant columns rather than checking all columns to minimize data movement
- For time-series data where you need the latest record, combine dropDuplicates() with orderBy() and window functions to control which duplicate is retained
Understanding the Duplicate Data Problem
Duplicate records plague data pipelines. They inflate metrics, skew analytics, and waste storage. In distributed systems processing terabytes of data, duplicates emerge from multiple sources: retry logic in event streams, CDC (Change Data Capture) operations capturing the same row multiple times, or merging datasets from different systems with overlapping records.
PySpark’s dropDuplicates() method provides a straightforward solution, but using it effectively requires understanding its behavior, performance characteristics, and when alternative approaches make more sense.
Let’s start with a concrete example showing why duplicates matter:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.appName("dedup").getOrCreate()
# Sample data with obvious duplicates
data = [
("user_1", "login", 100),
("user_1", "login", 100), # Exact duplicate
("user_2", "purchase", 250),
("user_3", "logout", 50),
("user_2", "purchase", 250), # Another exact duplicate
]
schema = StructType([
StructField("user_id", StringType(), True),
StructField("event_type", StringType(), True),
StructField("value", IntegerType(), True)
])
df = spark.createDataFrame(data, schema)
df.show()
Output shows 5 rows, but only 3 are unique. If you’re calculating total revenue, those duplicates double-count transactions.
Basic dropDuplicates() Usage
The simplest form of dropDuplicates() takes no parameters and removes rows that are identical across all columns:
# Remove exact duplicates across all columns
df_deduped = df.dropDuplicates()
df_deduped.show()
print(f"Original count: {df.count()}")
print(f"After deduplication: {df_deduped.count()}")
This returns 3 rows: one for each unique combination of user_id, event_type, and value. The method does not guarantee which of the duplicate rows is kept, so don’t rely on row order for this decision.
The method signature is:
DataFrame.dropDuplicates(subset=None)
When subset is None, all columns participate in the uniqueness check. This is rarely what you want in production scenarios.
Column-Specific Deduplication
Real-world deduplication usually focuses on specific columns that define uniqueness. Consider user profiles where the same user might appear multiple times with different timestamps:
from pyspark.sql.functions import col
# User data with updates over time
user_data = [
("user_1", "john@example.com", "2024-01-01", "Premium"),
("user_1", "john@example.com", "2024-01-15", "Premium"),
("user_2", "jane@example.com", "2024-01-05", "Basic"),
("user_3", "bob@example.com", "2024-01-10", "Premium"),
("user_2", "jane@example.com", "2024-01-20", "Premium"), # Upgraded
]
user_schema = StructType([
StructField("user_id", StringType(), True),
StructField("email", StringType(), True),
StructField("update_date", StringType(), True),
StructField("tier", StringType(), True)
])
users_df = spark.createDataFrame(user_data, user_schema)
# Keep only one record per user_id
deduped_users = users_df.dropDuplicates(["user_id"])
deduped_users.show()
This keeps one row per user_id, but which one? Since we haven’t specified an order, it’s arbitrary. For user_2, you might get either “Basic” or “Premium” tier.
You can specify multiple columns in the subset:
# Deduplicate based on user_id AND email combination
deduped_by_both = users_df.dropDuplicates(["user_id", "email"])
This is useful when either column alone might have duplicates, but the combination should be unique.
Controlling Which Duplicate to Keep
To deterministically choose which duplicate row to retain, combine dropDuplicates() with orderBy():
from pyspark.sql.functions import desc
# Keep the most recent record for each user
latest_users = (users_df
.orderBy(desc("update_date"))
.dropDuplicates(["user_id"])
)
latest_users.show()
Now user_2 shows the “Premium” tier from the 2024-01-20 record: orderBy() sorts the DataFrame first, and dropDuplicates() then keeps the first occurrence of each user_id. One caveat: Spark does not formally guarantee that this sort order survives the shuffle dropDuplicates() performs. The pattern commonly works in practice, but the window-function approach covered later in this article is the only fully guaranteed way to control which row is kept.
This pattern is critical for CDC pipelines and maintaining current state from event streams:
# Real-world example: product inventory updates
inventory_updates = [
("SKU-001", "2024-01-10 10:00:00", 100),
("SKU-001", "2024-01-10 14:30:00", 95),
("SKU-002", "2024-01-10 09:00:00", 50),
("SKU-001", "2024-01-10 16:45:00", 87),
("SKU-002", "2024-01-10 15:20:00", 45),
]
inventory_schema = StructType([
StructField("sku", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("quantity", IntegerType(), True)
])
inventory_df = spark.createDataFrame(inventory_updates, inventory_schema)
# Get current inventory (latest quantity for each SKU)
current_inventory = (inventory_df
.orderBy(desc("timestamp"))
.dropDuplicates(["sku"])
)
current_inventory.show()
Performance Implications
dropDuplicates() triggers a shuffle operation—data is redistributed across the cluster to group identical records. This is expensive. Understanding the execution plan helps optimize:
# Check the execution plan
df.dropDuplicates(["user_id"]).explain()
You’ll see an Exchange operation (shuffle) in the plan. The cost scales with:
- Number of columns in subset: Fewer columns mean less data to hash and compare
- Cardinality: High cardinality columns (many unique values) create more shuffle partitions
- DataFrame size: Obvious, but shuffling 1TB is different from 1GB
Best practices:
# Bad: checking all columns when only the ID matters
large_df.dropDuplicates()  # Hashes and compares every column
# Good: specify the minimal columns that define uniqueness
large_df.dropDuplicates(["user_id"])  # Groups on user_id only (other columns still travel with the rows, but aren't compared)
# Better: Pre-filter before deduplication if possible
filtered = large_df.filter(col("date") >= "2024-01-01")
deduped = filtered.dropDuplicates(["user_id"])
For very large datasets, consider repartitioning on the deduplication key before deduplicating:
# Hash-partition by the dedup key itself
partitioned = large_df.repartition("user_id")
deduped = partitioned.dropDuplicates(["user_id"])
Because the data is already partitioned by user_id, Spark can satisfy the aggregation’s distribution requirement without a second shuffle. Repartitioning by an unrelated column such as date does not help: dropDuplicates(["user_id"]) still has to co-locate rows that share the same user_id.
Alternative Approaches
While dropDuplicates() is convenient, alternatives offer more control for complex scenarios.
Using distinct()
For complete row deduplication, distinct() is equivalent to dropDuplicates() without parameters:
# These are identical
df.dropDuplicates()
df.distinct()
Use distinct() when you want all columns to be unique—it’s more explicit about intent.
Window Functions for Advanced Control
When you need to keep duplicates based on complex logic, window functions provide flexibility:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Keep top 2 most recent records per user
window_spec = Window.partitionBy("user_id").orderBy(desc("update_date"))
ranked = users_df.withColumn("rank", row_number().over(window_spec))
top_two = ranked.filter(col("rank") <= 2).drop("rank")
top_two.show()
Compare this with dropDuplicates() for keeping only the latest:
# dropDuplicates approach - keeps 1 record
latest_only = (users_df
.orderBy(desc("update_date"))
.dropDuplicates(["user_id"])
)
# Window function approach - can keep N records
window_spec = Window.partitionBy("user_id").orderBy(desc("update_date"))
latest_only_window = (users_df
.withColumn("rank", row_number().over(window_spec))
.filter(col("rank") == 1)
.drop("rank")
)
Both produce the same result for keeping one record, but window functions let you:
- Keep top N duplicates
- Add ranking information
- Apply complex ordering logic (multiple columns, custom functions)
The tradeoff: window functions are typically more expensive than dropDuplicates() for simple cases.
Practical Patterns
Event Stream Deduplication
# Kafka events often have duplicate delivery
events = spark.readStream.format("kafka")...
deduped_events = (events
.withWatermark("event_time", "1 hour")
.dropDuplicates(["event_id", "event_time"])
)
Include the watermark column in the subset so Spark can expire old deduplication state; with event_id alone, the state store grows without bound. On Spark 3.5+, dropDuplicatesWithinWatermark() handles this pattern directly.
Incremental ETL
# Merge new data with existing, removing duplicates
new_data = spark.read.parquet("s3://incoming/2024-01-15/")
existing_data = spark.read.parquet("s3://processed/users/")
combined = existing_data.union(new_data)
final = (combined
.orderBy(desc("updated_at"))
.dropDuplicates(["user_id"])
)
# Caution: Spark reads lazily, so overwriting the path you're still reading
# from can fail or lose data; write to a staging location first, then swap
final.write.mode("overwrite").parquet("s3://processed/users_staging/")
Conclusion
dropDuplicates() is your primary tool for deduplication in PySpark, but use it deliberately. Always specify the subset parameter with the minimum columns needed to identify duplicates. Combine with orderBy() when you need deterministic selection of which duplicate to keep. For complex scenarios requiring top-N duplicates or conditional logic, reach for window functions instead.
The key is understanding that deduplication isn’t free—it shuffles data across your cluster. Design your deduplication strategy around your data’s natural partitioning and uniqueness constraints to minimize this cost.