Apache Spark - Cassandra Connector
Key Insights
- The Spark-Cassandra connector enables distributed data processing on Cassandra tables with predicate pushdown, column pruning, and partition-aware data locality for optimal performance
- Connector configuration requires careful tuning of batch sizes, parallelism levels, and write consistency settings to balance throughput against cluster load
- Production deployments must handle write failures gracefully using saveMode options and implement proper retry logic for transient network issues
Connector Setup and Dependencies
The Spark-Cassandra connector bridges Apache Spark’s distributed processing capabilities with Cassandra’s distributed storage. Add the connector dependency matching your Spark and Scala versions:
// build.sbt
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.4.1"
For PySpark applications:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("CassandraIntegration") \
    .config("spark.cassandra.connection.host", "cassandra-node1,cassandra-node2") \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.cassandra.auth.username", "cassandra_user") \
    .config("spark.cassandra.auth.password", "password") \
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1") \
    .getOrCreate()
The connector automatically discovers cluster topology from the initial contact points and establishes connections to appropriate nodes based on data locality.
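Hard-coding credentials as above is fine for experimentation, but production jobs usually pull them from the environment. A minimal sketch (the helper name and environment variable names are illustrative, not connector conventions):

```python
import os

# Illustrative helper: assemble connector settings from environment
# variables so credentials are not hard-coded in job source.
# Variable names (CASSANDRA_HOSTS, etc.) are assumptions for this sketch.
def cassandra_conf(env=os.environ):
    return {
        "spark.cassandra.connection.host": env.get("CASSANDRA_HOSTS", "127.0.0.1"),
        "spark.cassandra.connection.port": env.get("CASSANDRA_PORT", "9042"),
        "spark.cassandra.auth.username": env.get("CASSANDRA_USER", ""),
        "spark.cassandra.auth.password": env.get("CASSANDRA_PASS", ""),
    }

# Apply to a builder:
#   for k, v in cassandra_conf().items():
#       builder = builder.config(k, v)
```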
Reading Data from Cassandra
The connector provides multiple APIs for reading Cassandra data. The DataFrame API offers the most intuitive interface:
df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="user_events", keyspace="analytics") \
    .load()
# Apply filters - these get pushed down to Cassandra
filtered_df = df.filter("event_date >= '2024-01-01'") \
    .select("user_id", "event_type", "timestamp") \
    .filter("event_type = 'purchase'")
filtered_df.show()
The connector implements predicate pushdown, translating Spark filters into CQL WHERE clauses when possible. This dramatically reduces data transfer by filtering at the source.
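As a rough mental model of what is pushable (a deliberate simplification of the connector's actual rules): equality predicates on partition-key columns and comparison predicates on clustering columns are candidates for CQL translation, while everything else is evaluated in Spark:

```python
# Simplified model of predicate pushdown: equality on partition-key
# columns and comparisons on clustering columns can become CQL WHERE
# clauses; filters on regular columns stay in Spark. Illustrative only;
# the real connector's planner is more nuanced.
def split_filters(filters, partition_keys, clustering_keys):
    pushed, remaining = [], []
    for col, op, value in filters:
        if col in partition_keys and op == "=":
            pushed.append((col, op, value))
        elif col in clustering_keys and op in ("=", "<", ">", "<=", ">="):
            pushed.append((col, op, value))
        else:
            remaining.append((col, op, value))
    return pushed, remaining
```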
For Scala applications, the RDD API provides lower-level control:
import com.datastax.spark.connector._
val rdd = sc.cassandraTable("analytics", "user_events")
  .select("user_id", "event_type", "timestamp")
  .where("event_date >= ?", "2024-01-01")
rdd.collect().foreach(println)
Optimizing Read Performance
Partition-aware data locality is the connector’s most powerful optimization. Configure split size to control parallelism:
spark.conf.set("spark.cassandra.input.split.sizeInMB", "64")
spark.conf.set("spark.cassandra.input.fetch.sizeInRows", "5000")
# Read with explicit consistency level
df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="metrics", keyspace="monitoring") \
    .option("spark.cassandra.input.consistency.level", "LOCAL_QUORUM") \
    .load()
The connector creates Spark partitions aligned with Cassandra token ranges, enabling executors to read from local replicas. Monitor partition distribution:
val rdd = sc.cassandraTable("analytics", "large_table")
println(s"Number of partitions: ${rdd.getNumPartitions}")
println(s"Preferred locations: ${rdd.preferredLocations(rdd.partitions(0))}")
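The relationship between table size, split size, and resulting partition count can be approximated with simple arithmetic (an estimate only; the connector actually works from token-range size statistics):

```python
import math

# Back-of-the-envelope estimate of Spark partition count: the connector
# groups Cassandra token ranges into splits of roughly the configured
# size (spark.cassandra.input.split.sizeInMB). Illustrative only.
def estimated_partitions(table_size_mb, split_size_mb=64):
    return max(1, math.ceil(table_size_mb / split_size_mb))

# A 1 GB table with the default 64 MB split size yields ~16 partitions.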
For time-series data, leverage partition key filters to limit token range scans:
# Efficient - filters on partition key
events_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="events", keyspace="timeseries") \
    .load() \
    .filter("device_id = 'sensor-123'")
# Less efficient - full table scan
all_events = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="events", keyspace="timeseries") \
    .load() \
    .filter("temperature > 75")
Writing Data to Cassandra
The connector supports multiple write modes with configurable consistency and batching:
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("session_id", StringType(), False),
    StructField("page_views", IntegerType(), False),
    StructField("last_activity", TimestampType(), False)
])
data = [
    ("user1", "sess123", 45, datetime(2024, 1, 15, 10, 30)),
    ("user2", "sess124", 23, datetime(2024, 1, 15, 10, 35))
]
df = spark.createDataFrame(data, schema)
df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .options(table="user_sessions", keyspace="analytics") \
    .option("spark.cassandra.output.consistency.level", "LOCAL_QUORUM") \
    .save()
Control write throughput and cluster impact through batch configuration:
spark.conf.set("spark.cassandra.output.batch.size.rows", "auto")
spark.conf.set("spark.cassandra.output.batch.size.bytes", "1024")
spark.conf.set("spark.cassandra.output.concurrent.writes", "5")
spark.conf.set("spark.cassandra.output.throughputMBPerSec", "10")
# These settings prevent overwhelming Cassandra nodes
df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .options(table="metrics", keyspace="monitoring") \
    .save()
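Conceptually, the writer groups rows by Cassandra partition and cuts each group into bounded batches. A simplified, illustrative sketch of that grouping (not the connector's actual implementation, which also enforces byte limits and issues batches concurrently):

```python
from collections import defaultdict

# Simplified sketch of partition-key batching: rows destined for the
# same Cassandra partition are grouped, and each group is cut into
# batches of at most max_batch_rows. Illustrative only.
def build_batches(rows, key_fn, max_batch_rows=5):
    grouped = defaultdict(list)
    for row in rows:
        grouped[key_fn(row)].append(row)
    batches = []
    for group in grouped.values():
        for i in range(0, len(group), max_batch_rows):
            batches.append(group[i:i + max_batch_rows])
    return batches
```

Batching rows that share a partition key is cheap for Cassandra to apply; multi-partition batches are comparatively expensive, which is why the connector batches by partition rather than arbitrarily.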
Handling Schema Mapping and Type Conversions
The connector automatically maps Spark types to Cassandra types, but explicit control is sometimes necessary:
import com.datastax.spark.connector._
case class UserProfile(
  userId: String,
  email: String,
  preferences: Map[String, String],
  tags: Set[String]
)
val profiles = sc.parallelize(Seq(
  UserProfile("u1", "user@example.com", Map("theme" -> "dark"), Set("premium", "verified"))
))
profiles.saveToCassandra(
  "users",
  "profiles",
  SomeColumns("user_id" as "userId", "email", "preferences", "tags")
)
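A few of the well-known automatic type mappings, collected as a small lookup table for reference (partial; consult the connector documentation for the full matrix):

```python
# Common Spark SQL -> CQL type pairs handled automatically by the
# connector. Partial reference list only.
SPARK_TO_CQL = {
    "StringType": "text",
    "IntegerType": "int",
    "LongType": "bigint",
    "DoubleType": "double",
    "BooleanType": "boolean",
    "TimestampType": "timestamp",
    "ArrayType": "list",
    "MapType": "map",
}

def cql_type(spark_type_name):
    # Returns "unknown" for types outside this partial table.
    return SPARK_TO_CQL.get(spark_type_name, "unknown")
```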
For complex types like UDTs (User Defined Types):
# Cassandra UDT: CREATE TYPE address (street text, city text, zip text)
from pyspark.sql.types import StructType, StructField, StringType
address_schema = StructType([
    StructField("street", StringType()),
    StructField("city", StringType()),
    StructField("zip", StringType())
])
user_schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("address", address_schema)
])
df = spark.createDataFrame([("u1", ("1 Main St", "Springfield", "62704"))], user_schema)
# Spark automatically maps the struct column to the Cassandra UDT
df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .options(table="users", keyspace="app") \
    .save()
Aggregations and Joins
Execute aggregations in Spark rather than relying on Cassandra’s limited aggregation capabilities:
# Read from multiple tables
users_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="users", keyspace="app") \
    .load()
events_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="events", keyspace="app") \
    .load()
# Perform join and aggregation in Spark
result = events_df.join(users_df, "user_id") \
    .groupBy("user_type") \
    .agg({"event_count": "sum", "user_id": "count"}) \
    .orderBy("user_type")
# Write aggregated results back; overwrite truncates the target table
# and requires explicit confirmation
result.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("overwrite") \
    .options(table="user_type_summary", keyspace="analytics") \
    .option("confirm.truncate", "true") \
    .save()
Error Handling and Retry Logic
Production systems require robust error handling for network partitions and node failures:
import com.datastax.spark.connector._
import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel
val writeConf = WriteConf(
  consistencyLevel = DefaultConsistencyLevel.LOCAL_QUORUM,
  parallelismLevel = 5,
  throughputMiBPS = 10,
  taskMetricsEnabled = true
)
try {
  rdd.saveToCassandra("analytics", "events", writeConf = writeConf)
} catch {
  case e: Exception =>
    // Row-level failures surface as Spark task failures; Spark retries
    // tasks before the job fails with the underlying driver exception.
    println(s"Write failed: ${e.getMessage}")
    throw e
}
For idempotent writes, implement retry logic:
import time
max_retries = 3
retry_delay = 5
for attempt in range(max_retries):
    try:
        df.write \
            .format("org.apache.spark.sql.cassandra") \
            .mode("append") \
            .options(table="events", keyspace="analytics") \
            .save()
        break
    except Exception:
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
        else:
            raise
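The fixed-delay loop above generalizes naturally to exponential backoff. A small standalone helper (a common pattern, not part of the connector API; the injectable sleep parameter exists purely for testability):

```python
import time

# Generic retry helper with exponential backoff for idempotent writes.
# write_fn is any zero-argument callable, e.g. a closure around
# df.write...save(); delays grow as base_delay * 2**attempt.
def retry_write(write_fn, max_retries=3, base_delay=1.0, sleep=time.sleep):
    for attempt in range(max_retries):
        try:
            return write_fn()
        except Exception:
            if attempt == max_retries - 1:
                raise
            sleep(base_delay * (2 ** attempt))
```

Because Cassandra writes are upserts, retrying the same batch is safe as long as the data itself is deterministic, which is what makes this pattern appropriate here.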
Monitoring and Performance Tuning
Enable metrics to track connector performance:
spark.conf.set("spark.cassandra.output.metrics", "true")
spark.conf.set("spark.cassandra.input.metrics", "true")
# Monitor through Spark UI or programmatically
df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="large_table", keyspace="production") \
    .load()
print(f"Records read: {df.count()}")
Key performance indicators include read/write throughput, task duration distribution, and data locality percentages. Adjust parallelism based on cluster size and workload characteristics. The connector’s partition-aware scheduling minimizes network transfer, but improper configuration can create hotspots or underutilize cluster resources.
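For example, a data-locality percentage can be derived from the per-task locality levels visible in the Spark UI (a hypothetical helper; the level names follow Spark's TaskLocality values):

```python
# Hypothetical KPI calculation: share of tasks that ran with local data,
# computed from locality-level counts as displayed in the Spark UI.
def locality_percentage(task_locality_counts):
    total = sum(task_locality_counts.values())
    if total == 0:
        return 0.0
    local = (task_locality_counts.get("PROCESS_LOCAL", 0)
             + task_locality_counts.get("NODE_LOCAL", 0))
    return 100.0 * local / total
```

A consistently low percentage suggests Spark executors are not co-located with Cassandra replicas, or that splits are too coarse to schedule locally.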