How to Use SQL Queries in PySpark
Key Insights
- PySpark SQL lets you write standard SQL queries against DataFrames by registering them as temporary views, making Spark accessible to analysts and engineers who prefer declarative syntax over method chaining.
- The spark.sql() method returns a DataFrame, enabling seamless mixing of SQL queries with DataFrame transformations for maximum flexibility in data pipelines.
- Performance between SQL and DataFrame API is identical after Catalyst optimization, so choose based on readability and team familiarity rather than speed concerns.
Introduction to PySpark SQL
PySpark’s SQL module bridges two worlds: the distributed computing power of Apache Spark and the familiar syntax of SQL. If you’ve ever worked on a team where data engineers write PySpark and analysts write SQL, you’ve probably felt the friction. PySpark SQL eliminates that friction.
The SQL module isn’t a compromise or a wrapper—it’s a first-class interface to Spark’s execution engine. Under the hood, both the DataFrame API and SQL queries compile down to the same logical plan through Catalyst, Spark’s query optimizer. This means you’re not sacrificing performance for readability.
You’d reach for SQL over the DataFrame API when:
- Your team includes analysts who think in SQL
- You’re porting existing SQL queries from a data warehouse
- The query logic is complex with multiple CTEs and subqueries
- You want self-documenting code that reads like a specification
Let’s build up from the basics to advanced patterns.
Setting Up SparkSession
Every PySpark application starts with a SparkSession. This is your entry point to all Spark functionality, including SQL operations.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("SQLQueryDemo") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
The getOrCreate() method is idempotent—it returns an existing session if one exists or creates a new one. This matters in notebooks where you might run cells multiple times.
Two configurations worth noting here: spark.sql.shuffle.partitions controls how many partitions result from shuffle operations like joins and aggregations. The default of 200 is often too high for small datasets and too low for massive ones. Tune this based on your data size. spark.sql.adaptive.enabled turns on Adaptive Query Execution (AQE), which dynamically adjusts query plans at runtime. Keep this on unless you have a specific reason not to.
Creating Temporary Views from DataFrames
Before you can query a DataFrame with SQL, you need to register it as a view. Think of this as giving your DataFrame a table name that SQL can reference.
# Create sample data
data = [
("Alice", "Engineering", 95000, "2020-03-15"),
("Bob", "Engineering", 105000, "2019-07-22"),
("Carol", "Marketing", 85000, "2021-01-10"),
("David", "Marketing", 78000, "2020-11-03"),
("Eve", "Engineering", 115000, "2018-05-20")
]
columns = ["name", "department", "salary", "hire_date"]
employees_df = spark.createDataFrame(data, columns)
# Register as a temporary view
employees_df.createOrReplaceTempView("employees")
You have two options for temporary views:
createOrReplaceTempView() creates a view scoped to the current SparkSession. When the session ends, the view disappears. The “OrReplace” part means it overwrites any existing view with the same name—no errors if it already exists.
createGlobalTempView() creates a view accessible across multiple SparkSessions within the same Spark application. You reference it with the global_temp database prefix: SELECT * FROM global_temp.employees. Use this sparingly—it’s mainly useful for sharing data between sessions in complex applications.
For most use cases, stick with createOrReplaceTempView(). It’s simpler and the scoping prevents accidental data leakage between unrelated operations.
Executing SQL Queries with spark.sql()
The spark.sql() method takes a SQL string and returns a DataFrame. This is where the magic happens.
# Basic SELECT with filtering and aggregation
result = spark.sql("""
SELECT
department,
COUNT(*) as employee_count,
AVG(salary) as avg_salary,
MAX(salary) as max_salary
FROM employees
WHERE salary > 80000
GROUP BY department
ORDER BY avg_salary DESC
""")
result.show()
Output:
+-----------+--------------+----------+----------+
| department|employee_count|avg_salary|max_salary|
+-----------+--------------+----------+----------+
|Engineering| 3| 105000.0| 115000|
| Marketing| 1| 85000.0| 85000|
+-----------+--------------+----------+----------+
The returned DataFrame works exactly like any other DataFrame. You can call .show(), .collect(), .write, or chain additional transformations.
For joins, register multiple views and reference them in your query:
# Create departments data
departments_data = [
("Engineering", "Building A", "Jane"),
("Marketing", "Building B", "John"),
("Sales", "Building C", "Sarah")
]
departments_df = spark.createDataFrame(
departments_data,
["dept_name", "location", "manager"]
)
departments_df.createOrReplaceTempView("departments")
# JOIN query
joined_result = spark.sql("""
SELECT
e.name,
e.salary,
d.location,
d.manager
FROM employees e
INNER JOIN departments d
ON e.department = d.dept_name
WHERE e.salary > 90000
""")
joined_result.show()
PySpark SQL supports the full range of SQL operations: CTEs with WITH, window functions, subqueries, UNION, INTERSECT, and EXCEPT. If you can write it in standard SQL, it probably works.
Mixing SQL and DataFrame Operations
Here’s where PySpark shines over traditional SQL engines. Since spark.sql() returns a DataFrame, you can seamlessly chain DataFrame operations after a SQL query. This hybrid approach lets you use SQL for the parts where it’s cleaner and DataFrame methods where they’re more expressive.
from pyspark.sql.functions import col, when, lit, year, current_date
# Start with SQL for the complex join and aggregation logic
base_query = spark.sql("""
WITH salary_ranks AS (
SELECT
name,
department,
salary,
hire_date,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) as dept_rank
FROM employees
)
SELECT * FROM salary_ranks WHERE dept_rank <= 3
""")
# Continue with DataFrame API for transformations
final_result = (
base_query
.withColumn("hire_date", col("hire_date").cast("date"))
.withColumn("tenure_years",
year(current_date()) - year(col("hire_date")))
.withColumn("salary_band",
when(col("salary") >= 100000, "Senior")
.when(col("salary") >= 80000, "Mid")
.otherwise("Junior"))
.filter(col("tenure_years") >= 2)
.select("name", "department", "salary", "salary_band", "tenure_years")
)
final_result.show()
This pattern works well because SQL excels at set operations, joins, and window functions, while the DataFrame API is more readable for conditional column creation and type transformations. Use each where it fits naturally.
Working with Complex Data Types in SQL
Real-world data is messy. You’ll encounter nested JSON, arrays, and structs. PySpark SQL handles these with functions like EXPLODE, LATERAL VIEW, and dot notation for struct access.
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
# Create nested data structure
nested_data = [
("order_1", "Alice", ["laptop", "mouse", "keyboard"],
{"city": "Seattle", "zip": "98101"}),
("order_2", "Bob", ["monitor", "webcam"],
{"city": "Portland", "zip": "97201"}),
("order_3", "Carol", ["desk", "chair", "lamp", "monitor"],
{"city": "Seattle", "zip": "98102"})
]
schema = StructType([
StructField("order_id", StringType(), True),
StructField("customer", StringType(), True),
StructField("items", ArrayType(StringType()), True),
StructField("shipping", StructType([
StructField("city", StringType(), True),
StructField("zip", StringType(), True)
]), True)
])
orders_df = spark.createDataFrame(nested_data, schema)
orders_df.createOrReplaceTempView("orders")
# Query nested structs with dot notation
city_orders = spark.sql("""
SELECT
order_id,
customer,
shipping.city as city,
shipping.zip as zip_code,
SIZE(items) as item_count
FROM orders
WHERE shipping.city = 'Seattle'
""")
city_orders.show()
# Explode arrays to create one row per item
exploded_items = spark.sql("""
SELECT
order_id,
customer,
exploded_item
FROM orders
LATERAL VIEW EXPLODE(items) AS exploded_item
""")
exploded_items.show()
The LATERAL VIEW EXPLODE syntax might look unfamiliar if you’re coming from PostgreSQL or MySQL. It’s Hive SQL syntax that PySpark inherited. Each array element becomes its own row, with all other columns duplicated. This is essential for analyzing array data with aggregations.
Performance Considerations and Best Practices
SQL and DataFrame operations compile to identical execution plans, so don’t choose based on perceived performance differences. Instead, focus on these practical optimizations:
Use EXPLAIN to understand your queries. Before running expensive operations on large datasets, check the execution plan:
spark.sql("""
SELECT
e.department,
COUNT(*) as cnt
FROM employees e
JOIN departments d ON e.department = d.dept_name
GROUP BY e.department
""").explain(mode="formatted")
This shows you the physical plan including shuffles, broadcasts, and scan operations. Look for Exchange nodes—these indicate shuffles, which are expensive.
Cache strategically. If you’re running multiple queries against the same view, cache it:
spark.sql("CACHE TABLE employees")
# Run multiple queries...
spark.sql("UNCACHE TABLE employees")
Only cache DataFrames you’ll reuse. Caching consumes memory and has overhead for serialization.
Prefer broadcast joins for small tables. When joining a large table with a small one, hint Spark to broadcast the small table:
result = spark.sql("""
SELECT /*+ BROADCAST(d) */
e.name, d.location
FROM employees e
JOIN departments d ON e.department = d.dept_name
""")
Broadcasting avoids shuffling the large table entirely.
Know when to use DataFrame API instead. SQL is great for ad-hoc analysis and complex queries, but the DataFrame API offers better IDE support, type checking, and composability for production pipelines. If you’re building reusable transformations, consider wrapping SQL in functions that return DataFrames, or use the DataFrame API directly for better testability.
The best PySpark code uses both approaches where each shines. SQL for readable, declarative queries; DataFrame API for programmatic transformations and pipeline composition.