Apache Spark - MongoDB Connector
Key Insights
- The MongoDB Spark Connector enables seamless bidirectional data flow between Spark and MongoDB, supporting both batch and streaming operations with native DataFrame/Dataset APIs
- Connection configuration requires careful tuning of partition strategies, batch sizes, and write concerns to optimize throughput while maintaining data consistency
- Advanced features like aggregation pipeline pushdown and schema inference significantly improve query performance by reducing data movement between systems
Setting Up the MongoDB Spark Connector
Add the MongoDB Spark Connector dependency to your project. For Spark 3.x with Scala 2.12:
// build.sbt
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "10.2.0"
For PySpark applications:
# Submit with package
spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.12:10.2.0 your_app.py
Initialize SparkSession with MongoDB configuration:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("MongoDBSparkConnector")
.config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/")
.config("spark.mongodb.write.connection.uri", "mongodb://localhost:27017/")
.config("spark.mongodb.read.database", "analytics")
.config("spark.mongodb.read.collection", "events")
.config("spark.mongodb.write.database", "analytics")
.config("spark.mongodb.write.collection", "processed_events")
.getOrCreate()
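You can also resolve the connector at launch instead of bundling it, via Spark's standard spark.jars.packages setting. A minimal sketch, assuming the driver can reach Maven Central:
// Fetch the connector from Maven Central when the session starts
val sparkWithPackage = SparkSession.builder()
  .appName("MongoDBSparkConnector")
  .config("spark.jars.packages",
    "org.mongodb.spark:mongo-spark-connector_2.12:10.2.0")
  .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/")
  .getOrCreate()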
Reading Data from MongoDB
The connector provides multiple read strategies. Basic DataFrame read:
val df = spark.read
.format("mongodb")
.option("database", "analytics")
.option("collection", "events")
.load()
df.printSchema()
df.show(10)
For Python:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/analytics.events") \
.getOrCreate()
df = spark.read \
.format("mongodb") \
.load()
df.printSchema()
Partitioning Strategies
Control data partitioning for parallel reads:
// ShardedPartitioner derives partitions from the cluster's chunk metadata
// and takes no additional options (hashed shard keys are not supported)
val partitionedDf = spark.read
  .format("mongodb")
  .option("database", "analytics")
  .option("collection", "events")
  .option("partitioner",
    "com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner")
  .load()
Available partitioners in the 10.x connector (passed to the partitioner option as full class names under com.mongodb.spark.sql.connector.read.partitioner):
- SamplePartitioner (default): samples the collection to estimate partition boundaries
- ShardedPartitioner: partitions along the chunk boundaries of a sharded cluster; does not support hashed shard keys
- PaginateBySizePartitioner: pages through the collection into partitions of roughly equal data size
- PaginateIntoPartitionsPartitioner: splits the collection into a fixed number of partitions
- SinglePartitionPartitioner: reads the entire collection as a single partition
# Sample partitioner with custom options
df = spark.read \
    .format("mongodb") \
    .option("partitioner",
            "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner") \
    .option("partitioner.options.partition.field", "_id") \
    .option("partitioner.options.samples.per.partition", "10") \
    .load()
Filtering and Aggregation Pushdown
The connector pushes compatible operations to MongoDB, reducing data transfer:
import spark.implicits._
import org.apache.spark.sql.functions.{count, sum}

// Filter pushdown
val filteredDf = df.filter($"timestamp" > "2024-01-01")
.filter($"status" === "completed")
// Aggregation pushdown
val aggregatedDf = df.groupBy("user_id")
.agg(
sum("amount").as("total_amount"),
count("*").as("transaction_count")
)
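To confirm which operations were actually pushed down, inspect the physical plan; pushed filters and aggregates appear inside the MongoDB scan node rather than as separate Spark operators:
// Pushed predicates show up in the scan node of the physical plan
filteredDf.explain(true)
aggregatedDf.explain(true)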
Explicit aggregation pipeline:
val pipeline = """
[
{ "$match": { "timestamp": { "$gte": "2024-01-01" } } },
{ "$group": {
"_id": "$user_id",
"total": { "$sum": "$amount" },
"count": { "$sum": 1 }
}
},
{ "$sort": { "total": -1 } },
{ "$limit": 100 }
]
"""
val resultDf = spark.read
.format("mongodb")
.option("database", "analytics")
.option("collection", "events")
.option("aggregation.pipeline", pipeline)
.load()
Writing Data to MongoDB
Basic write operations with different modes:
// Overwrite mode - replaces collection
processedDf.write
.format("mongodb")
.mode("overwrite")
.option("database", "analytics")
.option("collection", "results")
.save()
// Append mode - adds documents
processedDf.write
.format("mongodb")
.mode("append")
.option("database", "analytics")
.option("collection", "results")
.save()
Write Configuration and Performance
Optimize write performance with batch size and ordered inserts:
processedDf.write
.format("mongodb")
.mode("append")
.option("database", "analytics")
.option("collection", "results")
.option("maxBatchSize", "1024")
.option("ordered", "false")
.option("writeConcern.w", "majority")
.option("writeConcern.journal", "true")
.save()
For upsert operations:
# Update documents matched on _id; unmatched documents are inserted (upsert)
df.write \
    .format("mongodb") \
    .mode("append") \
    .option("operationType", "update") \
    .option("idFieldList", "_id") \
    .save()
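The same options work from Scala. A sketch keyed on a hypothetical user_id field instead of _id (assumes user_id uniquely identifies a document and that usersDf carries it):
// Illustrative upsert keyed on user_id rather than _id
usersDf.write
  .format("mongodb")
  .mode("append")
  .option("database", "analytics")
  .option("collection", "users")
  .option("operationType", "update")
  .option("idFieldList", "user_id")
  .save()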
Schema Handling
MongoDB’s flexible schema requires explicit handling in Spark:
import org.apache.spark.sql.types._
// Define explicit schema
val schema = StructType(Array(
StructField("_id", StringType, nullable = false),
StructField("user_id", LongType, nullable = false),
StructField("timestamp", TimestampType, nullable = false),
StructField("amount", DoubleType, nullable = true),
StructField("metadata", MapType(StringType, StringType), nullable = true)
))
val typedDf = spark.read
.format("mongodb")
.schema(schema)
.load()
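If you rely on inference instead, the connector samples the collection to derive a schema; widening the sample helps when documents vary. A sketch, assuming the connector's sampleSize read option:
// Sample more documents for schema inference (assumed option: sampleSize)
val inferredDf = spark.read
  .format("mongodb")
  .option("database", "analytics")
  .option("collection", "events")
  .option("sampleSize", "5000")
  .load()
inferredDf.printSchema()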
Handle nested documents:
val nestedSchema = StructType(Array(
StructField("_id", StringType),
StructField("user", StructType(Array(
StructField("id", LongType),
StructField("name", StringType),
StructField("email", StringType)
))),
StructField("items", ArrayType(StructType(Array(
StructField("product_id", StringType),
StructField("quantity", IntegerType),
StructField("price", DoubleType)
))))
))
val ordersDf = spark.read
.format("mongodb")
.schema(nestedSchema)
.option("collection", "orders")
.load()
import spark.implicits._
import org.apache.spark.sql.functions.explode

// Access nested fields
ordersDf.select(
$"user.name",
explode($"items").as("item")
).select(
$"name",
$"item.product_id",
$"item.quantity"
).show()
Streaming with MongoDB
Configure Spark Structured Streaming with MongoDB change streams:
import org.apache.spark.sql.functions.{count, sum, window}
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
val streamDf = spark.readStream
.format("mongodb")
.option("database", "analytics")
.option("collection", "events")
.option("change.stream.publish.full.document.only", "true")
.load()
val query = streamDf
.filter($"event_type" === "purchase")
.groupBy(
window($"timestamp", "1 minute"),
$"product_id"
)
.agg(
sum("amount").as("revenue"),
count("*").as("purchases")
)
.writeStream
.format("mongodb")
.option("database", "analytics")
.option("collection", "real_time_metrics")
.option("checkpointLocation", "/tmp/checkpoint")
.outputMode("update")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
query.awaitTermination()
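Before blocking on awaitTermination (or from a separate monitoring thread), the standard Structured Streaming hooks let you observe the query:
// Poll stream health with Spark's built-in monitoring APIs
println(query.status)       // current state of the stream
println(query.lastProgress) // metrics from the most recent micro-batch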
Connection Pool and Performance Tuning
Configure connection pooling for production workloads:
val spark = SparkSession.builder()
  .config("spark.mongodb.read.connection.uri",
    "mongodb://host1:27017,host2:27017,host3:27017/?" +
    "maxPoolSize=50&minPoolSize=10&maxIdleTimeMS=300000")
  .config("spark.mongodb.read.readPreference.name", "secondaryPreferred")
  .config("spark.mongodb.read.readConcern.level", "majority")
  .getOrCreate()
Optimize partition size for large collections:
# Target 128 MB per partition
df = spark.read \
    .format("mongodb") \
    .option("partitioner",
            "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner") \
    .option("partitioner.options.partition.size", "128") \
    .option("partitioner.options.samples.per.partition", "20") \
    .load()
print(f"Number of partitions: {df.rdd.getNumPartitions()}")
Monitor and tune based on your cluster characteristics. For write-heavy workloads, disable ordered writes and increase batch size. For read operations, align partition count with available executor cores and adjust based on document size and query selectivity.
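As a concrete starting point, a write-heavy job might combine unordered writes with a larger batch, while a read job repartitions to match executor parallelism (illustrative values only; measure against your own cluster):
// Write-heavy: unordered bulk writes with larger batches
processedDf.write
  .format("mongodb")
  .mode("append")
  .option("database", "analytics")
  .option("collection", "results")
  .option("ordered", "false")
  .option("maxBatchSize", "2048")
  .save()

// Read-heavy: align partition count with total executor cores
// (e.g. 10 executors x 4 cores = 40 partitions)
val tunedDf = df.repartition(40)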