Spark SQL - Built-in Functions Reference


Key Insights

  • Spark SQL provides over 300 built-in functions across string manipulation, date/time operations, mathematical computations, aggregate operations, and array/map transformations that eliminate the need for custom UDFs in most scenarios
  • Understanding function categories and their performance characteristics helps you write efficient queries; aggregate functions with window specifications, for example, can replace complex self-joins while avoiding the extra shuffle and join (see the sketch after this list)
  • The Catalyst optimizer automatically optimizes built-in function calls through predicate pushdown and constant folding, making them significantly faster than user-defined functions, which break the optimization chain
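For example, comparing each row against its group's maximum is often written as a join back onto a grouped aggregate; a window specification does it in one step. A minimal sketch with hypothetical region/amount columns:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max as max_
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("WindowVsSelfJoin").getOrCreate()

orders = spark.createDataFrame(
    [("north", 120), ("north", 80), ("south", 200), ("south", 150)],
    ["region", "amount"]
)

# Instead of joining orders to groupBy("region").agg(max("amount")),
# compute the per-region maximum next to every row with a window
region_window = Window.partitionBy("region")

orders.withColumn(
    "gap_to_region_max",
    max_("amount").over(region_window) - col("amount")
).show()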

String Functions

Spark SQL offers comprehensive string manipulation capabilities. The most commonly used functions handle case conversion, pattern matching, and substring extraction.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("StringFunctions").getOrCreate()

data = [
    ("john.doe@example.com", "  Product Manager  "),
    ("jane.smith@company.org", "Senior Developer"),
    ("bob_wilson@tech.net", "data analyst")
]

df = spark.createDataFrame(data, ["email", "title"])

# String manipulation examples
result = df.select(
    col("email"),
    regexp_extract("email", "([^@]+)", 1).alias("username"),
    regexp_replace("email", "@.*", "@redacted.com").alias("masked_email"),
    split("email", "@").getItem(1).alias("domain"),
    upper(trim("title")).alias("normalized_title"),
    length("title").alias("title_length"),
    # Aliases defined in this same select aren't visible yet, so repeat the expressions
    concat_ws(
        " - ",
        regexp_extract("email", "([^@]+)", 1),
        split("email", "@").getItem(1)
    ).alias("user_domain")
)

result.show(truncate=False)

The regexp_extract function uses Java regex patterns with capture groups; the third argument specifies which capture group to return (group 0 is the whole match). For complex pattern matching, regexp_replace replaces every occurrence in a single pass and supports backreferences in the replacement string.
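As a quick illustration, backreferences such as $1 and $2 refer to capture groups from the pattern. A minimal sketch with a hypothetical raw_date column, reordering yyyy-MM-dd into dd/MM/yyyy:

# Backreferences in regexp_replace: $1 = year, $2 = month, $3 = day
dates = spark.createDataFrame([("2024-01-15",), ("2024-02-20",)], ["raw_date"])

dates.select(
    regexp_replace("raw_date", r"(\d{4})-(\d{2})-(\d{2})", "$3/$2/$1").alias("day_first")
).show()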

// Scala example with advanced string operations
import org.apache.spark.sql.functions._
import spark.implicits._  // required for the $"column" syntax

val df = spark.read.json("users.json")

df.select(
    substring_index($"full_path", "/", -1).as("filename"),
    translate($"phone", "()-", "").as("clean_phone"),
    overlay($"ssn", lit("XXX"), 1, 3).as("masked_ssn"),
    levenshtein($"input_name", $"expected_name").as("edit_distance")
).show()

Date and Time Functions

Temporal operations are critical for time-series analysis and data warehousing. Spark SQL handles timezone conversions, date arithmetic, and interval calculations natively.

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create sample time-series data
time_data = [
    ("2024-01-15 10:30:00", "America/New_York"),
    ("2024-02-20 14:45:00", "Europe/London"),
    ("2024-03-10 08:15:00", "Asia/Tokyo")
]

df = spark.createDataFrame(time_data, ["timestamp_str", "timezone"])

result = df.select(
    to_timestamp("timestamp_str").alias("ts"),
    current_timestamp().alias("now"),
    date_format("timestamp_str", "yyyy-MM-dd").alias("date_only"),
    date_add(to_date("timestamp_str"), 30).alias("plus_30_days"),
    datediff(current_date(), to_date("timestamp_str")).alias("days_ago"),
    year("timestamp_str").alias("year"),
    quarter("timestamp_str").alias("quarter"),
    weekofyear("timestamp_str").alias("week"),
    from_utc_timestamp("timestamp_str", col("timezone")).alias("localized_time"),  # pass the column so each row uses its own timezone
    unix_timestamp("timestamp_str").alias("epoch_seconds")
)

result.show(truncate=False)

For window-based time calculations, combine date functions with window specifications:

from pyspark.sql.window import Window

# Calculate rolling 7-day metrics
# rangeBetween needs a numeric ordering expression, so order by the date as epoch seconds
window_spec = (
    Window.partitionBy("user_id")
    .orderBy(col("event_date").cast("timestamp").cast("long"))
    .rangeBetween(-6 * 86400, 0)
)

metrics = events_df.select(
    col("user_id"),
    col("event_date"),
    col("revenue"),
    sum("revenue").over(window_spec).alias("rolling_7day_revenue"),
    count("*").over(window_spec).alias("rolling_7day_events"),
    lag("revenue", 1).over(Window.partitionBy("user_id").orderBy("event_date")).alias("prev_day_revenue")
)

Aggregate Functions

Aggregate functions operate on groups of rows. They’re essential for analytical queries and can be combined with window functions for advanced analytics.

from pyspark.sql.functions import *

sales_data = [
    ("Electronics", "Laptop", 1200, 5),
    ("Electronics", "Mouse", 25, 50),
    ("Clothing", "Shirt", 30, 100),
    ("Clothing", "Pants", 50, 75),
    ("Electronics", "Keyboard", 80, 30)
]

df = spark.createDataFrame(sales_data, ["category", "product", "price", "quantity"])

# Basic aggregations
summary = df.groupBy("category").agg(
    count("*").alias("product_count"),
    sum(col("price") * col("quantity")).alias("total_revenue"),
    avg("price").alias("avg_price"),
    min("price").alias("min_price"),
    max("price").alias("max_price"),
    stddev("price").alias("price_stddev"),
    collect_list("product").alias("products"),
    approx_count_distinct("product").alias("distinct_products")
)

summary.show(truncate=False)

Advanced aggregate functions handle percentiles, statistical distributions, and custom accumulations:

# Statistical aggregations
stats = df.select(
    percentile_approx("price", 0.5).alias("median_price"),
    percentile_approx("price", array(lit(0.25), lit(0.75))).alias("quartiles"),
    corr("price", "quantity").alias("price_quantity_correlation"),
    covar_pop("price", "quantity").alias("covariance"),
    kurtosis("price").alias("price_kurtosis"),
    skewness("price").alias("price_skewness")
)

stats.show()
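As noted at the start of this section, aggregates also combine with window specifications, which keep every input row instead of collapsing groups. A minimal sketch reusing the sales DataFrame above to compute each product's share of its category revenue:

from pyspark.sql.window import Window

# Per-category total computed alongside every row, no groupBy needed
category_window = Window.partitionBy("category")

revenue = col("price") * col("quantity")

share = df.select(
    col("category"),
    col("product"),
    revenue.alias("revenue"),
    (revenue / sum(revenue).over(category_window)).alias("category_revenue_share")
)

share.show(truncate=False)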

Array and Map Functions

Complex data types require specialized functions for element access, transformation, and filtering.

from pyspark.sql.functions import *

# Array operations
array_data = [
    (1, [10, 20, 30, 40, 50]),
    (2, [15, 25, 35]),
    (3, [5, 10, 15, 20])
]

df = spark.createDataFrame(array_data, ["id", "values"])

array_ops = df.select(
    col("id"),
    col("values"),
    size("values").alias("array_size"),
    array_contains("values", 20).alias("contains_20"),
    array_max("values").alias("max_value"),
    array_min("values").alias("min_value"),
    array_distinct("values").alias("unique_values"),
    sort_array("values", asc=False).alias("sorted_desc"),
    slice("values", 2, 3).alias("middle_elements"),
    element_at("values", -1).alias("last_element")
)

array_ops.show(truncate=False)

Transform and filter array elements with higher-order functions:

# Higher-order functions
transformed = df.select(
    col("id"),
    transform("values", lambda x: x * 2).alias("doubled"),
    filter("values", lambda x: x > 20).alias("filtered"),
    aggregate("values", lit(0), lambda acc, x: acc + x).alias("sum"),
    exists("values", lambda x: x > 40).alias("has_large_value"),
    forall("values", lambda x: x > 0).alias("all_positive")
)

transformed.show(truncate=False)

Map operations provide key-value manipulation:

# Map functions
map_data = [(1, {"name": "John", "age": "30", "city": "NYC"})]
df = spark.createDataFrame(map_data, ["id", "attributes"])

map_ops = df.select(
    col("id"),
    map_keys("attributes").alias("keys"),
    map_values("attributes").alias("values"),
    element_at("attributes", "name").alias("name"),
    map_concat("attributes", create_map(lit("country"), lit("USA"))).alias("extended")
)

map_ops.show(truncate=False)

Conditional and Null Handling

Conditional logic and null handling are fundamental for data cleansing and business logic implementation.

from pyspark.sql.functions import *

data = [
    (1, "Active", 100, None),
    (2, "Inactive", None, 50),
    (3, None, 200, 75),
    (4, "Pending", 0, 0)
]

df = spark.createDataFrame(data, ["id", "status", "amount", "discount"])

result = df.select(
    col("id"),
    coalesce("status", lit("Unknown")).alias("status_clean"),
    coalesce("amount", "discount", lit(0)).alias("value"),
    when(col("amount") > 100, "High")
        .when(col("amount") > 50, "Medium")
        .otherwise("Low").alias("tier"),
    nvl("amount", lit(0)).alias("amount_or_zero"),
    nullif("amount", lit(0)).alias("amount_null_if_zero"),
    isnan("amount").alias("is_nan"),
    isnull("amount").alias("is_null"),
    nanvl(col("amount"), lit(-1)).alias("nan_replaced")
)

result.show()

Mathematical and Bitwise Functions

Numerical computations span basic arithmetic to advanced mathematical operations.

from pyspark.sql.functions import *

df = spark.range(1, 6).withColumn("value", col("id") * 10)

math_ops = df.select(
    col("id"),
    col("value"),
    sqrt("value").alias("square_root"),
    pow("value", 2).alias("squared"),
    log("value").alias("natural_log"),
    log10("value").alias("log_base_10"),
    exp("value").alias("exponential"),
    round("value" / 3, 2).alias("rounded"),
    ceil("value" / 3).alias("ceiling"),
    floor("value" / 3).alias("floor"),
    abs(col("value") - 100).alias("absolute_diff"),
    signum("value").alias("sign"),
    rand().alias("random_value"),
    shiftLeft("value", 2).alias("left_shift"),
    shiftRight("value", 2).alias("right_shift")
)

math_ops.show()

These built-in functions cover the majority of data processing requirements. When combined strategically, they eliminate performance overhead from UDFs while maintaining code readability. The Catalyst optimizer’s understanding of these functions enables query plan optimizations that custom code cannot achieve.
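To see the effect in a query plan, compare an equivalent filter written with a built-in function and with a Python UDF. This is a minimal sketch assuming a hypothetical Parquet dataset with a name column; the built-in version stays in the JVM as an ordinary Filter, while the UDF shows up as a separate BatchEvalPython step that ships rows to a Python worker:

from pyspark.sql.functions import col, udf, upper
from pyspark.sql.types import StringType

users = spark.read.parquet("/data/users")  # hypothetical dataset with a "name" column

# Built-in: Catalyst understands upper() and keeps evaluation inside the JVM
users.filter(upper(col("name")) == "ALICE").explain()

# Python UDF: appears as a BatchEvalPython node and blocks further optimization
upper_udf = udf(lambda s: s.upper() if s is not None else None, StringType())
users.filter(upper_udf(col("name")) == "ALICE").explain()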
