Spark SQL - Array Functions
Key Insights
• Spark SQL provides 50+ array functions that enable complex data transformations without UDFs, significantly improving performance through Catalyst optimizer integration and whole-stage code generation.
• Functions like array_agg, flatten, and transform replace expensive explode-join-collect patterns, reducing shuffle operations and memory overhead in distributed processing.
• Combining higher-order functions (transform, filter, aggregate) with lambda expressions enables functional programming paradigms directly in SQL, eliminating the need for custom Python or Scala code.
Core Array Construction and Manipulation
Spark SQL arrays form the foundation for handling nested data structures. The array() function creates arrays from columns or literals, while array_repeat() and sequence() generate arrays programmatically.
-- Create arrays from columns
SELECT
array(col1, col2, col3) as combined,
array_repeat('default', 5) as defaults,
sequence(1, 10, 2) as odd_numbers
FROM source_table;
-- Array from date ranges
SELECT
sequence(
to_date('2024-01-01'),
to_date('2024-12-31'),
interval 1 month
) as monthly_dates;
The split() function converts delimited strings into arrays, essential for parsing CSV-like data within fields:
SELECT
order_id,
split(product_codes, ',') as product_array,
split(tags, ';') as tag_array
FROM orders
WHERE product_codes IS NOT NULL;
The reverse operation, joining array elements back into a single string, uses concat_ws() or array_join():
SELECT
concat_ws('|', product_array) as pipe_delimited,
array_join(tag_array, ', ') as comma_separated
FROM parsed_orders;
Array Aggregation and Flattening
The collect_list() and collect_set() aggregation functions group values into arrays, replacing complex grouping patterns:
-- Collect all purchases per customer
SELECT
customer_id,
collect_list(product_id) as all_purchases,
collect_set(category) as unique_categories,
count(*) as transaction_count
FROM transactions
GROUP BY customer_id;
For nested arrays, flatten() reduces dimensionality by one level:
-- Flatten nested product arrays
SELECT
customer_id,
flatten(
collect_list(array(product_id, related_product_id))
) as all_product_ids
FROM customer_products
GROUP BY customer_id;
-- Multiple levels require chaining
SELECT flatten(flatten(deeply_nested_array)) as flat_array
FROM complex_data;
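In plain Python terms, flatten() removes exactly one level of nesting, which is why deeply nested arrays require chained calls. A minimal sketch (flatten_once is our own helper, not a Spark API):

```python
from itertools import chain

def flatten_once(nested):
    # Remove exactly one level of nesting, like Spark SQL's flatten()
    return list(chain.from_iterable(nested))

deep = [[[1, 2], [3]], [[4]]]
print(flatten_once(deep))                # [[1, 2], [3], [4]]
print(flatten_once(flatten_once(deep)))  # [1, 2, 3, 4]
```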
The array_distinct() function removes duplicates after flattening:
SELECT
customer_id,
array_distinct(flatten(collect_list(tags))) as unique_tags
FROM customer_interactions
GROUP BY customer_id;
Array Access and Slicing
Spark SQL mixes two indexing conventions: the bracket operator is 0-based, while element_at() is 1-based and supports negative indices. element_at() also provides safe access with null handling:
-- Access array elements
SELECT
product_array[0] as first_product, -- Bracket operator is 0-based
element_at(product_array, 1) as also_first, -- element_at is 1-based
element_at(product_array, -1) as last_product, -- Negative indexing from the end
element_at(product_array, 10) as maybe_null -- Returns null if out of bounds (non-ANSI mode)
FROM orders;
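The element_at() semantics (1-based, negative counts from the end, null when out of bounds) can be mimicked in plain Python; element_at here is our own helper, not a Spark call:

```python
def element_at(arr, index):
    # Mimics Spark SQL's element_at(): 1-based, negative index counts
    # from the end. Spark raises an error for index 0 and (outside ANSI
    # mode) returns NULL when out of bounds; here both cases return None.
    if index == 0 or abs(index) > len(arr):
        return None
    return arr[index - 1] if index > 0 else arr[index]

products = ['A', 'B', 'C']
print(element_at(products, 1))    # A
print(element_at(products, -1))   # C
print(element_at(products, 10))   # None
```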
The slice() function extracts subarrays:
-- Get first 3 and last 3 elements
SELECT
order_id,
slice(product_array, 1, 3) as top_three,
slice(product_array, -3, 3) as bottom_three
FROM orders;
Size and existence checks guard against empty or missing arrays (note that size(NULL) returns -1 under Spark's default legacy setting):
SELECT
size(product_array) as product_count,
array_contains(categories, 'Electronics') as has_electronics,
arrays_overlap(purchased, recommended) as has_recommendations
FROM customer_data
WHERE size(product_array) > 0; -- Filter empty arrays
Higher-Order Functions with Transform
The transform() function applies lambda expressions to each array element, enabling map operations without exploding:
-- Apply transformations to array elements
SELECT
transform(prices, x -> x * 1.1) as prices_with_tax,
transform(product_names, x -> upper(x)) as uppercase_names,
transform(dates, x -> date_add(x, 7)) as next_week_dates
FROM products;
Complex transformations access both element and index:
-- Add index-based calculations
SELECT
transform(
sequence(1, 12),
(month, idx) -> struct(
month as month_num,
idx as position,
month * revenue_base as projected_revenue
)
) as monthly_projections
FROM revenue_forecast;
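The two-argument lambda form of transform() receives each element together with its 0-based index, which in plain Python is just enumerate. A sketch of the projection above, with revenue_base as a hypothetical stand-in for the column value:

```python
revenue_base = 1000.0  # hypothetical stand-in for the revenue_base column

monthly_projections = [
    # Spark passes (element, index) with a 0-based index, like enumerate
    {"month_num": month, "position": idx, "projected_revenue": month * revenue_base}
    for idx, month in enumerate(range(1, 13))
]
print(monthly_projections[0])
# {'month_num': 1, 'position': 0, 'projected_revenue': 1000.0}
```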
Nested transformations handle multi-dimensional arrays:
-- Transform nested structures
SELECT
transform(
order_lines,
line -> transform(
line.items,
item -> item.price * item.quantity
)
) as line_totals
FROM orders;
Filtering and Aggregating Arrays
The filter() function selects array elements matching predicates:
-- Filter array elements
SELECT
customer_id,
filter(transactions, x -> x.amount > 100) as large_transactions,
filter(products, x -> x.category = 'Electronics') as electronics,
filter(dates, x -> year(x) = 2024) as current_year_dates
FROM customer_history;
Combining filter() with size() creates conditional metrics:
SELECT
customer_id,
size(filter(orders, x -> x.status = 'completed')) as completed_count,
size(filter(orders, x -> x.status = 'cancelled')) as cancelled_count,
size(filter(orders, x -> x.total > 500)) as high_value_count
FROM customer_orders;
The aggregate() function reduces arrays to single values using accumulator patterns:
-- Sum array elements
SELECT
order_id,
aggregate(
quantities,
0, -- Initial value
(acc, x) -> acc + x -- Accumulator function
) as total_quantity,
aggregate(
prices,
0.0,
(acc, x) -> acc + x,
acc -> round(acc, 2) -- Finish function
) as total_price
FROM order_details;
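Conceptually, aggregate() is a left fold with an optional finish step; the same computation can be sketched in plain Python with functools.reduce (the sample values are illustrative, not from the source):

```python
from functools import reduce

quantities = [3, 5, 2]
prices = [9.99, 19.99, 4.99]

# aggregate(quantities, 0, (acc, x) -> acc + x)
total_quantity = reduce(lambda acc, x: acc + x, quantities, 0)

# aggregate(prices, 0.0, (acc, x) -> acc + x, acc -> round(acc, 2))
# The finish function runs once on the final accumulator.
finish = lambda acc: round(acc, 2)
total_price = finish(reduce(lambda acc, x: acc + x, prices, 0.0))

print(total_quantity)  # 10
print(total_price)     # 34.97
```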
Complex aggregations implement custom logic:
-- Find maximum with custom comparison
SELECT
aggregate(
product_scores,
struct(0 as max_score, '' as product_id),
(acc, x) -> CASE
WHEN x.score > acc.max_score
THEN struct(x.score as max_score, x.id as product_id)
ELSE acc
END
) as best_product
FROM product_rankings;
Array Comparison and Set Operations
Spark SQL provides set operations for array comparison:
-- Set operations on arrays
SELECT
customer_id,
array_union(purchased_2023, purchased_2024) as all_purchases,
array_intersect(purchased_2023, purchased_2024) as repeat_purchases,
array_except(purchased_2024, purchased_2023) as new_purchases
FROM yearly_purchases;
The array_sort() function orders elements with optional comparators:
-- Sort arrays
SELECT
array_sort(product_ids) as sorted_ids,
array_sort(prices, (left, right) ->
CASE
WHEN left < right THEN -1
WHEN left > right THEN 1
ELSE 0
END
) as sorted_prices_asc,
reverse(array_sort(dates)) as dates_desc
FROM products;
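The three-way comparator contract (-1, 1, 0) maps directly onto Python's functools.cmp_to_key, which may make the SQL lambda easier to read:

```python
from functools import cmp_to_key

def three_way(left, right):
    # Same contract as the SQL comparator lambda: -1, 1, or 0
    if left < right:
        return -1
    if left > right:
        return 1
    return 0

prices = [19.99, 4.99, 9.99]
sorted_asc = sorted(prices, key=cmp_to_key(three_way))
print(sorted_asc)                  # [4.99, 9.99, 19.99]
print(list(reversed(sorted_asc)))  # [19.99, 9.99, 4.99]
```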
Position and Existence Functions
Finding elements within arrays uses array_position(), exists(), and forall():
-- Locate elements
SELECT
order_id,
array_position(product_ids, 'PROD-123') as product_position,
exists(tags, x -> x LIKE '%urgent%') as has_urgent_tag,
forall(statuses, x -> x = 'completed') as all_completed
FROM orders;
The zip_with() function combines two arrays element-wise:
-- Combine parallel arrays
SELECT
zip_with(
product_ids,
quantities,
(id, qty) -> struct(id as product, qty as quantity)
) as order_items,
zip_with(
prices,
discounts,
(price, disc) -> price * (1 - disc)
) as final_prices
FROM order_data;
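In plain Python, zip_with() behaves like zip plus a mapping function, with one caveat noted below (the sample values are illustrative):

```python
product_ids = ['P1', 'P2', 'P3']
quantities = [2, 1, 4]
prices = [10.0, 20.0, 5.0]
discounts = [0.1, 0.0, 0.25]

# zip_with(product_ids, quantities, (id, qty) -> struct(id as product, qty as quantity))
# Caveat: Spark pads the shorter array with NULLs; Python's zip truncates instead.
order_items = [{"product": pid, "quantity": qty}
               for pid, qty in zip(product_ids, quantities)]

# zip_with(prices, discounts, (price, disc) -> price * (1 - disc))
final_prices = [price * (1 - disc) for price, disc in zip(prices, discounts)]

print(order_items[0])  # {'product': 'P1', 'quantity': 2}
print(final_prices)    # [9.0, 20.0, 3.75]
```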
Performance Considerations
Array functions execute within Spark’s Catalyst optimizer and benefit from whole-stage code generation. Avoid exploding arrays when higher-order functions suffice:
-- Inefficient: explode and collect
SELECT customer_id, collect_list(price * 1.1)
FROM orders LATERAL VIEW explode(prices) t AS price
GROUP BY customer_id;
-- Efficient: transform directly
SELECT customer_id, transform(prices, x -> x * 1.1)
FROM orders;
For large arrays, consider partitioning strategies and broadcast joins. Array operations are memory-intensive; monitor executor memory when processing millions of elements per row. Use array_distinct() and filter() early in query plans to reduce data volume before expensive operations.