Spark Scala - UDF (User Defined Functions)
Key Insights
- UDFs provide escape hatches when built-in Spark functions fall short, but they come with significant performance penalties because they bypass Catalyst optimization and require serialization overhead.
- Always handle nulls explicitly using Scala's `Option` type and pattern matching; Spark DataFrames frequently contain null values that will crash naive UDF implementations.
- Test your UDF logic as pure Scala functions before registering them with Spark; this makes unit testing faster and debugging significantly easier.
Introduction to UDFs
User Defined Functions (UDFs) in Spark let you extend the built-in function library with custom logic. When you need to apply business rules, complex string manipulations, or domain-specific calculations that Spark’s 300+ built-in functions can’t handle, UDFs are your tool.
But here’s the uncomfortable truth: UDFs should be your last resort, not your first instinct. Every UDF you write:
- Breaks Catalyst optimization—Spark can’t look inside your function to optimize execution
- Requires data serialization between JVM and Spark’s internal format
- Prevents predicate pushdown and other query optimizations
Before writing a UDF, spend five minutes checking if a combination of built-in functions can solve your problem. You’ll often find they can, and your job will run faster.
Creating Basic UDFs
Let’s start with a practical example: formatting phone numbers. You have raw phone data like “5551234567” and need it formatted as “(555) 123-4567”.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark = SparkSession.builder()
.appName("UDF Examples")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Step 1: Define the Scala function
def formatPhoneNumber(phone: String): String = {
if (phone == null || phone.length != 10) {
phone // Return as-is if invalid
} else {
s"(${phone.substring(0, 3)}) ${phone.substring(3, 6)}-${phone.substring(6)}"
}
}
// Step 2: Convert to UDF
val formatPhoneUDF = udf(formatPhoneNumber _)
// Step 3: Use it
val phoneData = Seq(
("Alice", "5551234567"),
("Bob", "5559876543"),
("Charlie", "123")
).toDF("name", "phone")
phoneData
.withColumn("formatted_phone", formatPhoneUDF(col("phone")))
.show(false)
Output:
+-------+----------+----------------+
|name |phone |formatted_phone |
+-------+----------+----------------+
|Alice |5551234567|(555) 123-4567 |
|Bob |5559876543|(555) 987-6543 |
|Charlie|123 |123 |
+-------+----------+----------------+
The pattern is straightforward: write a regular Scala function, wrap it with udf(), and apply it to columns.
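The length check above accepts any 10-character string, including ones with non-digit characters. As an alternative formulation of the same logic (a sketch of mine, not from the article), a regex extractor rejects those too, while keeping the null and fall-through behavior identical:

```scala
// Anchored regex with three capture groups: area code, prefix, line number
val TenDigits = """(\d{3})(\d{3})(\d{4})""".r

// Hypothetical variant of formatPhoneNumber using pattern matching on the regex
def formatPhoneNumberRegex(phone: String): String = phone match {
  case null                          => null          // preserve nulls as-is
  case TenDigits(area, prefix, line) => s"($area) $prefix-$line"
  case other                         => other         // invalid input returned unchanged
}
```

Because `Regex.unapplySeq` anchors the match to the whole string, a value like `"55512345ab"` falls through to the `other` case instead of being mangled into a phone format.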
Registering UDFs for SQL and DataFrame API
Spark offers two registration approaches depending on how you want to use your UDF.
DataFrame API registration uses udf() and works with select(), withColumn(), and other DataFrame operations:
import org.apache.spark.sql.functions.udf
// DataFrame API registration
val formatPhoneUDF = udf(formatPhoneNumber _)
phoneData.select(
col("name"),
formatPhoneUDF(col("phone")).as("formatted")
)
SQL registration uses spark.udf.register() and makes the function available in SQL queries:
// SQL registration
spark.udf.register("format_phone", formatPhoneNumber _)
phoneData.createOrReplaceTempView("phones")
spark.sql("""
SELECT name, format_phone(phone) as formatted
FROM phones
""").show()
You can register the same function both ways. SQL registration is useful when you’re working with analysts who prefer SQL or when you’re building dynamic queries. DataFrame API registration is better for programmatic pipelines where you want compile-time type checking.
Handling Multiple Parameters and Complex Types
Real-world UDFs often need multiple columns and return complex structures. Here’s a UDF that calculates shipping costs based on weight and distance, returning a struct with breakdown details:
import org.apache.spark.sql.types._
// Define the return type for complex UDFs
case class ShippingCost(
baseCost: Double,
weightSurcharge: Double,
distanceSurcharge: Double,
total: Double
)
def calculateShipping(weightKg: Double, distanceKm: Double): ShippingCost = {
val baseCost = 5.0
val weightSurcharge = if (weightKg > 10) (weightKg - 10) * 0.5 else 0.0
val distanceSurcharge = distanceKm * 0.02
val total = baseCost + weightSurcharge + distanceSurcharge
ShippingCost(baseCost, weightSurcharge, distanceSurcharge, total)
}
// udf() derives the struct schema from the case class automatically
val shippingUDF = udf(calculateShipping _)
val orders = Seq(
("ORD001", 5.0, 100.0),
("ORD002", 15.0, 500.0),
("ORD003", 8.0, 50.0)
).toDF("order_id", "weight_kg", "distance_km")
orders
.withColumn("shipping", shippingUDF(col("weight_kg"), col("distance_km")))
.select(
col("order_id"),
col("shipping.total").as("shipping_cost"),
col("shipping.weightSurcharge").as("weight_fee")
)
.show()
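To sanity-check the arithmetic, the shipping cost for ORD002 (15 kg over 500 km) can be reproduced in plain Scala, no SparkSession required:

```scala
// Spot-check the shipping math for ORD002: 15 kg, 500 km
val weightKg = 15.0
val distanceKm = 500.0
val baseCost = 5.0
val weightSurcharge = if (weightKg > 10) (weightKg - 10) * 0.5 else 0.0 // 2.5
val distanceSurcharge = distanceKm * 0.02                               // 10.0
val total = baseCost + weightSurcharge + distanceSurcharge              // 17.5, up to floating-point rounding
println(total)
```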
For array processing, here’s a UDF that filters and transforms array elements:
// UDF that processes arrays
def filterAndUppercase(items: Seq[String], minLength: Int): Seq[String] = {
if (items == null) Seq.empty
else items.filter(_.length >= minLength).map(_.toUpperCase)
}
val arrayUDF = udf(filterAndUppercase _)
val tagData = Seq(
(1, Seq("scala", "spark", "ml", "ai")),
(2, Seq("java", "spring", "hibernate"))
).toDF("id", "tags")
tagData
.withColumn("filtered_tags", arrayUDF(col("tags"), lit(4)))
.show(false)
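Because the function body is plain Scala, the array logic can be exercised without Spark at all, which is exactly the testing approach the Key Insights recommend. For the first row above, tags shorter than four characters are dropped:

```scala
// Same logic as the filterAndUppercase UDF, run as a plain function
def filterAndUppercase(items: Seq[String], minLength: Int): Seq[String] =
  if (items == null) Seq.empty
  else items.filter(_.length >= minLength).map(_.toUpperCase)

println(filterAndUppercase(Seq("scala", "spark", "ml", "ai"), 4))
// List(SCALA, SPARK) -- "ml" and "ai" are shorter than 4 characters
println(filterAndUppercase(null, 4)) // List() -- null array becomes empty, not a crash
```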
Null Handling and Type Safety
Null values are the silent killers of Spark jobs. Your UDF works perfectly in testing, then fails at 3 AM on production data because someone left a field empty. Here’s how to handle nulls properly:
// BAD: Will throw NullPointerException
def unsafeUppercase(s: String): String = s.toUpperCase
// GOOD: Null-safe with Option
def safeUppercase(s: String): Option[String] = {
Option(s).map(_.toUpperCase)
}
val safeUpperUDF = udf(safeUppercase _)
// BETTER: Explicit null handling with pattern matching
def robustTransform(value: String, prefix: String): String = {
(Option(value), Option(prefix)) match {
case (Some(v), Some(p)) => s"$p-${v.toUpperCase}"
case (Some(v), None) => v.toUpperCase
case (None, Some(p)) => s"$p-UNKNOWN"
case (None, None) => "UNKNOWN"
}
}
val robustUDF = udf(robustTransform _)
val messyData = Seq(
("valid", "PRE"),
(null, "PRE"),
("valid", null),
(null, null)
).toDF("value", "prefix")
messyData
.withColumn("result", robustUDF(col("value"), col("prefix")))
.show()
The Option wrapper combined with pattern matching makes your intent explicit and prevents runtime surprises.
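The same Option pattern extends to parsing, another frequent source of UDF crashes. A hedged sketch (the function name is mine, not from the article), combining `Option` for nulls with `Try` for malformed input:

```scala
import scala.util.Try

// Null-safe AND failure-safe: Option absorbs nulls, Try absorbs bad input
def safeParseDouble(s: String): Option[Double] =
  Option(s).flatMap(v => Try(v.trim.toDouble).toOption)
```

Wrapped with `udf`, the `None` results surface as nulls in the output column instead of killing the job with a `NumberFormatException`.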
Performance Optimization
Let’s quantify the UDF performance penalty. Here’s a benchmark comparing a UDF to the equivalent built-in function:
import org.apache.spark.sql.functions._
// Generate test data
val largeDF = spark.range(0, 10000000)
.withColumn("text", concat(lit("item_"), col("id")))
// UDF approach
def extractNumber(s: String): Long = {
s.split("_")(1).toLong
}
val extractUDF = udf(extractNumber _)
// Built-in function approach
// Uses split and cast - native Spark functions
// Benchmark UDF
val udfStart = System.currentTimeMillis()
largeDF
.withColumn("extracted", extractUDF(col("text")))
.write.format("noop").mode("overwrite").save()
val udfTime = System.currentTimeMillis() - udfStart
// Benchmark built-in
val builtinStart = System.currentTimeMillis()
largeDF
.withColumn("extracted", split(col("text"), "_").getItem(1).cast("long"))
.write.format("noop").mode("overwrite").save()
val builtinTime = System.currentTimeMillis() - builtinStart
println(s"UDF time: ${udfTime}ms")
println(s"Built-in time: ${builtinTime}ms")
println(f"UDF is ${udfTime.toDouble / builtinTime}%.1fx slower")
On my machine, the UDF consistently runs 2-4x slower than the built-in equivalent. For simple operations, that overhead adds up across billions of rows.
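The timing boilerplate above can be factored into a small helper (my addition, not part of the original benchmark), which also makes it easy to time both variants symmetrically:

```scala
// Measure wall-clock time of any block; returns the result and elapsed milliseconds
def time[T](block: => T): (T, Long) = {
  val start = System.currentTimeMillis()
  val result = block            // by-name parameter: evaluated here, inside the timing window
  (result, System.currentTimeMillis() - start)
}
```

Usage would look like `val (_, udfTime) = time { largeDF.withColumn(...).write.format("noop").mode("overwrite").save() }`. Note that wall-clock timings on a shared machine are noisy; run each variant several times and compare medians.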
When you must use UDFs, consider these optimizations:
- Broadcast small lookup data rather than joining inside UDFs
- Batch operations where possible—process arrays instead of row-by-row
- Avoid object creation inside UDFs; reuse builders and buffers
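As one concrete instance of the object-creation advice: the benchmark's `extractNumber` allocates an `Array[String]` on every row via `split`. An `indexOf`/`substring` variant (my sketch, not from the article) avoids that per-row array:

```scala
// split("_") builds an Array[String] per call; indexOf + substring skips the array
// (it still allocates one substring, but no intermediate collection)
def extractNumberFast(s: String): Long = {
  val sep = s.indexOf('_')
  s.substring(sep + 1).toLong
}
```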
Testing and Best Practices
Since UDFs are just Scala functions, test them without Spark. This makes tests fast and debugging straightforward:
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
class PhoneFormatterTest extends AnyFunSuite with Matchers {
test("formats valid 10-digit phone number") {
formatPhoneNumber("5551234567") shouldBe "(555) 123-4567"
}
test("returns original for invalid length") {
formatPhoneNumber("123") shouldBe "123"
formatPhoneNumber("12345678901") shouldBe "12345678901"
}
test("handles null input") {
formatPhoneNumber(null) shouldBe null
}
test("handles edge cases") {
formatPhoneNumber("0000000000") shouldBe "(000) 000-0000"
}
}
class ShippingCalculatorTest extends AnyFunSuite with Matchers {
test("calculates base cost for light, local package") {
val result = calculateShipping(5.0, 0.0)
result.baseCost shouldBe 5.0
result.weightSurcharge shouldBe 0.0
result.total shouldBe 5.0
}
test("applies weight surcharge over 10kg") {
val result = calculateShipping(15.0, 0.0)
result.weightSurcharge shouldBe 2.5 // (15-10) * 0.5
}
}
Best practices summary:
- Document your UDFs with scaladoc explaining inputs, outputs, and null behavior
- Version your UDFs when business logic changes; don’t silently modify behavior
- Log sparingly inside UDFs—you’ll generate a log line per row
- Avoid side effects—UDFs should be pure functions
- Consider determinism: mark non-deterministic UDFs with `.asNondeterministic()` so Spark doesn't apply optimizations that assume repeated calls return the same result
UDFs are powerful tools, but they’re also escape hatches from Spark’s optimization engine. Use them when necessary, test them thoroughly, and always check if a built-in function can do the job first.