Spark SQL - Array Functions

Key Insights

• Spark SQL provides 50+ array functions that enable complex data transformations without UDFs, significantly improving performance through Catalyst optimizer integration and whole-stage code generation.
• Functions like array_agg, flatten, and transform replace expensive explode-join-collect patterns, reducing shuffle operations and memory overhead in distributed processing.
• Combining higher-order functions (transform, filter, aggregate) with lambda expressions enables functional programming paradigms directly in SQL, eliminating the need for custom Python or Scala code.

Core Array Construction and Manipulation

Spark SQL arrays form the foundation for handling nested data structures. The array() function creates arrays from columns or literals, while array_repeat() and sequence() generate arrays programmatically.

-- Create arrays from columns
SELECT 
    array(col1, col2, col3) as combined,
    array_repeat('default', 5) as defaults,
    sequence(1, 10, 2) as odd_numbers
FROM source_table;

-- Array from date ranges
SELECT 
    sequence(
        to_date('2024-01-01'),
        to_date('2024-12-31'),
        interval 1 month
    ) as monthly_dates;

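The split() function converts delimited strings into arrays, which is useful for parsing CSV-like data stored inside a single field. Its second argument is a regular expression, so plain delimiters such as , and ; work as written: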

SELECT 
    order_id,
    split(product_codes, ',') as product_array,
    split(tags, ';') as tag_array
FROM orders
WHERE product_codes IS NOT NULL;

The reverse operation joins array elements back into a string, using either concat_ws() or array_join():

SELECT 
    concat_ws('|', product_array) as pipe_delimited,
    array_join(tag_array, ', ') as comma_separated
FROM parsed_orders;
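
As a quick check of the split and join behavior described above, the following sketch uses only literals, so it should run without any tables. The sample strings are made up for illustration. It also shows two details that are easy to miss: a regex metacharacter such as the dot has to be escaped or bracketed, and array_join() takes an optional third argument that replaces NULL elements.

```sql
-- Round trip on literal values; no source table is assumed.
SELECT
    split('A1,B2,C3', ',')                      AS parts,          -- ["A1","B2","C3"]
    array_join(split('A1,B2,C3', ','), '|')     AS rejoined,       -- A1|B2|C3
    split('1.2.3', '[.]')                       AS escaped_dot,    -- ["1","2","3"]
    array_join(array('a', NULL, 'c'), ',', '?') AS null_replaced;  -- a,?,c
```

Without the third argument, array_join() skips NULL elements instead of replacing them.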

Array Aggregation and Flattening

The collect_list() and collect_set() aggregate functions gather a group's values into an array. collect_list() keeps duplicates, collect_set() removes them, and neither guarantees element order:

-- Collect all purchases per customer
SELECT 
    customer_id,
    collect_list(product_id) as all_purchases,
    collect_set(category) as unique_categories,
    count(*) as transaction_count
FROM transactions
GROUP BY customer_id;

For nested arrays, flatten() reduces dimensionality by one level:

-- Flatten nested product arrays
SELECT 
    customer_id,
    flatten(
        collect_list(array(product_id, related_product_id))
    ) as all_product_ids
FROM customer_products
GROUP BY customer_id;

-- Multiple levels require chaining
SELECT flatten(flatten(deeply_nested_array)) as flat_array
FROM complex_data;

The array_distinct() function removes duplicates after flattening:

SELECT 
    customer_id,
    array_distinct(flatten(collect_list(tags))) as unique_tags
FROM customer_interactions
GROUP BY customer_id;
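
The effect of flatten() and array_distinct() is easier to see on literal values. This is a minimal sketch with made-up inputs. The last column shows a case worth knowing about: flatten() returns NULL when any inner array is NULL.

```sql
-- Flattening and de-duplicating literal arrays; no source table is assumed.
SELECT
    flatten(array(array(1, 2), array(2, 3)))                 AS flat,       -- [1,2,2,3]
    array_distinct(flatten(array(array(1, 2), array(2, 3)))) AS deduped,    -- [1,2,3]
    flatten(array(array(1, 2), CAST(NULL AS ARRAY<INT>)))    AS with_null;  -- NULL
```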

Array Access and Slicing

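Spark SQL uses two indexing conventions: bracket subscripts such as product_array[0] are 0-based, while element_at() is 1-based and also accepts negative indexes that count from the end. With ANSI mode disabled, an out-of-range index returns NULL; with ANSI mode enabled it raises an error:

-- Access array elements
SELECT 
    product_array[0] as first_product,  -- Bracket subscripts are 0-based
    element_at(product_array, -1) as last_product,  -- Negative indexing from the end
    element_at(product_array, 10) as maybe_null  -- NULL if out of bounds (error under ANSI mode)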
FROM orders;
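
The difference between bracket subscripts and element_at() can be checked on a literal array. The sketch below assumes Spark 3.3 or later for try_element_at(), which returns NULL for an out-of-range index regardless of the ANSI setting. The alias t and the column name arr are illustrative only.

```sql
-- Bracket subscripts are 0-based; element_at() is 1-based.
SELECT
    arr[0]                  AS bracket_first,  -- a
    element_at(arr, 1)      AS element_first,  -- a
    element_at(arr, -1)     AS element_last,   -- c
    try_element_at(arr, 10) AS out_of_range    -- NULL
FROM (SELECT array('a', 'b', 'c') AS arr) AS t;
```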

The slice() function extracts subarrays:

-- Get first 3 and last 3 elements
SELECT 
    order_id,
    slice(product_array, 1, 3) as top_three,
    slice(product_array, -3, 3) as bottom_three
FROM orders;

Size and membership checks guard against empty or missing arrays before further processing:

SELECT 
    size(product_array) as product_count,
    array_contains(categories, 'Electronics') as has_electronics,
    arrays_overlap(purchased, recommended) as has_recommendations
FROM customer_data
WHERE size(product_array) > 0;  -- Excludes empty and NULL arrays
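
NULL arrays and empty arrays behave differently, which matters when writing such filters. The sketch below uses literals only. The result of size() on a NULL array depends on configuration (-1 under the legacy default, NULL when ANSI mode is enabled), so the query does not rely on it. Substituting an empty array with coalesce() is one common workaround rather than the only option.

```sql
-- Empty versus NULL arrays; no source table is assumed.
SELECT
    size(array())                                  AS empty_size,     -- 0
    size(CAST(NULL AS ARRAY<STRING>))              AS null_size,      -- -1 or NULL, depending on configuration
    coalesce(CAST(NULL AS ARRAY<STRING>), array()) AS null_as_empty,  -- []
    array_contains(array('a', 'b'), 'c')           AS missing_value;  -- false
```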

Higher-Order Functions with Transform

The transform() function applies lambda expressions to each array element, enabling map operations without exploding:

-- Apply transformations to array elements
SELECT 
    transform(prices, x -> x * 1.1) as prices_with_tax,
    transform(product_names, x -> upper(x)) as uppercase_names,
    transform(dates, x -> date_add(x, 7)) as next_week_dates
FROM products;

A two-argument lambda also receives the element's 0-based index, which enables position-aware transformations:

-- Add index-based calculations
SELECT 
    transform(
        sequence(1, 12),
        (month, idx) -> struct(
            month as month_num,
            idx as position,
            month * revenue_base as projected_revenue
        )
    ) as monthly_projections
FROM revenue_forecast;
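
To confirm that the index passed to the lambda starts at 0, here is a small sketch on literal arrays. The values are made up for illustration.

```sql
-- The second lambda parameter is the 0-based position of the element.
SELECT
    transform(array(10, 20, 30), (x, i) -> x + i)                             AS plus_index,  -- [10,21,32]
    transform(array('a', 'b'), (x, i) -> concat(x, '-', CAST(i AS STRING)))   AS labelled;    -- ["a-0","b-1"]
```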

Nested transformations handle multi-dimensional arrays:

-- Transform nested structures
SELECT 
    transform(
        order_lines,
        line -> transform(
            line.items,
            item -> item.price * item.quantity
        )
    ) as line_totals
FROM orders;

Filtering and Aggregating Arrays

The filter() function selects array elements matching predicates:

-- Filter array elements
SELECT 
    customer_id,
    filter(transactions, x -> x.amount > 100) as large_transactions,
    filter(products, x -> x.category = 'Electronics') as electronics,
    filter(dates, x -> year(x) = 2024) as current_year_dates
FROM customer_history;

Combining filter() with size() creates conditional metrics:

SELECT 
    customer_id,
    size(filter(orders, x -> x.status = 'completed')) as completed_count,
    size(filter(orders, x -> x.status = 'cancelled')) as cancelled_count,
    size(filter(orders, x -> x.total > 500)) as high_value_count
FROM customer_orders;
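
The same filter-then-count pattern can be tried on literal arrays. This minimal sketch also shows filter() used to drop NULL elements, a common cleanup step before aggregation.

```sql
-- Filtering literal arrays; no source table is assumed.
SELECT
    filter(array(1, 2, 3, 4), x -> x % 2 = 0)       AS evens,       -- [2,4]
    size(filter(array(1, 2, 3, 4), x -> x % 2 = 0)) AS even_count,  -- 2
    filter(array(1, NULL, 3), x -> x IS NOT NULL)   AS non_null;    -- [1,3]
```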

The aggregate() function reduces arrays to single values using accumulator patterns:

-- Sum array elements
SELECT 
    order_id,
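    aggregate(
        quantities,
        0,  -- Initial value; its type must match the accumulator (assumes array<int>)
        (acc, x) -> acc + x  -- Merge function
    ) as total_quantity,
    aggregate(
        prices,
        CAST(0 AS DOUBLE),  -- A bare 0.0 is a DECIMAL literal; assumes prices is array<double>
        (acc, x) -> acc + x,
        acc -> round(acc, 2)  -- Finish function
    ) as total_price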
FROM order_details;
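
A frequent stumbling block with aggregate() is that the initial value and the merge result must have the same type. The sketch below, on literal arrays, casts the initial value explicitly so the types line up. The third column uses the optional finish function to turn a sum into a mean, with the element count hard-coded for brevity.

```sql
-- Accumulator types are made explicit with CAST.
SELECT
    aggregate(array(1, 2, 3), 0, (acc, x) -> acc + x) AS int_sum,  -- 6
    aggregate(
        array(1.5D, 2.5D),
        CAST(0 AS DOUBLE),
        (acc, x) -> acc + x
    ) AS double_sum,  -- 4.0
    aggregate(
        array(10, 20, 30),
        CAST(0 AS BIGINT),
        (acc, x) -> acc + x,
        acc -> acc / 3
    ) AS mean_value;  -- 20.0
```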

Complex aggregations implement custom logic:

-- Find maximum with custom comparison
SELECT 
    aggregate(
        product_scores,
        struct(0 as max_score, '' as product_id),  -- Seed value; assumes integer scores that are never negative
        (acc, x) -> CASE 
            WHEN x.score > acc.max_score 
            THEN struct(x.score as max_score, x.id as product_id)
            ELSE acc
        END
    ) as best_product
FROM product_rankings;

Array Comparison and Set Operations

Spark SQL provides set operations for array comparison:

-- Set operations on arrays
SELECT 
    customer_id,
    array_union(purchased_2023, purchased_2024) as all_purchases,
    array_intersect(purchased_2023, purchased_2024) as repeat_purchases,
    array_except(purchased_2024, purchased_2023) as new_purchases
FROM yearly_purchases;

The array_sort() function sorts elements in ascending order by default and, since Spark 3.0, accepts an optional comparator lambda:

-- Sort arrays
SELECT 
    array_sort(product_ids) as sorted_ids,
    array_sort(prices, (left, right) -> 
        CASE 
            WHEN left < right THEN -1
            WHEN left > right THEN 1
            ELSE 0
        END
    ) as sorted_prices_asc,
    reverse(array_sort(dates)) as dates_desc
FROM products;
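
For descending order there are two options besides reverse(): sort_array() with its second argument set to false, or array_sort() with an inverted comparator. The sketch below shows both on a literal array. The two functions also place NULL elements differently: array_sort() puts them last, while sort_array() puts them first in ascending order and last in descending order.

```sql
-- Two ways to sort in descending order.
SELECT
    sort_array(array(3, 1, 2), false) AS desc_builtin,  -- [3,2,1]
    array_sort(
        array(3, 1, 2),
        (l, r) -> CASE WHEN l > r THEN -1 WHEN l < r THEN 1 ELSE 0 END
    ) AS desc_comparator;  -- [3,2,1]
```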

Position and Existence Functions

The array_position() function returns the 1-based position of the first match, or 0 if the value is absent, while exists() and forall() test elements against a predicate:

-- Locate elements
SELECT 
    order_id,
    array_position(product_ids, 'PROD-123') as product_position,
    exists(tags, x -> x LIKE '%urgent%') as has_urgent_tag,
    forall(statuses, x -> x = 'completed') as all_completed
FROM orders;
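
The return values of these functions can be verified on literal arrays. Note in particular that a missing value yields position 0 rather than NULL, which is a sketch-level reminder to compare against 0 when testing for absence.

```sql
-- Position and predicate checks on literal arrays.
SELECT
    array_position(array('x', 'y', 'z'), 'y') AS found_at,     -- 2
    array_position(array('x', 'y', 'z'), 'q') AS not_found,    -- 0
    exists(array(1, 2, 3), x -> x > 2)        AS any_above_2,  -- true
    forall(array(1, 2, 3), x -> x > 2)        AS all_above_2;  -- false
```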

The zip_with() function combines two arrays element-wise. If the lengths differ, the shorter array is padded with NULLs before the lambda is applied:

-- Combine parallel arrays
SELECT 
    zip_with(
        product_ids,
        quantities,
        (id, qty) -> struct(id as product, qty as quantity)
    ) as order_items,
    zip_with(
        prices,
        discounts,
        (price, disc) -> price * (1 - disc)
    ) as final_prices
FROM order_data;
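
When the two inputs may differ in length, the lambda should handle the NULL padding explicitly. The following sketch uses literal arrays and treats a missing value as 0, which is an assumption chosen for illustration. The second column uses arrays_zip(), which pairs elements into structs without a lambda.

```sql
-- The second array is shorter, so its third element is padded with NULL.
SELECT
    zip_with(
        array(1, 2, 3),
        array(10, 20),
        (a, b) -> a + coalesce(b, 0)
    ) AS padded_sum,  -- [11,22,3]
    arrays_zip(array('p1', 'p2'), array(5, 7)) AS zipped_structs;
```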

Performance Considerations

Built-in array functions are native Catalyst expressions, so the optimizer can reason about them and they avoid the serialization overhead of UDFs. Avoid exploding arrays when higher-order functions suffice:

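-- Inefficient: explode and collect (the GROUP BY adds a shuffle)
-- Matches the query below only if orders has one row per customer_id
SELECT customer_id, collect_list(price * 1.1)
FROM orders LATERAL VIEW explode(prices) exploded AS price
GROUP BY customer_id;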

-- Efficient: transform directly
SELECT customer_id, transform(prices, x -> x * 1.1)
FROM orders;

Array operations are memory-intensive because each array is materialized within a single row, so monitor executor memory when rows hold millions of elements. Apply filter() and array_distinct() as early as possible in the query so that later, more expensive operations handle fewer elements. When large arrays feed into joins, consider repartitioning the data or broadcasting the smaller side.
