Apache Spark - Elasticsearch Connector
Key Insights
- The Elasticsearch-Spark connector enables bidirectional data flow between Spark and Elasticsearch, supporting batch and streaming operations with built-in partitioning and pushdown optimization
- Connector configuration requires careful tuning of batch sizes, parallelism, and network settings to avoid overwhelming Elasticsearch clusters while maintaining acceptable write throughput
- Schema mapping between Spark DataFrames and Elasticsearch documents happens automatically but requires explicit handling for nested structures, arrays, and timestamp formats to prevent data corruption
Setting Up the Connector
The Elasticsearch-Hadoop connector provides native integration between Spark and Elasticsearch. Add the dependency matching your Elasticsearch version to your build configuration.
For Maven:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-30_2.12</artifactId>
    <version>8.11.0</version>
</dependency>
For SBT:
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-30" % "8.11.0"
Initialize your Spark session with Elasticsearch configuration:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("ElasticsearchIntegration") \
    .config("spark.es.nodes", "localhost") \
    .config("spark.es.port", "9200") \
    .config("spark.es.nodes.wan.only", "true") \
    .config("spark.es.net.http.auth.user", "elastic") \
    .config("spark.es.net.http.auth.pass", "password") \
    .getOrCreate()
Writing Data to Elasticsearch
The connector supports multiple write patterns. The simplest approach uses the DataFrame API with format specification.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from datetime import datetime
# Create sample data
data = [
    ("user_001", "John Doe", 28, datetime(2024, 1, 15, 10, 30)),
    ("user_002", "Jane Smith", 34, datetime(2024, 1, 15, 11, 45)),
    ("user_003", "Bob Johnson", 45, datetime(2024, 1, 15, 12, 15))
]
schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("created_at", TimestampType(), True)
])
df = spark.createDataFrame(data, schema)
# Write to Elasticsearch
df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "users") \
    .option("es.mapping.id", "user_id") \
    .option("es.write.operation", "upsert") \
    .option("es.batch.size.entries", "1000") \
    .option("es.batch.size.bytes", "1mb") \
    .mode("append") \
    .save()
The es.mapping.id option specifies which field becomes the document ID in Elasticsearch. Without it, Elasticsearch generates random IDs, which makes idempotent upserts impossible. Note that mapping types were removed in Elasticsearch 8.x, so es.resource should name only the index (for example, users rather than users/_doc).
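To see why a stable document ID matters, here is a conceptual sketch in plain Python (no cluster required): an index keyed by a stable ID makes repeated writes idempotent, while auto-generated IDs create duplicates. The `upsert` helper is purely illustrative, not part of the connector.

```python
import uuid

def upsert(index, doc, id_field=None):
    """Write doc into index; reuse doc[id_field] as the ID if given."""
    doc_id = doc[id_field] if id_field else uuid.uuid4().hex
    # Merge into any existing document under the same ID (upsert semantics)
    index[doc_id] = {**index.get(doc_id, {}), **doc}
    return doc_id

with_ids, without_ids = {}, {}
record = {"user_id": "user_001", "name": "John Doe", "age": 28}

# Writing the same record twice with es.mapping.id-style behavior
upsert(with_ids, record, id_field="user_id")
upsert(with_ids, {**record, "age": 29}, id_field="user_id")
print(len(with_ids))    # 1 document, age updated to 29

# Without a mapped ID, every write creates a new document
upsert(without_ids, record)
upsert(without_ids, record)
print(len(without_ids)) # 2 duplicate documents
```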
Reading Data from Elasticsearch
Reading from Elasticsearch creates a distributed RDD or DataFrame with automatic partitioning based on index shards.
# Read entire index
users_df = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "users") \
    .option("es.read.field.as.array.include", "tags") \
    .load()
users_df.show()
users_df.printSchema()
# Query with Elasticsearch DSL
query = """{
  "query": {
    "range": {
      "age": {
        "gte": 30,
        "lte": 50
      }
    }
  }
}"""
filtered_df = spark.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "users") \
    .option("es.query", query) \
    .load()
Query pushdown significantly improves performance by filtering data at the Elasticsearch level before transferring to Spark executors.
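Hand-written DSL strings are easy to get wrong; one option is to build the query as a Python dict and serialize it, then pass the result via .option("es.query", ...). The `range_query` helper below is an illustrative sketch, not a connector API.

```python
import json

def range_query(field, gte=None, lte=None):
    """Build an Elasticsearch range query for a numeric field."""
    bounds = {k: v for k, v in {"gte": gte, "lte": lte}.items() if v is not None}
    return {"query": {"range": {field: bounds}}}

# Serialize to the string the connector expects as the es.query option
query = json.dumps(range_query("age", gte=30, lte=50))
print(query)
```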
Handling Complex Data Types
Nested structures and arrays require explicit schema handling to maintain data integrity.
from pyspark.sql.types import ArrayType, MapType, FloatType
from pyspark.sql.functions import struct, array, create_map, lit
# Complex schema with nested objects
complex_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer", StructType([
        StructField("id", StringType(), True),
        StructField("email", StringType(), True)
    ]), True),
    StructField("items", ArrayType(StructType([
        StructField("product_id", StringType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("price", FloatType(), True)
    ])), True),
    StructField("metadata", MapType(StringType(), StringType()), True)
])
complex_data = [(
    "order_001",
    ("cust_123", "customer@example.com"),
    [
        ("prod_001", 2, 29.99),
        ("prod_002", 1, 49.99)
    ],
    {"source": "web", "campaign": "spring_sale"}
)]
complex_df = spark.createDataFrame(complex_data, complex_schema)
complex_df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "orders") \
    .option("es.mapping.id", "order_id") \
    .option("es.mapping.exclude", "metadata.internal_field") \
    .mode("append") \
    .save()
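Conceptually, the nested order row above serializes to a JSON document like the one below, and es.mapping.exclude drops the matching dotted path before indexing. This plain-Python sketch (no cluster needed) illustrates the behavior; `exclude_path` is a hypothetical helper, and "metadata.campaign" is used here only so the exclusion is visible.

```python
import json

# The nested order row expressed as the JSON document the connector
# would produce (field names come from the schema above)
order_doc = {
    "order_id": "order_001",
    "customer": {"id": "cust_123", "email": "customer@example.com"},
    "items": [
        {"product_id": "prod_001", "quantity": 2, "price": 29.99},
        {"product_id": "prod_002", "quantity": 1, "price": 49.99},
    ],
    "metadata": {"source": "web", "campaign": "spring_sale"},
}

def exclude_path(doc, dotted_path):
    """Drop a dotted field path from a document, es.mapping.exclude-style."""
    parts = dotted_path.split(".")
    node = doc
    for part in parts[:-1]:
        node = node.get(part, {})
    node.pop(parts[-1], None)
    return doc

print(json.dumps(exclude_path(order_doc, "metadata.campaign"), indent=2))
```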
Performance Optimization
Tuning connector parameters directly impacts throughput and cluster stability.
# Optimized configuration for bulk writes
optimized_config = {
    "es.batch.size.entries": "5000",
    "es.batch.size.bytes": "10mb",
    "es.batch.write.refresh": "false",
    "es.batch.write.retry.count": "3",
    "es.batch.write.retry.wait": "10s",
    "es.http.timeout": "5m",
    "es.http.retries": "3",
    "es.scroll.size": "2000",
    "es.action.heart.beat.lead": "15s"
}
# Pass the configuration as write options. Setting es.* keys via
# spark.conf.set at runtime is not reliably picked up by the connector;
# they belong on the writer itself (or in the session config with a
# "spark.es." prefix).
# Repartition before writing for better parallelism
df.repartition(20) \
    .write \
    .format("org.elasticsearch.spark.sql") \
    .options(**optimized_config) \
    .option("es.resource", "large_dataset") \
    .mode("append") \
    .save()
Monitor Elasticsearch cluster metrics during writes. If you see rejected requests or high JVM memory pressure, reduce es.batch.size.entries and increase es.batch.write.retry.wait.
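The tuning loop described above can be sketched as a simple rule: when bulk rejections appear, halve the batch size down to a floor before retrying. This is a hypothetical helper illustrating the advice, not connector behavior.

```python
def next_batch_size(current, rejected, floor=500):
    """Halve the batch size after a rejection, never going below floor."""
    if rejected:
        return max(floor, current // 2)
    return current

# Two consecutive rejections shrink 5000 -> 2500 -> 1250, then hold steady
size = 5000
for rejected in (True, True, False):
    size = next_batch_size(size, rejected)
print(size)  # 1250
```

The resulting value would then be fed back into es.batch.size.entries for the next write.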
Streaming Integration
The connector supports Spark Structured Streaming for real-time indexing.
from pyspark.sql.functions import from_json, col
# Kafka to Elasticsearch streaming pipeline
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()
event_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("payload", MapType(StringType(), StringType()), True)
])
parsed_df = kafka_df.select(
    from_json(col("value").cast("string"), event_schema).alias("data")
).select("data.*")
query = parsed_df.writeStream \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "events") \
    .option("es.mapping.id", "event_id") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .outputMode("append") \
    .start()
query.awaitTermination()
Index Management and Mapping
Explicitly define Elasticsearch mappings before bulk writes to prevent dynamic mapping conflicts.
from elasticsearch import Elasticsearch
es_client = Elasticsearch(["http://localhost:9200"])
index_mapping = {
    "mappings": {
        "properties": {
            "user_id": {"type": "keyword"},
            "name": {"type": "text"},
            "age": {"type": "integer"},
            "created_at": {"type": "date"},
            "tags": {"type": "keyword"},
            "description": {
                "type": "text",
                "fields": {
                    "keyword": {"type": "keyword", "ignore_above": 256}
                }
            }
        }
    },
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 1,
        "refresh_interval": "30s"
    }
}
# The elasticsearch-py 8.x client takes mappings/settings as keyword
# arguments; ignore_status=400 tolerates an already-existing index
es_client.options(ignore_status=400).indices.create(
    index="users",
    mappings=index_mapping["mappings"],
    settings=index_mapping["settings"]
)
Set refresh_interval to -1 during bulk loads, then restore it afterward; skipping the periodic segment refreshes while the load runs can substantially improve indexing throughput.
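One way to make the refresh-interval toggle hard to forget is a context manager; a sketch follows, assuming an elasticsearch-py 8.x client exposing indices.put_settings(index=..., settings=...). The restore value in finally ensures the interval comes back even if the load fails.

```python
from contextlib import contextmanager

@contextmanager
def bulk_load_mode(es_client, index, restore_interval="30s"):
    """Disable index refresh for the duration of a bulk load."""
    es_client.indices.put_settings(
        index=index, settings={"refresh_interval": "-1"})
    try:
        yield
    finally:
        # Restore refreshes even if the bulk write raised
        es_client.indices.put_settings(
            index=index, settings={"refresh_interval": restore_interval})
```

Usage would wrap the Spark write, e.g. `with bulk_load_mode(es_client, "users"): df.write...save()`.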
Error Handling and Monitoring
Implement robust error handling for production deployments.
from pyspark.sql.utils import AnalysisException
try:
    df.write \
        .format("org.elasticsearch.spark.sql") \
        .option("es.resource", "users") \
        .option("es.write.operation", "upsert") \
        .option("es.mapping.id", "user_id") \
        .option("es.spark.dataframe.write.null", "true") \
        .mode("append") \
        .save()
except AnalysisException as e:
    print(f"Schema mismatch or configuration error: {e}")
except Exception as e:
    print(f"Write failed: {e}")
    # Implement retry logic or a dead letter queue
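The retry-or-dead-letter pattern noted in the except block above can be sketched as a small wrapper: retry the write a few times, then capture the failing batch for later inspection instead of losing it. This is a hypothetical helper, not a connector feature.

```python
import time

def write_with_retry(write_fn, batch, retries=3, wait_s=0, dead_letters=None):
    """Call write_fn(batch); on repeated failure, park batch in dead_letters."""
    for attempt in range(retries):
        try:
            write_fn(batch)
            return True
        except Exception:
            time.sleep(wait_s)  # back off before the next attempt
    if dead_letters is not None:
        dead_letters.append(batch)
    return False
```

Here write_fn would wrap the DataFrame write shown above, and the dead-letter list could instead be a durable store (a Kafka topic, a file sink) in production.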
The Elasticsearch-Spark connector provides production-ready integration for analytical workloads. Proper configuration of batch sizes, parallelism, and error handling ensures reliable data pipelines at scale.