PySpark - Filter Rows by Column Value
Key Insights
- PySpark's filter() and where() methods are interchangeable aliases; use whichever feels more natural, but filter() aligns better with functional programming conventions while where() matches SQL familiarity.
- Always use bitwise operators (&, |, ~) instead of Python's logical operators (and, or, not) when combining conditions, and wrap each condition in parentheses to avoid operator precedence issues.
- Filter operations in PySpark support predicate pushdown, meaning filters applied early in your transformation chain can dramatically improve performance by reducing data movement across the cluster.
Introduction
Filtering rows is one of the most fundamental operations in any data processing workflow. In PySpark, you’ll spend a significant portion of your time selecting subsets of data based on specific conditions. Whether you’re cleaning data, performing exploratory analysis, or preparing datasets for machine learning, understanding how to efficiently filter DataFrames is essential.
PySpark provides two methods for filtering: filter() and where(). These are complete aliases of each other—there’s zero functional difference. The choice is purely stylistic. Use filter() if you prefer functional programming conventions, or where() if you come from a SQL background and want your code to read more like a SQL query.
Let’s create a sample dataset to work with throughout this article:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
spark = SparkSession.builder.appName("FilterExample").getOrCreate()
data = [
("John", "Engineering", 75000, 28, "john@example.com"),
("Sarah", "Marketing", 65000, 32, "sarah@example.com"),
("Mike", "Engineering", 82000, 35, None),
("Emily", "Sales", 58000, 26, "emily@example.com"),
("David", "Engineering", 95000, 42, "david@example.com"),
("Lisa", "Marketing", 71000, 29, "lisa@example.com"),
("Tom", "Sales", 62000, 31, None),
("Anna", "HR", 54000, 27, "anna@example.com")
]
schema = StructType([
StructField("name", StringType(), True),
StructField("department", StringType(), True),
StructField("salary", IntegerType(), True),
StructField("age", IntegerType(), True),
StructField("email", StringType(), True)
])
df = spark.createDataFrame(data, schema)
df.show()
Basic Filtering with Single Conditions
The simplest filtering operation involves checking a single column against a specific value. PySpark supports all standard comparison operators: ==, !=, >, <, >=, and <=.
Here’s how to filter for employees in the Engineering department:
# Filter for Engineering department
engineering_df = df.filter(col("department") == "Engineering")
engineering_df.show()
# Alternative syntax without col()
engineering_df = df.filter(df.department == "Engineering")
engineering_df.show()
Both approaches work identically. Using col() is more explicit and works better when column names contain special characters or spaces, but the DataFrame attribute syntax is cleaner for simple cases.
For numeric comparisons, use the standard operators:
# Employees with salary greater than 70000
high_earners = df.filter(col("salary") > 70000)
# Employees aged 30 or older
experienced = df.filter(col("age") >= 30)
# Employees younger than 30
young_employees = df.filter(col("age") < 30)
Handling null values requires special methods—you cannot use == None like you would in Python. Instead, use isNull() and isNotNull():
# Find rows with missing email addresses
missing_email = df.filter(col("email").isNull())
missing_email.show()
# Find rows with email addresses present
has_email = df.filter(col("email").isNotNull())
has_email.show()
Filtering with Multiple Conditions
Real-world filtering often requires combining multiple conditions. This is where many developers stumble. You must use bitwise operators (&, |, ~) instead of Python’s logical operators (and, or, not). Additionally, wrap each condition in parentheses to ensure correct operator precedence.
Using the AND operator to combine conditions:
# Engineering employees earning more than 80000
senior_engineers = df.filter(
(col("department") == "Engineering") & (col("salary") > 80000)
)
senior_engineers.show()
# Employees aged between 30 and 40 (inclusive) in Marketing
marketing_30s = df.filter(
(col("department") == "Marketing") &
(col("age") >= 30) &
(col("age") <= 40)
)
marketing_30s.show()
Using the OR operator for alternative conditions:
# Employees in either Sales or Marketing
sales_or_marketing = df.filter(
(col("department") == "Sales") | (col("department") == "Marketing")
)
sales_or_marketing.show()
# Employees either very young (<28) or very experienced (>40)
edge_ages = df.filter((col("age") < 28) | (col("age") > 40))
edge_ages.show()
The NOT operator negates a condition:
# Everyone except Engineering
non_engineering = df.filter(~(col("department") == "Engineering"))
non_engineering.show()
# Employees not in the 60000-80000 salary range
outside_range = df.filter(
~((col("salary") >= 60000) & (col("salary") <= 80000))
)
outside_range.show()
You can also chain multiple filter() calls, which is equivalent to using AND:
# These two approaches are identical
result1 = df.filter(col("department") == "Engineering").filter(col("salary") > 80000)
result2 = df.filter((col("department") == "Engineering") & (col("salary") > 80000))
Chaining can improve readability for complex conditions. Performance-wise the two forms are typically identical: Spark's Catalyst optimizer collapses consecutive filters into a single predicate during query optimization.
Advanced Filtering Techniques
PySpark provides specialized functions for common filtering patterns. The isin() method checks if a column value matches any value in a list:
# Employees in specific departments
selected_depts = df.filter(col("department").isin(["Engineering", "Sales"]))
selected_depts.show()
# Filter by multiple names
selected_people = df.filter(col("name").isin(["John", "Sarah", "Mike"]))
selected_people.show()
For string pattern matching, use like() for SQL-style wildcards or rlike() for regular expressions:
# Names starting with 'J' (SQL LIKE pattern)
j_names = df.filter(col("name").like("J%"))
j_names.show()
# Email addresses from example.com domain
example_emails = df.filter(col("email").like("%@example.com"))
example_emails.show()
# Names containing 'ar' using regular expression
ar_pattern = df.filter(col("name").rlike(".*ar.*"))
ar_pattern.show()
The startswith() and endswith() methods provide cleaner syntax for prefix and suffix matching:
# Departments starting with 'M'
m_departments = df.filter(col("department").startswith("M"))
m_departments.show()
# Names ending with 'a'
a_ending_names = df.filter(col("name").endswith("a"))
a_ending_names.show()
For range filtering, between() offers a concise alternative to combining comparison operators:
# Salaries between 60000 and 80000 (inclusive)
mid_range_salaries = df.filter(col("salary").between(60000, 80000))
mid_range_salaries.show()
# Ages in the 28-32 range
age_range = df.filter(col("age").between(28, 32))
age_range.show()
SQL-Style Filtering
If you’re more comfortable with SQL syntax, you can pass SQL expressions as strings to the filter() method:
# SQL-style string expression
sql_filter = df.filter("department = 'Engineering' AND salary > 80000")
sql_filter.show()
# Complex SQL expression
complex_sql = df.filter("""
(department IN ('Sales', 'Marketing') AND age < 30) OR
(department = 'Engineering' AND salary > 90000)
""")
complex_sql.show()
Compare this to the DataFrame API syntax:
# Equivalent using DataFrame API
api_filter = df.filter(
((col("department").isin(["Sales", "Marketing"]) & (col("age") < 30)) |
((col("department") == "Engineering") & (col("salary") > 90000)))
)
api_filter.show()
The SQL-style approach is more readable for complex conditions, especially for teams with strong SQL backgrounds. However, the DataFrame API provides better IDE support, type checking, and refactoring capabilities.
Performance Considerations
Filter placement significantly impacts performance. Place filters as early as possible in your transformation chain to reduce the amount of data processed by subsequent operations:
# Good: Filter early
result = df.filter(col("department") == "Engineering") \
.filter(col("salary") > 80000) \
.select("name", "salary") \
.groupBy("salary").count()
# Less efficient: Filter after expensive operations
result = df.select("name", "salary", "department") \
.groupBy("salary", "department").count() \
.filter(col("department") == "Engineering")
When filtering on multiple columns, order your conditions from most selective to least selective. While Spark’s Catalyst optimizer handles much of this, explicit ordering can help:
# If department is highly selective and age is not
optimized = df.filter(
(col("department") == "Engineering") & # More selective first
(col("age") > 25) # Less selective second
)
Avoid using UDFs (User Defined Functions) in filter conditions when built-in functions suffice. UDFs prevent predicate pushdown and force data serialization:
# Bad: UDF prevents optimization
from pyspark.sql.functions import udf
@udf(returnType=StringType())
def custom_check(value):
return value if value == "Engineering" else None
inefficient = df.filter(custom_check(col("department")).isNotNull())
# Good: Use built-in functions
efficient = df.filter(col("department") == "Engineering")
When working with partitioned data, filter on partition columns to enable partition pruning, which skips reading irrelevant partitions entirely:
# If data is partitioned by department
partitioned_filter = df.filter(col("department") == "Engineering")
# Spark only reads the Engineering partition
Filtering is the gateway to effective data processing in PySpark. Master these patterns, understand the performance implications, and you’ll write cleaner, faster data pipelines. The key is choosing the right syntax for your team’s background while keeping performance considerations in mind.