Apache Spark - Cassandra Connector

Key Insights

  • The Spark-Cassandra connector enables distributed data processing on Cassandra tables with predicate pushdown, column pruning, and partition-aware data locality for optimal performance
  • Connector configuration requires careful tuning of batch sizes, parallelism levels, and write consistency settings to balance throughput against cluster load
  • Production deployments must handle write failures gracefully using saveMode options and implement proper retry logic for transient network issues

Connector Setup and Dependencies

The Spark-Cassandra connector bridges Apache Spark’s distributed processing capabilities with Cassandra’s distributed storage. Add the connector dependency matching your Spark and Scala versions:

// build.sbt
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "3.4.1"

For PySpark applications:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CassandraIntegration") \
    .config("spark.cassandra.connection.host", "cassandra-node1,cassandra-node2") \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.cassandra.auth.username", "cassandra_user") \
    .config("spark.cassandra.auth.password", "password") \
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1") \
    .getOrCreate()

The connector automatically discovers cluster topology from the initial contact points and establishes connections to appropriate nodes based on data locality.
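In multi-datacenter clusters you usually also pin the connector to one datacenter so routine Spark traffic stays off cross-DC links. A minimal sketch, with hypothetical hosts and DC name; the option names follow the connector 3.x settings (verify against your version's reference configuration):

```python
def cassandra_connection_options(contact_points, local_dc):
    """Build connector options restricting traffic to the local DC."""
    return {
        # Initial contact points; the driver discovers the rest of the ring.
        "spark.cassandra.connection.host": ",".join(contact_points),
        "spark.cassandra.connection.port": "9042",
        # Route requests only to replicas in this datacenter.
        "spark.cassandra.connection.localDC": local_dc,
    }

# Each entry would then be applied via SparkSession.builder.config(key, value).
opts = cassandra_connection_options(["cassandra-node1", "cassandra-node2"], "dc1")
```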

Reading Data from Cassandra

The connector provides multiple APIs for reading Cassandra data. The DataFrame API offers the most intuitive interface:

df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="user_events", keyspace="analytics") \
    .load()

# Apply filters - these get pushed down to Cassandra
filtered_df = df.filter("event_date >= '2024-01-01'") \
    .select("user_id", "event_type", "timestamp") \
    .filter("event_type = 'purchase'")

filtered_df.show()

The connector implements predicate pushdown, translating Spark filters into CQL WHERE clauses when possible. This dramatically reduces data transfer by filtering at the source.

For Scala applications, the RDD API provides lower-level control:

import com.datastax.spark.connector._

val rdd = sc.cassandraTable("analytics", "user_events")
  .select("user_id", "event_type", "timestamp")
  .where("event_date >= ?", "2024-01-01")

rdd.collect().foreach(println)

Optimizing Read Performance

Partition-aware data locality is the connector’s most powerful optimization. Configure split size to control parallelism:

spark.conf.set("spark.cassandra.input.split.sizeInMB", "64")
spark.conf.set("spark.cassandra.input.fetch.sizeInRows", "5000")

# Read with explicit consistency level
df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="metrics", keyspace="monitoring") \
    .option("spark.cassandra.input.consistency.level", "LOCAL_QUORUM") \
    .load()

The connector creates Spark partitions aligned with Cassandra token ranges, enabling executors to read from local replicas. Monitor partition distribution:

val rdd = sc.cassandraTable("analytics", "large_table")
println(s"Number of partitions: ${rdd.getNumPartitions}")
println(s"Preferred locations: ${rdd.preferredLocations(rdd.partitions(0))}")
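The relationship between split size and parallelism is roughly arithmetic. A back-of-the-envelope sketch (the connector's real estimate comes from Cassandra's system.size_estimates table and token-range grouping):

```python
import math

def estimate_partitions(table_size_mb, split_size_mb):
    """Approximate number of Spark partitions for a full table scan."""
    return max(1, math.ceil(table_size_mb / split_size_mb))
```

With the 64 MB split configured above, a 10 GB table would yield roughly 160 partitions; if that overwhelms or underutilizes your executors, adjust the split size rather than repartitioning after the read.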

For time-series data, leverage partition key filters to limit token range scans:

# Efficient - filters on partition key
events_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="events", keyspace="timeseries") \
    .load() \
    .filter("device_id = 'sensor-123'")

# Less efficient - full table scan
all_events = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="events", keyspace="timeseries") \
    .load() \
    .filter("temperature > 75")

Writing Data to Cassandra

The connector supports multiple write modes with configurable consistency and batching:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("session_id", StringType(), False),
    StructField("page_views", IntegerType(), False),
    StructField("last_activity", TimestampType(), False)
])

from datetime import datetime

data = [
    ("user1", "sess123", 45, datetime(2024, 1, 15, 10, 30)),
    ("user2", "sess124", 23, datetime(2024, 1, 15, 10, 35))
]

df = spark.createDataFrame(data, schema)

df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .options(table="user_sessions", keyspace="analytics") \
    .option("spark.cassandra.output.consistency.level", "LOCAL_QUORUM") \
    .save()

Control write throughput and cluster impact through batch configuration:

spark.conf.set("spark.cassandra.output.batch.size.rows", "auto")
spark.conf.set("spark.cassandra.output.batch.size.bytes", "1024")
spark.conf.set("spark.cassandra.output.concurrent.writes", "5")
spark.conf.set("spark.cassandra.output.throughputMBPerSec", "10")

# These settings prevent overwhelming Cassandra nodes
df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .options(table="metrics", keyspace="monitoring") \
    .save()

Handling Schema Mapping and Type Conversions

The connector automatically maps Spark types to Cassandra types, but explicit control is sometimes necessary:

import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.SomeColumns

case class UserProfile(
    userId: String,
    email: String,
    preferences: Map[String, String],
    tags: Set[String]
)

val profiles = sc.parallelize(Seq(
    UserProfile("u1", "user@example.com", Map("theme" -> "dark"), Set("premium", "verified"))
))

profiles.saveToCassandra(
    "users",
    "profiles",
    SomeColumns("user_id" as "userId", "email", "preferences", "tags")
)
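The default mappings behind this automatic conversion look roughly like the table below (abridged; consult the connector documentation for the full list). The `UserProfile` example above relies on the map and set entries:

```python
# Common Spark SQL type -> CQL type defaults applied by the connector.
SPARK_TO_CQL = {
    "StringType": "text",
    "IntegerType": "int",
    "LongType": "bigint",
    "DoubleType": "double",
    "BooleanType": "boolean",
    "TimestampType": "timestamp",
    "MapType(StringType, StringType)": "map<text, text>",
    "ArrayType(StringType)": "list<text>",
}
```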

For complex types like UDTs (User Defined Types):

# Cassandra UDT: CREATE TYPE address (street text, city text, zip text)
from pyspark.sql.types import StructType, StructField, StringType

address_schema = StructType([
    StructField("street", StringType()),
    StructField("city", StringType()),
    StructField("zip", StringType())
])

# Spark automatically maps struct to UDT
df.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="users", keyspace="app") \
    .save()

Aggregations and Joins

Execute aggregations in Spark rather than relying on Cassandra’s limited aggregation capabilities:

# Read from multiple tables
users_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="users", keyspace="app") \
    .load()

events_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="events", keyspace="app") \
    .load()

# Perform join and aggregation in Spark
result = events_df.join(users_df, "user_id") \
    .groupBy("user_type") \
    .agg({"event_count": "sum", "user_id": "count"}) \
    .orderBy("user_type")

# Write aggregated results back
result.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("overwrite") \
    .options(table="user_type_summary", keyspace="analytics") \
    .save()

Error Handling and Retry Logic

Production systems require robust error handling for network partitions and node failures:

import com.datastax.spark.connector.writer._
import com.datastax.oss.driver.api.core.ConsistencyLevel

val writeConf = WriteConf(
    consistencyLevel = ConsistencyLevel.LOCAL_QUORUM,
    parallelismLevel = 5,
    throughputMiBPS = Some(10),
    taskMetricsEnabled = true
)

try {
    rdd.saveToCassandra(
        "analytics",
        "events",
        writeConf = writeConf
    )
} catch {
    case e: Exception =>
        // saveToCassandra surfaces driver-level write errors (timeouts,
        // unavailable replicas) as the Spark task failure cause.
        println(s"Write failed: ${e.getMessage}")
        throw e
}

For idempotent writes, implement retry logic:

import time

max_retries = 3
retry_delay = 5

for attempt in range(max_retries):
    try:
        df.write \
            .format("org.apache.spark.sql.cassandra") \
            .mode("append") \
            .options(table="events", keyspace="analytics") \
            .save()
        break
    except Exception as e:
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
        else:
            raise
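A fixed delay works, but when many tasks fail at once they retry in lockstep. A generic exponential-backoff helper, sketched here with `write_fn` standing in for the actual `df.write...save()` call:

```python
import random
import time

def write_with_backoff(write_fn, max_retries=3, base_delay=1.0):
    """Retry an idempotent write with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return write_fn()
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Doubling delay plus jitter spreads retries out so failing
            # tasks do not hammer the cluster simultaneously.
            time.sleep(base_delay * (2 ** attempt) * (0.5 + random.random()))
```

Because Cassandra writes are upserts, re-running the same batch after a partial failure is safe as long as the rows are deterministic.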

Monitoring and Performance Tuning

Enable metrics to track connector performance:

spark.conf.set("spark.cassandra.output.metrics", "true")
spark.conf.set("spark.cassandra.input.metrics", "true")

# Monitor through Spark UI or programmatically
df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="large_table", keyspace="production") \
    .load()

print(f"Records read: {df.count()}")

Key performance indicators include read/write throughput, task duration distribution, and data locality percentages. Adjust parallelism based on cluster size and workload characteristics. The connector’s partition-aware scheduling minimizes network transfer, but improper configuration can create hotspots or underutilize cluster resources.
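Locality can be quantified from the per-task locality levels shown in the Spark UI (names like PROCESS_LOCAL, NODE_LOCAL, ANY). A small sketch of that summary:

```python
def locality_percentage(task_localities):
    """Percentage of tasks that read from a local replica."""
    if not task_localities:
        return 0.0
    local = sum(1 for lvl in task_localities
                if lvl in ("PROCESS_LOCAL", "NODE_LOCAL"))
    return 100.0 * local / len(task_localities)
```

If this number drops well below 100% on a co-located Spark/Cassandra deployment, check that executors are actually running on the Cassandra nodes and that the split size is not forcing cross-node reads.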
