Spark Streaming - Rate Source for Testing

Key Insights

  • The rate source generates synthetic data at configurable rows per second, making it ideal for testing Spark Structured Streaming pipelines without external dependencies
  • Rate source provides built-in timestamp and value columns, allowing you to validate windowing operations, watermarking, and stateful aggregations in a controlled environment
  • By combining rate source with custom transformations and multiple streams, you can simulate complex real-world scenarios including late data, schema evolution, and join operations

Understanding the Rate Source

The rate source is a built-in streaming source in Spark Structured Streaming that generates rows at a specified rate. Unlike file-based or socket sources, it requires no external setup, and its value column follows a predictable, reproducible sequence, making it well suited to unit testing, benchmarking, and prototyping streaming applications.

Each generated row contains two columns: timestamp (the time when the row was generated) and value (a monotonically increasing long starting from 0). This simple schema provides everything needed to test time-based operations.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, count

spark = SparkSession.builder \
    .appName("RateSourceBasic") \
    .master("local[*]") \
    .getOrCreate()

# Create a basic rate source stream
rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Display schema
rate_stream.printSchema()
# root
#  |-- timestamp: timestamp (nullable = true)
#  |-- value: long (nullable = true)

# Write to console for testing
query = rate_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination(10)  # timeout is in seconds in PySpark (milliseconds in Scala)
query.stop()
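
For automated tests, it is often handier to write to an in-memory table and assert on its contents than to eyeball console output. A minimal sketch, where the query name rate_test and the assertion are illustrative:

import time

# Route the stream into an in-memory table for assertions
test_query = rate_stream.writeStream \
    .format("memory") \
    .queryName("rate_test") \
    .outputMode("append") \
    .start()

time.sleep(2)                      # let a couple of micro-batches run
test_query.processAllAvailable()   # drain whatever has been generated so far

rows = spark.sql("SELECT count(*) AS n FROM rate_test").collect()
assert rows[0]["n"] > 0, "expected the rate source to have emitted rows"
test_query.stop()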

Configuring Rate Source Parameters

The rate source accepts several options that control data generation behavior. Understanding these parameters helps you simulate different load patterns and test edge cases.

# Advanced rate source configuration
rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .option("rampUpTime", "10s") \
    .option("numPartitions", 4) \
    .load()

# rowsPerSecond: Target rate of row generation (default: 1)
# rampUpTime: Time to reach target rate from 0 (default: 0s)
# numPartitions: Number of partitions for parallelism (default: Spark's default parallelism)

The rampUpTime parameter is particularly useful for testing how your pipeline handles gradually increasing load. The numPartitions option affects parallelism and can help identify bottlenecks in distributed processing.

// Scala equivalent with explicit configuration
import org.apache.spark.sql.functions.{col, window}

val rateStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1000")
  .option("rampUpTime", "30s")
  .option("numPartitions", "8")
  .load()

// Test throughput with high-volume generation
val aggregated = rateStream
  .groupBy(window(col("timestamp"), "10 seconds"))
  .count()

val query = aggregated.writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", "false")
  .start()

Testing Windowing and Aggregations

The rate source excels at testing time-based operations: you can validate tumbling windows, sliding windows, and watermark configurations without wiring up a real event source.

from pyspark.sql.functions import window, count, avg, max as spark_max

rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 50) \
    .load()

# Tumbling window aggregation
tumbling_window = rate_stream \
    .groupBy(window(col("timestamp"), "5 seconds")) \
    .agg(
        count("*").alias("record_count"),
        spark_max("value").alias("max_value"),
        avg("value").alias("avg_value")
    )

query = tumbling_window.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination(30)
query.stop()

Testing sliding windows with overlapping intervals:

# Sliding window: 10-second window, sliding every 5 seconds
sliding_window = rate_stream \
    .groupBy(window(col("timestamp"), "10 seconds", "5 seconds")) \
    .agg(count("*").alias("count"))

query = sliding_window.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

Implementing Watermarking Tests

Watermarking is critical for handling late-arriving data in production streaming applications. The rate source provides a controlled environment to verify watermark behavior.

from pyspark.sql.functions import expr

rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 20) \
    .load()

# Add watermark to handle late data
watermarked_stream = rate_stream \
    .withWatermark("timestamp", "10 seconds") \
    .groupBy(window(col("timestamp"), "5 seconds")) \
    .count()

query = watermarked_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination(30)
query.stop()

To simulate late data, delay a random subset of timestamps. Shifting every row by the same amount will not work: the watermark tracks the maximum observed event time, so uniformly shifted rows never fall behind it.

from pyspark.sql.functions import expr, rand, when

# Delay a random ~10% of rows by 15 seconds so they arrive behind
# the watermark established by the on-time rows
delayed_stream = rate_stream \
    .withColumn("original_timestamp", col("timestamp")) \
    .withColumn("timestamp",
        when(rand() < 0.1, expr("timestamp - INTERVAL 15 SECONDS"))
        .otherwise(col("timestamp")))

# Rows that fall behind the 10-second watermark are dropped from
# windows whose results have already been finalized
watermarked_delayed = delayed_stream \
    .withWatermark("timestamp", "10 seconds") \
    .groupBy(window(col("timestamp"), "5 seconds")) \
    .agg(
        count("*").alias("count"),
        spark_max("original_timestamp").alias("latest_event_time")
    )

query = watermarked_delayed.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
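
To confirm late rows are actually being discarded rather than aggregated, you can inspect the state-operator metrics in the query's progress report; the numRowsDroppedByWatermark counter is available in Spark 3.1 and later. A sketch, assuming the query above is running:

import time

time.sleep(30)  # let several micro-batches and watermark advances happen
progress = query.lastProgress
if progress and progress.get("stateOperators"):
    # numRowsDroppedByWatermark counts late rows the watermark discarded
    dropped = progress["stateOperators"][0].get("numRowsDroppedByWatermark")
    print(f"rows dropped by watermark: {dropped}")
query.stop()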

Testing Stream Joins

Rate source simplifies testing stream-stream and stream-static joins. You can create multiple rate streams with different characteristics to simulate various join scenarios.

# Create two rate streams with different rates; rename the columns on
# each side so the join condition below is unambiguous
stream1 = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load() \
    .withColumnRenamed("value", "stream1_value") \
    .withColumnRenamed("timestamp", "stream1_ts")

stream2 = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 15) \
    .load() \
    .withColumnRenamed("value", "stream2_value") \
    .withColumnRenamed("timestamp", "stream2_ts")

# Join streams on a bounded time condition
joined_stream = stream1 \
    .withWatermark("stream1_ts", "5 seconds") \
    .join(
        stream2.withWatermark("stream2_ts", "5 seconds"),
        expr("""
            stream1_ts >= stream2_ts AND
            stream1_ts <= stream2_ts + INTERVAL 2 SECONDS
        """),
        "inner"
    )

query = joined_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
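
The section also mentions stream-static joins. Those need no watermark, since the static side is simply joined against each micro-batch. A minimal sketch, where the products lookup table is a made-up example:

# Hypothetical static lookup table
products = spark.createDataFrame(
    [(i, f"product_{i}") for i in range(100)],
    ["product_id", "product_name"]
)

# Stream-static join: no watermark required on either side
enriched = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load() \
    .withColumn("product_id", (col("value") % 100).cast("int")) \
    .join(products, "product_id")

query = enriched.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()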

Creating Realistic Test Data

Transform the basic rate source output into domain-specific test data using SQL expressions and UDFs.

from pyspark.sql.functions import expr, rand, when

# Generate synthetic e-commerce events; draw the random number once so
# the event-type split is actually 60% / 25% / 15%
ecommerce_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 50) \
    .load() \
    .withColumn("user_id", (col("value") % 1000).cast("int")) \
    .withColumn("product_id", (rand() * 100).cast("int")) \
    .withColumn("r", rand()) \
    .withColumn("event_type",
        expr("CASE WHEN r < 0.6 THEN 'view' " +
             "WHEN r < 0.85 THEN 'add_to_cart' " +
             "ELSE 'purchase' END")) \
    .withColumn("amount",
        # amount stays null for non-purchase events (no otherwise clause)
        when(col("event_type") == "purchase", (rand() * 500).cast("decimal(10,2)"))) \
    .select("timestamp", "user_id", "product_id", "event_type", "amount")

# Aggregate by user and event type
user_activity = ecommerce_stream \
    .withWatermark("timestamp", "1 minute") \
    .groupBy(
        window(col("timestamp"), "30 seconds"),
        col("user_id"),
        col("event_type")
    ) \
    .count()

query = user_activity.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
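
The code above sticks to SQL expressions; for logic that is awkward to express in SQL, a Python UDF works too. A sketch, where region_for_user is a hypothetical helper:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Hypothetical mapping from a numeric user id to a region label
@udf(StringType())
def region_for_user(user_id):
    regions = ["us-east", "us-west", "eu", "apac"]
    return regions[user_id % len(regions)]

with_region = ecommerce_stream.withColumn("region", region_for_user(col("user_id")))

Note that plain Python UDFs serialize rows between the JVM and Python, so prefer built-in expressions when raw throughput matters.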

Performance Testing and Benchmarking

Use rate source to stress-test your streaming pipeline and identify performance bottlenecks.

# High-volume test configuration
high_volume_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10000) \
    .option("numPartitions", 16) \
    .load()

# Complex transformation pipeline
processed_stream = high_volume_stream \
    .withColumn("partition_key", col("value") % 100) \
    .withColumn("category", expr("CASE WHEN value % 3 = 0 THEN 'A' " +
                                  "WHEN value % 3 = 1 THEN 'B' ELSE 'C' END")) \
    .groupBy(
        window(col("timestamp"), "10 seconds"),
        col("category")
    ) \
    .agg(
        count("*").alias("count"),
        avg("value").alias("avg_value")
    )

# Monitor processing metrics
query = processed_stream.writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("performance_test") \
    .start()

# Check progress after running
import time
time.sleep(60)
print(query.lastProgress)
query.stop()
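
Printing the raw dict works, but for benchmarking it is handier to pull out specific fields; lastProgress returns the most recent progress report as a dict (None until the first batch completes) and remains available after the query stops. A sketch:

# Pull selected throughput metrics from the progress report
progress = query.lastProgress
if progress:
    print("input rows:        ", progress["numInputRows"])
    print("input rows/sec:    ", progress["inputRowsPerSecond"])
    print("processed rows/sec:", progress["processedRowsPerSecond"])
    print("batch durations ms:", progress["durationMs"])

# The memory sink table remains queryable after the stream stops
spark.sql("SELECT * FROM performance_test").show(5)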

The rate source provides a zero-setup testing environment for Spark Structured Streaming applications. By mastering its configuration options and combining it with transformations, you can validate complex streaming logic, test edge cases, and benchmark performance without depending on external data sources. This approach accelerates development cycles and increases confidence in production deployments.
