PySpark - Run SQL Queries on DataFrame

PySpark provides two primary interfaces for data manipulation: the DataFrame API and SQL queries. While the DataFrame API offers programmatic control with method chaining, SQL queries often provide...

Key Insights

  • PySpark’s spark.sql() lets you run SQL queries on DataFrames after registering them as temporary views, making complex analytics more readable for SQL-proficient teams
  • Temporary views are session-scoped while global temporary views persist across sessions but require the global_temp database prefix
  • SQL queries in PySpark return DataFrames, allowing seamless integration with the DataFrame API for hybrid workflows

Introduction to SQL in PySpark

PySpark provides two primary interfaces for data manipulation: the DataFrame API and SQL queries. While the DataFrame API offers programmatic control with method chaining, SQL queries often provide superior readability for complex analytical operations, especially for teams with strong SQL backgrounds.

Consider a scenario where you need to find the top 5 customers by total purchase amount with their average order value. Here’s how both approaches compare:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg, desc

spark = SparkSession.builder.appName("SQLDemo").getOrCreate()

# Sample data
orders_data = [
    (1, "C001", 150.0),
    (2, "C002", 200.0),
    (3, "C001", 300.0),
    (4, "C003", 100.0),
    (5, "C002", 250.0)
]
orders_df = spark.createDataFrame(orders_data, ["order_id", "customer_id", "amount"])

# DataFrame API approach
result_df_api = (orders_df
    .groupBy("customer_id")
    .agg(sum("amount").alias("total_amount"), avg("amount").alias("avg_order_value"))
    .orderBy(desc("total_amount"))
    .limit(5))

# SQL approach
orders_df.createOrReplaceTempView("orders")
result_sql = spark.sql("""
    SELECT customer_id, 
           SUM(amount) as total_amount,
           AVG(amount) as avg_order_value
    FROM orders
    GROUP BY customer_id
    ORDER BY total_amount DESC
    LIMIT 5
""")

Both produce identical results, but the SQL version reads more naturally for those familiar with traditional databases. The choice depends on your team’s expertise and the complexity of the operation.

Creating a Temporary View

Before running SQL queries, you must register your DataFrame as a temporary view using createOrReplaceTempView() or createOrReplaceGlobalTempView(). These methods make DataFrames queryable using SQL syntax.

Session-scoped temporary views exist only within the current SparkSession and are automatically dropped when the session ends:

# Create sample employee data
employees_data = [
    (1, "Alice", "Engineering", 95000),
    (2, "Bob", "Sales", 75000),
    (3, "Charlie", "Engineering", 85000),
    (4, "Diana", "Marketing", 70000)
]
employees_df = spark.createDataFrame(
    employees_data, 
    ["emp_id", "name", "department", "salary"]
)

# Create session-scoped temporary view
employees_df.createOrReplaceTempView("employees")

# Query the view
result = spark.sql("SELECT * FROM employees WHERE department = 'Engineering'")
result.show()

Global temporary views persist across multiple SparkSessions within the same application and require the global_temp database prefix:

# Create global temporary view
employees_df.createOrReplaceGlobalTempView("employees_global")

# Query from the same session
spark.sql("SELECT * FROM global_temp.employees_global WHERE salary > 80000").show()

# This view would be accessible from another SparkSession in the same application
# new_spark = spark.newSession()
# new_spark.sql("SELECT * FROM global_temp.employees_global")

Use global temporary views when you need to share data across different parts of your application. For most use cases, session-scoped views provide sufficient functionality with simpler syntax.

Running Basic SQL Queries

The spark.sql() method executes SQL queries and returns a DataFrame. This enables all standard SQL operations including filtering, aggregation, and sorting.

# Create sales data
sales_data = [
    ("2024-01-15", "Electronics", "Laptop", 1200, 2),
    ("2024-01-16", "Electronics", "Mouse", 25, 10),
    ("2024-01-16", "Clothing", "Shirt", 30, 5),
    ("2024-01-17", "Electronics", "Keyboard", 75, 8),
    ("2024-01-17", "Clothing", "Pants", 50, 3),
    ("2024-01-18", "Electronics", "Monitor", 300, 4)
]
sales_df = spark.createDataFrame(
    sales_data, 
    ["date", "category", "product", "price", "quantity"]
)
sales_df.createOrReplaceTempView("sales")

# SELECT with WHERE clause
filtered = spark.sql("""
    SELECT date, product, price, quantity, (price * quantity) as total_value
    FROM sales
    WHERE category = 'Electronics' AND price > 50
""")
filtered.show()

# GROUP BY with aggregations
category_summary = spark.sql("""
    SELECT category,
           COUNT(*) as num_transactions,
           SUM(price * quantity) as total_revenue,
           AVG(price) as avg_price,
           MAX(quantity) as max_quantity
    FROM sales
    GROUP BY category
    ORDER BY total_revenue DESC
""")
category_summary.show()

# Using HAVING clause
high_value_categories = spark.sql("""
    SELECT category, SUM(price * quantity) as total_revenue
    FROM sales
    GROUP BY category
    HAVING SUM(price * quantity) > 500
""")
high_value_categories.show()

These basic operations form the foundation of most analytical queries. PySpark’s SQL engine optimizes these operations using Catalyst, ensuring efficient execution even on large datasets.

Advanced SQL Operations

PySpark SQL supports sophisticated operations including joins, subqueries, CTEs, and window functions.

Joins between multiple DataFrames:

# Customer data
customers_data = [(1, "Alice", "Premium"), (2, "Bob", "Standard"), (3, "Charlie", "Premium")]
customers_df = spark.createDataFrame(customers_data, ["customer_id", "name", "tier"])
customers_df.createOrReplaceTempView("customers")

# Orders data
orders_data = [(101, 1, 500), (102, 2, 200), (103, 1, 300), (104, 3, 450)]
orders_df = spark.createDataFrame(orders_data, ["order_id", "customer_id", "amount"])
orders_df.createOrReplaceTempView("orders")

# INNER JOIN with aggregation
customer_orders = spark.sql("""
    SELECT c.name, c.tier, 
           COUNT(o.order_id) as order_count,
           SUM(o.amount) as total_spent
    FROM customers c
    INNER JOIN orders o ON c.customer_id = o.customer_id
    GROUP BY c.name, c.tier
    ORDER BY total_spent DESC
""")
customer_orders.show()

Common Table Expressions (CTEs) improve query readability by breaking complex logic into named subqueries:

cte_query = spark.sql("""
    WITH customer_totals AS (
        SELECT customer_id, SUM(amount) as total_amount
        FROM orders
        GROUP BY customer_id
    ),
    tier_averages AS (
        SELECT tier, AVG(total_amount) as avg_spending
        FROM customer_totals ct
        JOIN customers c ON ct.customer_id = c.customer_id
        GROUP BY tier
    )
    SELECT * FROM tier_averages
    ORDER BY avg_spending DESC
""")
cte_query.show()

Window functions enable advanced analytics like ranking and running totals:

window_query = spark.sql("""
    SELECT customer_id, order_id, amount,
           ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY amount DESC) as rank,
           SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_id 
                             ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total
    FROM orders
""")
window_query.show()

Converting SQL Results Back to DataFrames

Since spark.sql() returns a DataFrame, you can seamlessly combine SQL queries with DataFrame API operations:

# SQL query result is a DataFrame
high_value_orders = spark.sql("""
    SELECT customer_id, order_id, amount
    FROM orders
    WHERE amount > 250
""")

# Chain DataFrame API methods
from pyspark.sql.functions import col

result = (high_value_orders
    .filter(col("amount") < 600)
    .withColumn("amount_with_tax", col("amount") * 1.08)
    .select("customer_id", "amount_with_tax"))

result.show()

# Write results to storage
result.write.mode("overwrite").parquet("output/high_value_orders")

# Or save as CSV
result.write.mode("overwrite").option("header", "true").csv("output/high_value_orders_csv")

# Create a new temporary view from SQL results
high_value_orders.createOrReplaceTempView("high_value_orders")
spark.sql("SELECT customer_id, COUNT(*) FROM high_value_orders GROUP BY customer_id").show()

This hybrid approach leverages the strengths of both interfaces—use SQL for complex analytical logic and the DataFrame API for programmatic transformations.

Best Practices and Performance Considerations

When to use SQL vs DataFrame API:

  • Use SQL for complex analytical queries, reporting, and when working with SQL-proficient teams
  • Use DataFrame API for programmatic transformations, dynamic query construction, and when type safety matters
  • Combine both approaches based on readability and maintainability

Performance tips:

# Use EXPLAIN to understand query execution
spark.sql("SELECT * FROM orders WHERE amount > 300").explain(True)

# Both approaches generate identical execution plans
df_api_plan = orders_df.filter(col("amount") > 300)
sql_plan = spark.sql("SELECT * FROM orders WHERE amount > 300")

print("DataFrame API plan:")
df_api_plan.explain()
print("\nSQL plan:")
sql_plan.explain()

Catalog management:

# List all temporary views
spark.catalog.listTables()

# Drop a temporary view when no longer needed
spark.catalog.dropTempView("orders")

# Check if a view exists
if spark.catalog.tableExists("employees"):
    spark.sql("SELECT * FROM employees LIMIT 5").show()

Cache frequently accessed views:

# Cache the DataFrame before creating a view for better performance
employees_df.cache()
employees_df.createOrReplaceTempView("employees_cached")

# Multiple queries will benefit from caching
spark.sql("SELECT COUNT(*) FROM employees_cached").show()
spark.sql("SELECT department, AVG(salary) FROM employees_cached GROUP BY department").show()

PySpark’s SQL interface provides a powerful, familiar way to analyze data at scale. By understanding temporary views, combining SQL with DataFrame operations, and following performance best practices, you can build efficient data processing pipelines that leverage your team’s SQL expertise while maintaining the flexibility of PySpark’s distributed computing capabilities.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.