PySpark SQL Tutorial - A Complete Guide


Key Insights

  • PySpark SQL combines the simplicity of SQL with distributed computing power, making it essential for datasets that exceed a single machine's memory, where pandas fails with out-of-memory errors.
  • The DataFrame API and SQL interface are interchangeable and can be mixed freely, letting you use whichever approach fits your team’s skills while maintaining the same performance characteristics.
  • Proper partitioning, caching, and broadcast joins can improve query performance by 10-100x, but premature optimization wastes time—always check execution plans with explain() before optimizing.

Introduction to PySpark SQL

PySpark SQL is Apache Spark’s module for structured data processing, providing a programming interface for working with structured and semi-structured data. While pandas excels at small to medium datasets that fit in memory, PySpark SQL shines when you’re dealing with data that spans hundreds of gigabytes or terabytes across multiple machines.

The critical difference between PySpark DataFrames and RDDs (Resilient Distributed Datasets) is that DataFrames are built on top of RDDs with a schema-aware structure. This schema awareness enables Spark’s Catalyst optimizer to generate efficient execution plans, while RDDs require you to manually optimize transformations. Unless you’re doing something extremely specialized, stick with DataFrames.

Choose PySpark SQL when your data exceeds single-machine memory, you need horizontal scaling, or you’re processing data in a distributed environment like Databricks, EMR, or on-premises Hadoop clusters. For datasets under 50GB with simple transformations, pandas is usually faster and simpler.

from pyspark.sql import SparkSession

# Initialize SparkSession - the entry point for PySpark SQL
spark = SparkSession.builder \
    .appName("PySpark SQL Tutorial") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Verify it's working
print(spark.version)

Creating and Loading DataFrames

PySpark SQL supports reading from CSV, JSON, Parquet, Avro, ORC, and JDBC sources. Schema inference is convenient for exploration but costs performance in production—always define explicit schemas for production pipelines.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Define explicit schema for better performance
schema = StructType([
    StructField("user_id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", DoubleType(), True)
])

# Read CSV with explicit schema
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv("s3://bucket/users.csv")

# Create DataFrame from Python collections
data = [
    (1, "Alice", "alice@example.com", 28, 75000.0),
    (2, "Bob", "bob@example.com", 35, 85000.0),
    (3, "Charlie", "charlie@example.com", 42, 95000.0)
]

df_from_list = spark.createDataFrame(data, schema=schema)
df_from_list.show()

For JSON with nested structures, PySpark automatically infers complex types. Parquet is the preferred format for large datasets because it’s columnar, compressed, and preserves schema information.

# Read Parquet (preserves schema automatically)
df_parquet = spark.read.parquet("s3://bucket/events/*.parquet")

# Read from PostgreSQL
df_db = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "users") \
    .option("user", "postgres") \
    .option("password", "password") \
    .load()

DataFrame Operations and Transformations

PySpark transformations are lazy—they build an execution plan but don’t execute until you call an action like show(), collect(), or write(). This laziness enables optimization across multiple operations.

from pyspark.sql.functions import col, when, upper, concat, lit

# Chain multiple transformations
result = df \
    .filter(col("age") > 30) \
    .select("name", "email", "salary") \
    .withColumn("salary_band", 
                when(col("salary") < 80000, "Junior")
                .when(col("salary") < 100000, "Mid")
                .otherwise("Senior")) \
    .withColumn("email_upper", upper(col("email"))) \
    .withColumnRenamed("name", "full_name") \
    .orderBy(col("salary").desc())

result.show()

# Add computed columns
from pyspark.sql.functions import split

df_enhanced = df \
    .withColumn("email_domain",
                concat(lit("@"), split(col("email"), "@").getItem(1))) \
    .withColumn("age_group", (col("age") / 10).cast("int") * 10)

# Handle null values
df_cleaned = df \
    .na.fill({"age": 0, "salary": 0.0}) \
    .na.drop(subset=["email"]) \
    .dropDuplicates(["user_id"])

# Group and aggregate
summary = df \
    .groupBy("age_group") \
    .agg({"salary": "avg", "user_id": "count"}) \
    .withColumnRenamed("avg(salary)", "avg_salary") \
    .withColumnRenamed("count(user_id)", "user_count")

summary.show()

SQL Queries with PySpark

One of PySpark SQL’s most powerful features is the ability to register DataFrames as temporary views and query them with standard SQL. This is invaluable when working with analysts who prefer SQL or when migrating from traditional databases.

# Register DataFrame as temporary view
df.createOrReplaceTempView("users")

# Execute SQL queries
sql_result = spark.sql("""
    SELECT 
        FLOOR(age / 10) * 10 as age_group,
        COUNT(*) as user_count,
        AVG(salary) as avg_salary,
        MAX(salary) as max_salary
    FROM users
    WHERE age > 25
    GROUP BY age_group
    ORDER BY age_group
""")

sql_result.show()

# Mix SQL and DataFrame API
df_departments = spark.createDataFrame([
    (1, "Engineering"),
    (2, "Sales"),
    (3, "Marketing")
], ["dept_id", "dept_name"])

df_departments.createOrReplaceTempView("departments")

# Complex JOIN in SQL
joined_result = spark.sql("""
    SELECT 
        u.name,
        u.salary,
        d.dept_name
    FROM users u
    LEFT JOIN departments d ON u.user_id = d.dept_id
    WHERE u.salary > 80000
""")

Advanced Operations

Window functions enable sophisticated analytics like running totals, rankings, and moving averages without self-joins. UDFs let you apply custom Python logic, though they’re slower than built-in functions due to serialization overhead.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, sum as _sum, lag
from pyspark.sql.types import FloatType

# Window functions for analytics (df has no dept_id column, so partition by age decade)
windowSpec = Window.partitionBy((col("age") / 10).cast("int")).orderBy(col("salary").desc())

df_ranked = df \
    .withColumn("rank", rank().over(windowSpec)) \
    .withColumn("row_number", row_number().over(windowSpec)) \
    .withColumn("running_total", _sum("salary").over(windowSpec))

# User-defined function (UDF)
def calculate_bonus(salary):
    if salary is None:  # guard against null salaries
        return None
    if salary < 80000:
        return salary * 0.05
    elif salary < 100000:
        return salary * 0.10
    else:
        return salary * 0.15

# register() exposes the UDF to SQL queries and also returns a function
# usable with the DataFrame API
bonus_udf = spark.udf.register("bonus_udf", calculate_bonus, FloatType())

df_with_bonus = df.withColumn("bonus", bonus_udf(col("salary")))

# Complex joins
df_salaries = spark.createDataFrame([
    (1, 2020, 70000.0),
    (1, 2021, 75000.0),
    (2, 2020, 80000.0)
], ["user_id", "year", "salary"])

# Join with different types
inner_join = df.join(df_salaries, "user_id", "inner")
left_join = df.join(df_salaries, "user_id", "left")
cross_join = df.crossJoin(df_departments)

# Union operations
df_2023 = spark.createDataFrame([(4, "David", "david@example.com", 30, 70000.0)], schema=schema)
df_combined = df.union(df_2023)

Performance Optimization

Understanding Spark’s execution model is crucial for performance. Use explain() to inspect query plans and identify bottlenecks. Caching is effective for iterative algorithms or when you access the same DataFrame multiple times.

# Cache frequently accessed DataFrame
df.cache()
df.count()  # Materializes the cache

# Or persist with specific storage level
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)

# Repartition for better parallelism
df_repartitioned = df.repartition(100, "user_id")

# Coalesce to reduce partitions (no shuffle)
df_fewer_partitions = df.coalesce(10)

# Broadcast join for small dimension tables
from pyspark.sql.functions import broadcast

df_small = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "category"])
df_broadcast_join = df.join(broadcast(df_small), df.user_id == df_small.id)

# Inspect execution plan
df_broadcast_join.explain(True)

# Bucketing for repeated joins on same keys
df.write \
    .bucketBy(100, "user_id") \
    .sortBy("user_id") \
    .saveAsTable("users_bucketed")

The explain() output shows physical and logical plans. Look for “Exchange” operations indicating shuffles—these are expensive. Broadcast joins eliminate shuffles for small tables (typically under 10MB).

Writing and Saving Data

Choose your output format based on use case: Parquet for analytics, JSON for APIs, CSV for compatibility. Partitioning by date or category dramatically improves query performance for downstream consumers.

# Write as Parquet with partitioning
df.write \
    .mode("overwrite") \
    .partitionBy("age_group") \
    .parquet("s3://bucket/users_partitioned/")

# Append to existing data (df_new: any DataFrame with a matching schema)
df_new.write \
    .mode("append") \
    .parquet("s3://bucket/users_partitioned/")

# Write to PostgreSQL
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "users_export") \
    .option("user", "postgres") \
    .option("password", "password") \
    .mode("overwrite") \
    .save()

# Write as Delta Lake for ACID transactions (requires delta package)
df.write.format("delta").mode("overwrite").save("/delta/users")

# Write a single output file (small outputs only; coalesce avoids a full shuffle)
df.coalesce(1).write.csv("s3://bucket/single_file/")

Partition pruning is automatic when you filter on partition columns—Spark only reads relevant directories. For time-series data, partition by date. For multi-tenant data, partition by tenant_id. Avoid over-partitioning (thousands of small files) which creates metadata overhead.

PySpark SQL strikes the right balance between SQL’s familiarity and distributed computing’s power. Master the DataFrame API, understand execution plans, and you’ll process terabytes as easily as you once processed gigabytes.
