Apache Spark - MongoDB Connector

Key Insights

  • The MongoDB Spark Connector enables seamless bidirectional data flow between Spark and MongoDB, supporting both batch and streaming operations with native DataFrame/Dataset APIs
  • Connection configuration requires careful tuning of partition strategies, batch sizes, and write concerns to optimize throughput while maintaining data consistency
  • Advanced features like aggregation pipeline pushdown significantly improve query performance by reducing data movement between systems, while schema inference and explicit schemas make MongoDB's flexible documents usable from Spark SQL

Setting Up the MongoDB Spark Connector

Add the MongoDB Spark Connector dependency to your project. For Spark 3.x with Scala 2.12:

// build.sbt
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "10.2.0"

For PySpark applications:

# Submit with package
spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.12:10.2.0 your_app.py

Initialize SparkSession with MongoDB configuration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MongoDBSparkConnector")
  .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/")
  .config("spark.mongodb.write.connection.uri", "mongodb://localhost:27017/")
  .config("spark.mongodb.read.database", "analytics")
  .config("spark.mongodb.read.collection", "events")
  .config("spark.mongodb.write.database", "analytics")
  .config("spark.mongodb.write.collection", "processed_events")
  .getOrCreate()

Reading Data from MongoDB

The connector provides multiple read strategies. Basic DataFrame read:

val df = spark.read
  .format("mongodb")
  .option("database", "analytics")
  .option("collection", "events")
  .load()

df.printSchema()
df.show(10)

For Python:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/analytics.events") \
    .getOrCreate()

df = spark.read \
    .format("mongodb") \
    .load()

df.printSchema()

Partitioning Strategies

Control data partitioning for parallel reads by choosing a partitioner with the partitioner option and tuning it through partitioner.options.* keys:

val partitionedDf = spark.read
  .format("mongodb")
  .option("database", "analytics")
  .option("collection", "events")
  .option("partitioner", "MongoShardedPartitioner")
  .option("partitionKey", "user_id")
  .option("partitionsPerShard", "4")
  .load()

Available partitioners (all classes live under com.mongodb.spark.sql.connector.read.partitioner):

  • SamplePartitioner: samples the collection to compute partition boundaries (default)
  • ShardedPartitioner: partitions along chunk boundaries on sharded clusters
  • PaginateBySizePartitioner: pages through the collection by estimated data size
  • PaginateIntoPartitionsPartitioner: splits the collection into a fixed number of partitions
  • SinglePartitionPartitioner: reads the whole collection as a single partition

The sample partitioner accepts tuning options through the partitioner.options prefix, for example from PySpark:

# Sample partitioner with tuning options
df = spark.read \
    .format("mongodb") \
    .option("partitioner", "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner") \
    .option("partitioner.options.samples.per.partition", "10") \
    .option("partitioner.options.partition.field", "_id") \
    .load()

Filtering and Aggregation Pushdown

The connector pushes compatible operations to MongoDB, reducing data transfer:

import org.apache.spark.sql.functions._
import spark.implicits._

// Filter pushdown
val filteredDf = df.filter($"timestamp" > "2024-01-01")
  .filter($"status" === "completed")

// Aggregation pushdown
val aggregatedDf = df.groupBy("user_id")
  .agg(
    sum("amount").as("total_amount"),
    count("*").as("transaction_count")
  )
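
To see which of these operations the connector actually pushed to MongoDB, inspect the query plan. The exact plan text varies by Spark and connector version, but pushed predicates appear on the MongoDB scan node rather than as separate Spark operators:

// Print the logical and physical plans; filters handled by MongoDB show up on the
// scan, while anything Spark still has to apply appears as its own Filter operator.
filteredDf.explain(true)
aggregatedDf.explain(true)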

Explicit aggregation pipeline:

val pipeline = """
[
  { "$match": { "timestamp": { "$gte": "2024-01-01" } } },
  { "$group": { 
      "_id": "$user_id", 
      "total": { "$sum": "$amount" },
      "count": { "$sum": 1 }
    }
  },
  { "$sort": { "total": -1 } },
  { "$limit": 100 }
]
"""

val resultDf = spark.read
  .format("mongodb")
  .option("database", "analytics")
  .option("collection", "events")
  .option("aggregation.pipeline", pipeline)
  .load()

Writing Data to MongoDB

Basic write operations with different modes:

// Overwrite mode - replaces collection
processedDf.write
  .format("mongodb")
  .mode("overwrite")
  .option("database", "analytics")
  .option("collection", "results")
  .save()

// Append mode - adds documents
processedDf.write
  .format("mongodb")
  .mode("append")
  .option("database", "analytics")
  .option("collection", "results")
  .save()

Write Configuration and Performance

Optimize write performance with batch size and ordered inserts:

processedDf.write
  .format("mongodb")
  .mode("append")
  .option("database", "analytics")
  .option("collection", "results")
  .option("maxBatchSize", "1024")
  .option("ordered", "false")
  .option("writeConcern.w", "majority")
  .option("writeConcern.journal", "true")
  .save()

For upsert operations:

# Configure upsert based on _id field
df.write \
    .format("mongodb") \
    .mode("append") \
    .option("replaceDocument", "false") \
    .option("idFieldList", "_id") \
    .option("operationType", "update") \
    .save()
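
As a minimal sketch of the same idea in Scala, assuming a hypothetical user_totals collection: writing the same _id twice with operationType set to update modifies the existing document instead of inserting a duplicate.

import spark.implicits._

// Hypothetical user_totals collection; _id doubles as the business key.
val firstLoad = Seq(("u1", 100.0), ("u2", 250.0)).toDF("_id", "total")
firstLoad.write
  .format("mongodb")
  .mode("append")
  .option("database", "analytics")
  .option("collection", "user_totals")
  .option("operationType", "update")
  .option("idFieldList", "_id")
  .save()

// The second write reuses _id "u1": the document is updated in place, not duplicated.
val secondLoad = Seq(("u1", 175.0)).toDF("_id", "total")
secondLoad.write
  .format("mongodb")
  .mode("append")
  .option("database", "analytics")
  .option("collection", "user_totals")
  .option("operationType", "update")
  .option("idFieldList", "_id")
  .save()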

Schema Handling

MongoDB’s flexible schema requires explicit handling in Spark:

import org.apache.spark.sql.types._

// Define explicit schema
val schema = StructType(Array(
  StructField("_id", StringType, nullable = false),
  StructField("user_id", LongType, nullable = false),
  StructField("timestamp", TimestampType, nullable = false),
  StructField("amount", DoubleType, nullable = true),
  StructField("metadata", MapType(StringType, StringType), nullable = true)
))

val typedDf = spark.read
  .format("mongodb")
  .schema(schema)
  .load()
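
With an explicit schema, documents that lack a declared field simply surface it as null, so decide how to handle those rows rather than relying on inference. A minimal sketch using the typedDf read above:

// Documents missing "amount" arrive as null under the explicit schema;
// either substitute a default or keep only complete rows before aggregating.
val withDefaults = typedDf.na.fill(Map("amount" -> 0.0))
val onlyComplete = typedDf.filter($"amount".isNotNull)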

Handle nested documents:

val nestedSchema = StructType(Array(
  StructField("_id", StringType),
  StructField("user", StructType(Array(
    StructField("id", LongType),
    StructField("name", StringType),
    StructField("email", StringType)
  ))),
  StructField("items", ArrayType(StructType(Array(
    StructField("product_id", StringType),
    StructField("quantity", IntegerType),
    StructField("price", DoubleType)
  ))))
))

val ordersDf = spark.read
  .format("mongodb")
  .schema(nestedSchema)
  .option("collection", "orders")
  .load()

// Access nested fields
ordersDf.select(
  $"user.name",
  explode($"items").as("item")
).select(
  $"name",
  $"item.product_id",
  $"item.quantity"
).show()

Streaming with MongoDB

Configure Spark Structured Streaming with MongoDB change streams:

import org.apache.spark.sql.streaming.Trigger

val streamDf = spark.readStream
  .format("mongodb")
  .option("database", "analytics")
  .option("collection", "events")
  .option("change.stream.publish.full.document.only", "true")
  .load()

val query = streamDf
  .filter($"event_type" === "purchase")
  .groupBy(
    window($"timestamp", "1 minute"),
    $"product_id"
  )
  .agg(
    sum("amount").as("revenue"),
    count("*").as("purchases")
  )
  .writeStream
  .format("mongodb")
  .option("database", "analytics")
  .option("collection", "real_time_metrics")
  .option("checkpointLocation", "/tmp/checkpoint")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

query.awaitTermination()

Connection Pool and Performance Tuning

Configure connection pooling and read behavior for production workloads by passing MongoDB driver options on the connection URI:

val spark = SparkSession.builder()
  .config("spark.mongodb.read.connection.uri", 
    "mongodb://host1:27017,host2:27017,host3:27017/")
  .config("spark.mongodb.read.connection.uri.options", 
    "maxPoolSize=50&minPoolSize=10&maxIdleTimeMS=300000")
  .config("spark.mongodb.read.readPreference.name", "secondaryPreferred")
  .config("spark.mongodb.read.readConcern.level", "majority")
  .getOrCreate()
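
Individual reads can also override the session-level settings, which is handy for pointing one heavy scan at a dedicated analytics node. The host name and events_archive collection below are placeholders:

// connection.uri can be supplied per read; this targets a hypothetical analytics
// replica with secondary reads, independent of the session defaults.
val archiveDf = spark.read
  .format("mongodb")
  .option("connection.uri",
    "mongodb://analytics-host:27017/?readPreference=secondaryPreferred")
  .option("database", "analytics")
  .option("collection", "events_archive")
  .load()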

Optimize partition size for large collections:

# Target 128MB per partition
df = spark.read \
    .format("mongodb") \
    .option("partitioner", "MongoSamplePartitioner") \
    .option("partitionSizeMB", "128") \
    .option("samplesPerPartition", "20") \
    .load()

print(f"Number of partitions: {df.rdd.getNumPartitions()}")

Monitor and tune based on your cluster characteristics. For write-heavy workloads, disable ordered writes and increase batch size. For read operations, align partition count with available executor cores and adjust based on document size and query selectivity.
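
A rough sketch of the write-heavy guideline (the numbers are illustrative, not recommendations):

// Write-heavy job: unordered bulk writes with a larger batch size, and the number
// of output tasks capped at roughly twice the cluster's default parallelism.
val parallelism = spark.sparkContext.defaultParallelism

processedDf
  .coalesce(parallelism * 2)
  .write
  .format("mongodb")
  .mode("append")
  .option("database", "analytics")
  .option("collection", "results")
  .option("ordered", "false")
  .option("maxBatchSize", "2048")
  .save()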
