Apache Spark - Elasticsearch Connector

Key Insights

  • The Elasticsearch-Spark connector enables bidirectional data flow between Spark and Elasticsearch, supporting batch and streaming operations with built-in partitioning and pushdown optimization
  • Connector configuration requires careful tuning of batch sizes, parallelism, and network settings to avoid overwhelming Elasticsearch clusters while maintaining acceptable write throughput
  • Schema mapping between Spark DataFrames and Elasticsearch documents happens automatically but requires explicit handling for nested structures, arrays, and timestamp formats to prevent data corruption

Setting Up the Connector

The Elasticsearch-Hadoop connector provides native integration between Spark and Elasticsearch. Add the dependency matching your Elasticsearch version to your build configuration.

For Maven:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-30_2.12</artifactId>
    <version>8.11.0</version>
</dependency>

For SBT:

libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-30" % "8.11.0"

Initialize your Spark session with Elasticsearch configuration:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ElasticsearchIntegration") \
    .config("spark.es.nodes", "localhost") \
    .config("spark.es.port", "9200") \
    .config("spark.es.nodes.wan.only", "true") \
    .config("spark.es.net.http.auth.user", "elastic") \
    .config("spark.es.net.http.auth.pass", "password") \
    .getOrCreate()
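
The credentials above are hardcoded for illustration only; in practice they are usually pulled from the environment or a secrets store at startup. A minimal sketch (ES_USER and ES_PASS are hypothetical variable names, not connector requirements):

```python
import os

# Hypothetical variable names -- match them to your deployment's secret store.
es_user = os.environ.get("ES_USER", "elastic")
es_pass = os.environ.get("ES_PASS", "")

# These values replace the hardcoded literals in the builder config above.
auth_conf = {
    "spark.es.net.http.auth.user": es_user,
    "spark.es.net.http.auth.pass": es_pass,
}
```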

Writing Data to Elasticsearch

The connector supports multiple write patterns. The simplest approach uses the DataFrame API with format specification.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from datetime import datetime

# Create sample data
data = [
    ("user_001", "John Doe", 28, datetime(2024, 1, 15, 10, 30)),
    ("user_002", "Jane Smith", 34, datetime(2024, 1, 15, 11, 45)),
    ("user_003", "Bob Johnson", 45, datetime(2024, 1, 15, 12, 15))
]

schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("created_at", TimestampType(), True)
])

df = spark.createDataFrame(data, schema)

# Write to Elasticsearch
df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "users") \
    .option("es.mapping.id", "user_id") \
    .option("es.write.operation", "upsert") \
    .option("es.batch.size.entries", "1000") \
    .option("es.batch.size.bytes", "1mb") \
    .mode("append") \
    .save()

The es.mapping.id option specifies which field becomes the document ID in Elasticsearch. Without this, Elasticsearch generates random IDs, preventing proper upsert operations.
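
When no single column uniquely identifies a row, a stable ID can be derived by hashing the key columns before writing; the same inputs then always map to the same document, so reruns upsert instead of duplicating. A sketch (the helper name and key choice are illustrative — in Spark you would register it as a UDF to populate an ID column for es.mapping.id):

```python
import hashlib

def make_doc_id(*key_parts: str) -> str:
    """Build a deterministic document ID by hashing the key columns.

    Identical inputs always produce the same ID, which is what makes
    upsert semantics work across repeated pipeline runs.
    """
    raw = "|".join(key_parts)
    return hashlib.sha1(raw.encode("utf-8")).hexdigest()

# Example: derive an ID from user and day so reruns overwrite cleanly.
doc_id = make_doc_id("user_001", "2024-01-15")
```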

Reading Data from Elasticsearch

Reading from Elasticsearch creates a distributed RDD or DataFrame with automatic partitioning based on index shards.

# Read entire index
users_df = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "users") \
    .option("es.read.field.as.array.include", "tags") \
    .load()

users_df.show()
users_df.printSchema()

# Query with Elasticsearch DSL
query = """{
    "query": {
        "range": {
            "age": {
                "gte": 30,
                "lte": 50
            }
        }
    }
}"""

filtered_df = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "users") \
    .option("es.query", query) \
    .load()

Query pushdown significantly improves performance by filtering data at the Elasticsearch level before transferring to Spark executors.
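
Hand-writing JSON query strings invites quoting mistakes; the same range query can be built as a dict and serialized for the es.query option. A minimal sketch (the helper name is illustrative):

```python
import json

def range_query(field, gte=None, lte=None):
    """Serialize an Elasticsearch range query for the es.query option."""
    bounds = {k: v for k, v in (("gte", gte), ("lte", lte)) if v is not None}
    return json.dumps({"query": {"range": {field: bounds}}})

age_query = range_query("age", gte=30, lte=50)
```

Note that simple DataFrame filters such as users_df.filter("age >= 30") are also translated into Elasticsearch queries by the connector's pushdown, so explicit DSL is mainly needed for constructs Spark's filter API cannot express.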

Handling Complex Data Types

Nested structures and arrays require explicit schema handling to maintain data integrity.

from pyspark.sql.types import ArrayType, MapType, FloatType

# Complex schema with nested objects
complex_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer", StructType([
        StructField("id", StringType(), True),
        StructField("email", StringType(), True)
    ]), True),
    StructField("items", ArrayType(StructType([
        StructField("product_id", StringType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("price", FloatType(), True)
    ])), True),
    StructField("metadata", MapType(StringType(), StringType()), True)
])

complex_data = [(
    "order_001",
    ("cust_123", "customer@example.com"),
    [
        ("prod_001", 2, 29.99),
        ("prod_002", 1, 49.99)
    ],
    {"source": "web", "campaign": "spring_sale"}
)]

complex_df = spark.createDataFrame(complex_data, complex_schema)

complex_df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "orders") \
    .option("es.mapping.id", "order_id") \
    .option("es.mapping.exclude", "metadata.internal_field") \
    .mode("append") \
    .save()
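
Deeply nested documents can make Elasticsearch mappings unwieldy to query and maintain. One common normalization is flattening nested objects into dotted field names before indexing; a plain-Python sketch of the idea (names are illustrative):

```python
def flatten(doc, prefix="", sep="."):
    """Flatten nested dicts into a single level of dotted keys.

    Lists are left intact, since Elasticsearch indexes array
    elements individually.
    """
    flat = {}
    for key, value in doc.items():
        full_key = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat

order = {"order_id": "order_001",
         "customer": {"id": "cust_123", "email": "customer@example.com"}}
flat_order = flatten(order)
```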

Performance Optimization

Tuning connector parameters directly impacts throughput and cluster stability.

# Optimized configuration for bulk writes
optimized_config = {
    "es.batch.size.entries": "5000",
    "es.batch.size.bytes": "10mb",
    "es.batch.write.refresh": "false",
    "es.batch.write.retry.count": "3",
    "es.batch.write.retry.wait": "10s",
    "es.http.timeout": "5m",
    "es.http.retries": "3",
    "es.scroll.size": "2000",
    "es.action.heart.beat.lead": "15s"
}

# Pass the tuning options directly on the writer; per-operation options
# reliably reach the connector and override any session-level settings
df.repartition(20) \
    .write \
    .format("org.elasticsearch.spark.sql") \
    .options(**optimized_config) \
    .option("es.resource", "large_dataset") \
    .mode("append") \
    .save()

Monitor Elasticsearch cluster metrics during writes. If you see rejected requests or high JVM memory pressure, reduce es.batch.size.entries and increase es.batch.write.retry.wait.
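
The connector's own retry settings above wait a fixed interval between attempts; if you wrap writes in application-level retries, capped exponential backoff is the usual shape. A sketch (function names are illustrative, not part of the connector):

```python
import time

def backoff_delays(base=1.0, factor=2.0, retries=5, cap=30.0):
    """Capped exponential backoff schedule, e.g. 1, 2, 4, 8, 16 seconds."""
    return [min(cap, base * factor ** i) for i in range(retries)]

def write_with_retry(write_fn, retries=5, base=1.0):
    """Call write_fn, sleeping on the backoff schedule between failures."""
    delays = backoff_delays(base=base, retries=retries)
    for attempt in range(retries):
        try:
            return write_fn()
        except Exception:
            if attempt == retries - 1:
                raise
            time.sleep(delays[attempt])
```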

Streaming Integration

The connector supports Spark Structured Streaming for real-time indexing.

from pyspark.sql.functions import from_json, col

# Kafka to Elasticsearch streaming pipeline
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

event_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("payload", MapType(StringType(), StringType()), True)
])

parsed_df = kafka_df.select(
    from_json(col("value").cast("string"), event_schema).alias("data")
).select("data.*")

query = parsed_df.writeStream \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "events") \
    .option("es.mapping.id", "event_id") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .outputMode("append") \
    .start()

query.awaitTermination()
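
Streaming sinks commonly write to time-partitioned indices so old data can be dropped by deleting whole indices. The connector supports dynamic resource patterns for this (for example, setting es.resource to "events-{timestamp|yyyy.MM.dd}" routes each document by its timestamp field). The helper below just sketches the same naming convention in plain Python:

```python
from datetime import datetime, timezone

def daily_index(prefix: str, ts: datetime) -> str:
    """Name an index by day, matching the events-{yyyy.MM.dd} convention."""
    return f"{prefix}-{ts:%Y.%m.%d}"

idx = daily_index("events", datetime(2024, 1, 15, tzinfo=timezone.utc))
```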

Index Management and Mapping

Explicitly define Elasticsearch mappings before bulk writes to prevent dynamic mapping conflicts.

from elasticsearch import Elasticsearch

es_client = Elasticsearch(["http://localhost:9200"])

index_mapping = {
    "mappings": {
        "properties": {
            "user_id": {"type": "keyword"},
            "name": {"type": "text"},
            "age": {"type": "integer"},
            "created_at": {"type": "date"},
            "tags": {"type": "keyword"},
            "description": {
                "type": "text",
                "fields": {
                    "keyword": {"type": "keyword", "ignore_above": 256}
                }
            }
        }
    },
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 1,
        "refresh_interval": "30s"
    }
}

es_client.options(ignore_status=400).indices.create(
    index="users",
    mappings=index_mapping["mappings"],
    settings=index_mapping["settings"]
)

Set refresh_interval to -1 during bulk loads, then restore it afterward; skipping per-batch refreshes can substantially improve bulk indexing throughput.
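
A context manager makes the disable-then-restore pattern hard to forget, since the interval is restored even when the load fails. The sketch below takes a setter callable rather than a client, so it is not tied to one client version; with elasticsearch-py you would pass a thin wrapper around es_client.indices.put_settings (an assumption about your client setup):

```python
from contextlib import contextmanager

@contextmanager
def bulk_refresh_disabled(put_settings, index, restore="30s"):
    """Disable index refresh for the duration of a bulk load.

    put_settings(index, settings) applies index settings; the finally
    block restores the interval even if the load raises.
    """
    put_settings(index, {"index": {"refresh_interval": "-1"}})
    try:
        yield
    finally:
        put_settings(index, {"index": {"refresh_interval": restore}})
```

Usage: with bulk_refresh_disabled(setter, "users"): run the df.write.save() call inside the block.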

Error Handling and Monitoring

Implement robust error handling for production deployments.

from pyspark.sql.utils import AnalysisException

try:
    df.write \
        .format("org.elasticsearch.spark.sql") \
        .option("es.resource", "users") \
        .option("es.write.operation", "upsert") \
        .option("es.mapping.id", "user_id") \
        .option("es.spark.dataframe.write.null", "true") \
        .mode("append") \
        .save()
except AnalysisException as e:
    print(f"Schema mismatch or configuration error: {e}")
except Exception as e:
    print(f"Write failed: {e}")
    # Implement retry logic or dead letter queue

The Elasticsearch-Spark connector provides production-ready integration for analytical workloads. Proper configuration of batch sizes, parallelism, and error handling ensures reliable data pipelines at scale.
