PySpark - SQL SELECT Statement Examples
Key Insights
- PySpark SQL provides a familiar SQL interface for data engineers transitioning from traditional databases, while offering the same performance as the DataFrame API since both compile to the same execution plan
- Creating temporary views from DataFrames enables SQL queries through spark.sql(), making it easy to mix SQL and DataFrame operations in the same pipeline
- Window functions and complex aggregations are often more readable in SQL syntax than their DataFrame API equivalents, especially for analysts comfortable with SQL
Introduction to PySpark SQL
PySpark’s SQL module bridges the gap between traditional SQL databases and distributed data processing. Under the hood, both SQL queries and DataFrame operations compile to the same optimized execution plan through Catalyst, so performance is identical—choose based on readability and team expertise.
The key to using SQL in PySpark is creating temporary views from DataFrames. These views exist only for your Spark session and allow you to query DataFrames using standard SQL syntax through the spark.sql() method.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("PySpark SQL Examples") \
    .getOrCreate()
# Create sample employee data
data = [
    ("John Smith", "Engineering", 95000, date(2020, 1, 15)),
    ("Sarah Johnson", "Engineering", 105000, date(2019, 3, 22)),
    ("Mike Brown", "Sales", 75000, date(2021, 6, 10)),
    ("Emily Davis", "Sales", 82000, date(2020, 11, 5)),
    ("David Wilson", "Marketing", 68000, date(2022, 2, 18)),
    ("Lisa Anderson", "Engineering", 98000, date(2018, 9, 30)),
    ("James Taylor", "HR", 62000, date(2021, 4, 12)),
    ("Maria Garcia", "Marketing", 71000, date(2020, 7, 8)),
    ("Robert Martinez", "Sales", 79000, date(2019, 12, 1)),
    ("Jennifer Lee", "Engineering", 110000, date(2017, 5, 20))
]
schema = StructType([
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("hire_date", DateType(), True)
])
employees_df = spark.createDataFrame(data, schema)
# Create temporary view for SQL queries
employees_df.createOrReplaceTempView("employees")
Basic SELECT Queries
The SELECT statement forms the foundation of all SQL queries. In PySpark, you execute SQL through spark.sql(), which returns a DataFrame that you can further manipulate or display.
# Select all columns
all_employees = spark.sql("SELECT * FROM employees")
all_employees.show()
# Select specific columns
names_salaries = spark.sql("""
SELECT name, salary
FROM employees
""")
names_salaries.show()
# Column aliasing for clarity
aliased_query = spark.sql("""
SELECT
name AS employee_name,
department AS dept,
salary AS annual_salary
FROM employees
""")
aliased_query.show()
# Get distinct departments
distinct_depts = spark.sql("""
SELECT DISTINCT department
FROM employees
ORDER BY department
""")
distinct_depts.show()
Column aliases improve readability, especially when joining tables or performing calculations. Use them liberally to make your query results self-documenting.
Filtering with WHERE Clause
The WHERE clause filters rows based on conditions. PySpark SQL supports all standard SQL comparison and logical operators, making it straightforward to express complex filtering logic.
# Filter by salary range using BETWEEN
high_earners = spark.sql("""
SELECT name, salary
FROM employees
WHERE salary BETWEEN 80000 AND 100000
""")
high_earners.show()
# Multiple conditions with AND/OR
engineering_high_earners = spark.sql("""
SELECT name, department, salary
FROM employees
WHERE department = 'Engineering'
AND salary > 95000
""")
engineering_high_earners.show()
# Complex logical conditions
sales_or_high_salary = spark.sql("""
SELECT name, department, salary
FROM employees
WHERE department = 'Sales'
OR salary > 100000
""")
sales_or_high_salary.show()
# Pattern matching with LIKE
names_with_j = spark.sql("""
SELECT name, department
FROM employees
WHERE name LIKE 'J%'
""")
names_with_j.show()
# IN operator for multiple values
specific_departments = spark.sql("""
SELECT name, department, salary
FROM employees
WHERE department IN ('Engineering', 'Sales')
""")
specific_departments.show()
For NULL checks, always use IS NULL or IS NOT NULL—never use = NULL, which won’t work as expected:
# Correct NULL checking (hypothetical column with nulls)
non_null_check = spark.sql("""
SELECT name, department
FROM employees
WHERE department IS NOT NULL
""")
non_null_check.show()
Sorting and Limiting Results
ORDER BY controls result ordering, while LIMIT restricts the number of rows returned. Combine them for “top N” queries, a common pattern in analytics.
# Sort by salary descending
sorted_by_salary = spark.sql("""
SELECT name, salary
FROM employees
ORDER BY salary DESC
""")
sorted_by_salary.show()
# Multi-column sorting
multi_sort = spark.sql("""
SELECT name, department, salary
FROM employees
ORDER BY department ASC, salary DESC
""")
multi_sort.show()
# Top 5 highest paid employees
top_earners = spark.sql("""
SELECT name, salary
FROM employees
ORDER BY salary DESC
LIMIT 5
""")
top_earners.show()
Note that LIMIT without ORDER BY returns arbitrary rows due to distributed processing—always include ORDER BY when you need specific rows.
Aggregate Functions and GROUP BY
Aggregations summarize data across groups. The HAVING clause filters groups after aggregation, while WHERE filters rows before aggregation.
# Average salary by department
avg_by_dept = spark.sql("""
SELECT
department,
AVG(salary) AS avg_salary,
COUNT(*) AS employee_count
FROM employees
GROUP BY department
ORDER BY avg_salary DESC
""")
avg_by_dept.show()
# Multiple aggregations
dept_statistics = spark.sql("""
SELECT
department,
COUNT(*) AS total_employees,
MIN(salary) AS min_salary,
MAX(salary) AS max_salary,
AVG(salary) AS avg_salary,
SUM(salary) AS total_payroll
FROM employees
GROUP BY department
""")
dept_statistics.show()
# HAVING clause for filtered aggregations
large_departments = spark.sql("""
SELECT
department,
COUNT(*) AS employee_count,
AVG(salary) AS avg_salary
FROM employees
GROUP BY department
HAVING COUNT(*) >= 3
ORDER BY employee_count DESC
""")
large_departments.show()
Joins and Subqueries
Joins combine data from multiple tables, while subqueries enable nested logic. Create a departments table to demonstrate:
# Create departments table
dept_data = [
    ("Engineering", "Alice Chen", "Building A"),
    ("Sales", "Bob Williams", "Building B"),
    ("Marketing", "Carol White", "Building C"),
    ("HR", "Dan Brown", "Building A")
]
dept_schema = StructType([
    StructField("dept_name", StringType(), True),
    StructField("manager", StringType(), True),
    StructField("location", StringType(), True)
])

departments_df = spark.createDataFrame(dept_data, dept_schema)
departments_df.createOrReplaceTempView("departments")
# INNER JOIN
employee_details = spark.sql("""
SELECT
e.name,
e.salary,
d.manager,
d.location
FROM employees e
INNER JOIN departments d ON e.department = d.dept_name
""")
employee_details.show()
# LEFT JOIN to include all employees
all_employees_details = spark.sql("""
SELECT
e.name,
e.department,
d.manager,
d.location
FROM employees e
LEFT JOIN departments d ON e.department = d.dept_name
""")
all_employees_details.show()
# Subquery in WHERE clause - employees earning above average
above_average = spark.sql("""
SELECT name, salary
FROM employees
WHERE salary > (SELECT AVG(salary) FROM employees)
ORDER BY salary DESC
""")
above_average.show()
# Subquery in SELECT clause
with_avg_comparison = spark.sql("""
SELECT
name,
salary,
(SELECT AVG(salary) FROM employees) AS company_avg,
salary - (SELECT AVG(salary) FROM employees) AS difference
FROM employees
ORDER BY difference DESC
""")
with_avg_comparison.show()
Advanced Features
Window functions, CASE statements, and date functions unlock sophisticated analytics without complex self-joins.
# Window function - rank employees by salary within department
ranked_employees = spark.sql("""
SELECT
name,
department,
salary,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank
FROM employees
""")
ranked_employees.show()
# CASE statement for salary categories
salary_categories = spark.sql("""
SELECT
name,
salary,
CASE
WHEN salary >= 100000 THEN 'High'
WHEN salary >= 75000 THEN 'Medium'
ELSE 'Entry'
END AS salary_band
FROM employees
ORDER BY salary DESC
""")
salary_categories.show()
# Date functions
hire_analysis = spark.sql("""
SELECT
name,
hire_date,
YEAR(hire_date) AS hire_year,
MONTH(hire_date) AS hire_month,
DATEDIFF(CURRENT_DATE(), hire_date) AS days_employed
FROM employees
ORDER BY hire_date
""")
hire_analysis.show()
# Combining window functions with aggregations
running_totals = spark.sql("""
SELECT
department,
name,
salary,
SUM(salary) OVER (PARTITION BY department ORDER BY name) AS running_dept_total
FROM employees
ORDER BY department, name
""")
running_totals.show()
PySpark SQL gives you the full power of SQL in a distributed environment. Use it when SQL syntax makes your logic clearer, when working with analysts who prefer SQL, or when migrating existing SQL queries to Spark. The performance is identical to DataFrame operations, so choose based on readability and maintainability for your team.