PySpark - Write to Kafka with Structured Streaming

Key Insights

  • PySpark Structured Streaming provides native Kafka integration through the kafka format, enabling real-time data pipelines with at-least-once delivery to Kafka when checkpointing is configured
  • Writing to Kafka requires serializing DataFrame columns into key-value byte arrays, with careful schema management to ensure data integrity across the streaming pipeline
  • Production deployments demand attention to checkpoint management, trigger intervals, and Kafka producer configurations to balance throughput, latency, and fault tolerance

Understanding the Kafka Sink Architecture

PySpark Structured Streaming treats Kafka as a structured data sink, requiring DataFrames to conform to a specific schema. The Kafka sink expects at minimum a value column containing the message payload, with optional key, topic, partition, and headers columns for advanced routing and partitioning strategies.

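With a checkpoint location configured, the streaming engine tracks source offsets and batch progress automatically, so a restarted query resumes where it left off. Writes to Kafka are at-least-once rather than exactly-once: a batch that is retried after a failure can produce duplicate records, so downstream consumers should tolerate or deduplicate them. This differs from batch writing, which keeps no checkpoint and writes the entire DataFrame each time the job runs.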

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct, col

spark = SparkSession.builder \
    .appName("KafkaStructuredStreaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

# Create a streaming DataFrame from a source
input_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Transform to Kafka-compatible schema
kafka_df = input_df.select(
    col("value").cast("string").alias("key"),
    to_json(struct("timestamp", "value")).alias("value")
)
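
A schema mismatch only surfaces when the query starts, so it can help to check the column layout up front. The sketch below is a plain-Python check over the (name, type) pairs that DataFrame.dtypes returns. The helper name check_kafka_columns and its rules table are illustrative, not part of PySpark; the accepted types follow the Kafka sink schema described above.

```python
# Hypothetical helper: validates (column, type) pairs against the Kafka sink schema.
# Accepted types per column, following the Kafka sink's documented schema.
KAFKA_SINK_TYPES = {
    "key": {"string", "binary"},
    "value": {"string", "binary"},
    "topic": {"string"},
    "partition": {"int"},
}


def check_kafka_columns(dtypes, topic_option_set=True):
    """Return a list of problems found in df.dtypes-style (name, type) pairs."""
    columns = dict(dtypes)
    problems = []
    if "value" not in columns:
        problems.append("missing required column: value")
    if "topic" not in columns and not topic_option_set:
        problems.append("no topic column and no topic option")
    for name, allowed in KAFKA_SINK_TYPES.items():
        if name in columns and columns[name] not in allowed:
            problems.append(f"{name} is {columns[name]}, expected one of {sorted(allowed)}")
    # headers, when present, must be an array of (key, value) structs
    if "headers" in columns and not columns["headers"].startswith("array<"):
        problems.append(f"headers is {columns['headers']}, expected an array")
    return problems


# The shape produced by kafka_df above: both columns are strings
print(check_kafka_columns([("key", "string"), ("value", "string")]))
# A string-typed partition column is reported as a problem
print(check_kafka_columns([("value", "string"), ("partition", "string")]))
```

With a live DataFrame this would be called as check_kafka_columns(kafka_df.dtypes).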

Basic Kafka Write Operations

The simplest Kafka write operation requires specifying the bootstrap servers and target topic. The DataFrame must contain a value column with binary or string data representing the message payload.

query = kafka_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "streaming-output") \
    .option("checkpointLocation", "/tmp/checkpoint/kafka-sink") \
    .start()

query.awaitTermination()

For dynamic topic routing based on data content, omit the static topic option and include a topic column in your DataFrame:

from pyspark.sql.functions import col, struct, to_json, when

# Route messages to different topics based on value
dynamic_kafka_df = input_df.select(
    when(col("value") % 2 == 0, "even-numbers")
        .otherwise("odd-numbers").alias("topic"),
    col("value").cast("string").alias("key"),
    to_json(struct("timestamp", "value")).alias("value")
)

query = dynamic_kafka_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/tmp/checkpoint/dynamic-topics") \
    .start()
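
One detail worth keeping in mind: according to the Spark Kafka integration guide, a topic option, when set, overrides any topic column. The plain-Python sketch below models that precedence for a single row. resolve_topic is an illustrative name, not a Spark API.

```python
from typing import Optional


def resolve_topic(row_topic: Optional[str], topic_option: Optional[str]) -> str:
    """Model which topic a row is written to (illustrative, not a Spark function)."""
    if topic_option is not None:
        # The writer-level option wins over the per-row column
        return topic_option
    if row_topic is None:
        raise ValueError("no topic option set and the row has no topic value")
    return row_topic


# Routed by the column because no option is set
print(resolve_topic("even-numbers", None))
# The option overrides the column
print(resolve_topic("even-numbers", "streaming-output"))
```

This is why the dynamic routing example leaves the topic option out entirely.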

Serialization and Schema Management

Kafka messages are byte arrays, so structured data must be serialized explicitly. The to_json function converts structs, maps, and arrays to JSON strings. Binary formats such as Avro or Protobuf need an extra dependency: to_avro from the spark-avro package, or to_protobuf from spark-protobuf (Spark 3.4 and later).

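from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, to_json, struct, current_timestamp

# Define the shape of each event (shown for reference; the rate source below
# generates the data, so this schema is not applied in this example)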
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", IntegerType(), False),
    StructField("action", StringType(), False),
    StructField("metadata", StringType(), True)
])

# Simulate streaming events
events_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load() \
    .selectExpr(
        "CAST(value AS STRING) as event_id",
        "CAST(value % 1000 AS INT) as user_id",
        "'page_view' as action",
        "CAST(timestamp AS STRING) as metadata"
    )

# Serialize entire row as JSON value
kafka_events = events_df.select(
    col("event_id").alias("key"),
    to_json(struct(
        col("event_id"),
        col("user_id"),
        col("action"),
        col("metadata"),
        current_timestamp().alias("processed_at")
    )).alias("value")
)

query = kafka_events.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "user-events") \
    .option("checkpointLocation", "/tmp/checkpoint/events") \
    .outputMode("append") \
    .start()
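
On the wire, the key and value above are UTF-8 bytes. The sketch below uses only Python's json module to show roughly what one serialized record looks like and how a consumer might decode it. The field values are made up for illustration, and the exact timestamp formatting that to_json produces may differ.

```python
import json

# Made-up example of one event, shaped like the struct passed to to_json above
event = {
    "event_id": "42",
    "user_id": 42,
    "action": "page_view",
    "metadata": "2024-01-01 00:00:04.2",
    "processed_at": "2024-01-01T00:00:05.000Z",
}

# What Kafka stores: raw bytes for key and value
key_bytes = event["event_id"].encode("utf-8")
value_bytes = json.dumps(event, separators=(",", ":")).encode("utf-8")


def decode_record(key: bytes, value: bytes) -> dict:
    """Consumer-side decoding: bytes -> str -> dict."""
    record = json.loads(value.decode("utf-8"))
    record["_key"] = key.decode("utf-8")
    return record


decoded = decode_record(key_bytes, value_bytes)
print(decoded["user_id"], decoded["_key"])
```

Keeping the payload as plain JSON means any consumer can read it without Spark, at the cost of larger messages than a binary format.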

Advanced Kafka Producer Configuration

Production environments require tuning Kafka producer settings for performance, reliability, and security. Pass Kafka-specific configurations using the kafka. prefix.

query = kafka_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092") \
    .option("topic", "production-events") \
    .option("checkpointLocation", "/mnt/checkpoints/prod-kafka") \
    .option("kafka.acks", "all") \
    .option("kafka.retries", "3") \
    .option("kafka.compression.type", "snappy") \
    .option("kafka.max.in.flight.requests.per.connection", "1") \
    .option("kafka.enable.idempotence", "true") \
    .option("kafka.batch.size", "32768") \
    .option("kafka.linger.ms", "100") \
    .option("kafka.buffer.memory", "67108864") \
    .start()
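
Some of these settings constrain each other. The Kafka producer documentation states that idempotence requires acks=all, retries greater than 0, and at most 5 in-flight requests per connection. A small pre-flight check, sketched below in plain Python with a hypothetical helper name, can catch a conflicting combination before the query starts.

```python
def check_idempotence(options: dict) -> list:
    """Return conflicts between enable.idempotence and related producer options.

    Hypothetical helper; `options` uses the same "kafka."-prefixed keys that
    are passed to writeStream.option(). It only checks options that are set
    explicitly and only when idempotence is explicitly enabled.
    """
    if options.get("kafka.enable.idempotence", "false").lower() != "true":
        return []
    conflicts = []
    if options.get("kafka.acks", "all") not in ("all", "-1"):
        conflicts.append("kafka.acks must be 'all'")
    if int(options.get("kafka.retries", "1")) <= 0:
        conflicts.append("kafka.retries must be greater than 0")
    if int(options.get("kafka.max.in.flight.requests.per.connection", "5")) > 5:
        conflicts.append("kafka.max.in.flight.requests.per.connection must be <= 5")
    return conflicts


# The reliability-related options from the example above
producer_options = {
    "kafka.acks": "all",
    "kafka.retries": "3",
    "kafka.max.in.flight.requests.per.connection": "1",
    "kafka.enable.idempotence": "true",
}
print(check_idempotence(producer_options))
```

The configuration shown earlier passes this check; lowering acks to 1 while keeping idempotence enabled would not.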

For secured Kafka clusters, configure SASL and SSL properties:

security_options = {
    "kafka.bootstrap.servers": "secure-broker:9093",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";',
    "kafka.ssl.truststore.location": "/path/to/truststore.jks",
    "kafka.ssl.truststore.password": "truststore-password"
}

query = kafka_df.writeStream \
    .format("kafka") \
    .options(**security_options) \
    .option("topic", "secure-topic") \
    .option("checkpointLocation", "/mnt/checkpoints/secure") \
    .start()
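
Hardcoded credentials like those above are fine for a demo but not for a shared repository. A minimal sketch of building the same options from environment variables follows. The variable names (KAFKA_USERNAME, KAFKA_PASSWORD, KAFKA_TRUSTSTORE_PASSWORD, and the optional KAFKA_BOOTSTRAP_SERVERS and KAFKA_TRUSTSTORE) are assumptions chosen for this example.

```python
import os


def build_security_options(env=os.environ) -> dict:
    """Assemble SASL_SSL options from environment variables (names are illustrative)."""
    username = env["KAFKA_USERNAME"]
    password = env["KAFKA_PASSWORD"]
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "kafka.bootstrap.servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "secure-broker:9093"),
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
        "kafka.ssl.truststore.location": env.get("KAFKA_TRUSTSTORE", "/path/to/truststore.jks"),
        "kafka.ssl.truststore.password": env["KAFKA_TRUSTSTORE_PASSWORD"],
    }


# Demo with an explicit mapping instead of the real environment
demo_env = {
    "KAFKA_USERNAME": "user",
    "KAFKA_PASSWORD": "pass",
    "KAFKA_TRUSTSTORE_PASSWORD": "truststore-password",
}
options = build_security_options(demo_env)
print(options["kafka.security.protocol"])
```

The resulting dictionary plugs into .options(**options) in the same way as security_options above.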

Trigger Strategies and Output Modes

Structured Streaming supports multiple trigger types controlling when micro-batches execute. The trigger strategy directly impacts latency and throughput characteristics.

# Continuous processing with 1-second checkpoints (low latency)
query_continuous = kafka_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "low-latency-stream") \
    .option("checkpointLocation", "/tmp/checkpoint/continuous") \
    .trigger(continuous="1 second") \
    .start()

# Fixed interval micro-batches
query_interval = kafka_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "interval-stream") \
    .option("checkpointLocation", "/tmp/checkpoint/interval") \
    .trigger(processingTime="10 seconds") \
    .start()

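# Process available data in a single batch, then stop
# (once=True is deprecated as of Spark 3.4; trigger(availableNow=True) is the suggested replacement)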
query_once = kafka_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "batch-stream") \
    .option("checkpointLocation", "/tmp/checkpoint/once") \
    .trigger(once=True) \
    .start()

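The Kafka sink accepts append, update, and complete output modes, but Kafka itself is an append-only log: every emitted row becomes a new record, and nothing already written is modified. For stateless pipelines like the ones above, append is the default and the natural choice. With update or complete mode on an aggregation, consumers receive a fresh record for each emitted result row on every trigger and must reconcile them, typically by key.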

Partitioning and Message Keys

Kafka uses message keys for partition assignment and compaction. Proper key selection ensures balanced partition distribution and enables log compaction for state management.

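from pyspark.sql.functions import hash, abs  # note: these shadow Python's built-in hash and abs

# Hash-based partitioning for even distribution
# The partition column must be an integer, not a string;
# this assumes the target topic has at least 10 partitions
partitioned_df = events_df.select(
    abs(hash(col("user_id")) % 10).cast("int").alias("partition"),
    col("user_id").cast("string").alias("key"),
    to_json(struct("*")).alias("value")
)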

query = partitioned_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "partitioned-events") \
    .option("checkpointLocation", "/tmp/checkpoint/partitioned") \
    .start()
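
When no partition column is supplied, the partition is chosen by the Kafka producer. Its default partitioner hashes the key bytes (using murmur2) and takes the result modulo the partition count, so equal keys always land in the same partition. The sketch below illustrates the idea with zlib.crc32 as a stand-in hash; it will not reproduce Kafka's actual partition numbers.

```python
import zlib
from collections import Counter


def stand_in_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition: hash the key bytes, then take the modulo.

    crc32 is a stand-in for illustration; Kafka's default partitioner uses murmur2.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Equal keys always map to the same partition
print(stand_in_partition("user-17", 10) == stand_in_partition("user-17", 10))

# Count how many of 1000 distinct keys fall into each partition
counts = Counter(stand_in_partition(f"user-{i}", 10) for i in range(1000))
print(sorted(counts.items()))
```

A low-cardinality key (for example, a boolean flag) would concentrate traffic on a few partitions no matter which hash is used, which is why key choice matters more than the hash function.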

For explicit partition control, include a partition column with integer values corresponding to Kafka partition numbers:

# Assign specific partitions based on business logic
explicit_partition_df = events_df.select(
    (col("user_id") % 5).cast("integer").alias("partition"),
    col("user_id").cast("string").alias("key"),
    to_json(struct("*")).alias("value")
)

Error Handling and Monitoring

Monitor streaming queries using Spark’s StreamingQuery interface and implement proper error handling for production resilience.

query = kafka_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "monitored-stream") \
    .option("checkpointLocation", "/tmp/checkpoint/monitored") \
    .start()

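# Monitor query progress; awaitTermination raises if the query has failed
try:
    while query.isActive:
        progress = query.lastProgress
        if progress:
            print(f"Batch: {progress['batchId']}")
            print(f"Input rows: {progress['numInputRows']}")
            print(f"Processing rate: {progress['processedRowsPerSecond']}")
        # Wait up to 30 seconds before polling again
        query.awaitTermination(timeout=30)
finally:
    # Graceful shutdown, also reached when the loop exits with an error
    query.stop()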
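
The progress dictionary also reports inputRowsPerSecond, which can be compared with processedRowsPerSecond to spot a query that is falling behind. A minimal sketch follows, with a hypothetical helper name and a hand-written sample dictionary standing in for query.lastProgress:

```python
def is_falling_behind(progress: dict, tolerance: float = 1.0) -> bool:
    """True when rows arrive faster than they are processed.

    `progress` is shaped like query.lastProgress; `tolerance` is an
    illustrative knob (1.0 means any shortfall counts).
    """
    arriving = progress.get("inputRowsPerSecond") or 0.0
    processed = progress.get("processedRowsPerSecond") or 0.0
    return arriving > processed * tolerance


# Hand-written sample; real values come from query.lastProgress
sample = {
    "batchId": 12,
    "numInputRows": 1000,
    "inputRowsPerSecond": 100.0,
    "processedRowsPerSecond": 80.0,
}
print(is_falling_behind(sample))
```

A single slow batch is rarely a problem; a check like this is more useful when it fires for several consecutive progress reports.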

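Configure checkpointing to reliable storage like HDFS or S3 for production fault tolerance, and give each query its own checkpoint directory. Checkpoint directories hold the offset information and metadata Spark needs to resume a query after a failure. Because the Kafka sink is at-least-once, a recovered query may rewrite records from the batch that was in flight when it failed.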
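
Should a recovered query write a batch to Kafka a second time, consumers can protect themselves by deduplicating on a unique field such as event_id. The plain-Python sketch below shows the idea with an in-memory set. The class name is illustrative, and a real consumer would need to bound or persist this state.

```python
class Deduplicator:
    """Drops records whose id has been seen before (in-memory, unbounded)."""

    def __init__(self):
        self._seen = set()

    def accept(self, record: dict) -> bool:
        """Return True the first time an event_id appears, False afterwards."""
        event_id = record["event_id"]
        if event_id in self._seen:
            return False
        self._seen.add(event_id)
        return True


# A replayed batch repeats events 2 and 3
stream = [{"event_id": i} for i in (1, 2, 3, 2, 3, 4)]
dedup = Deduplicator()
unique = [r["event_id"] for r in stream if dedup.accept(r)]
print(unique)
```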
