Spark SQL - Catalog API
The Spark Catalog API exposes metadata operations through the `SparkSession.catalog` object. This interface abstracts the underlying metastore implementation, whether you're using Hive, Glue, or...
Key Insights
- The Catalog API provides programmatic access to Spark’s metadata layer, enabling runtime inspection and manipulation of databases, tables, functions, and columns without writing SQL strings
- Understanding catalog operations is critical for building dynamic data pipelines that adapt to schema changes, implement table discovery patterns, and manage metadata-driven transformations
- The API supports both Hive metastore and in-memory catalogs, with methods for caching control, temporary view management, and database switching that directly impact query performance
Understanding the Catalog API Structure
The Spark Catalog API exposes metadata operations through the SparkSession.catalog object. This interface abstracts the underlying metastore implementation, whether you’re using Hive, Glue, or Spark’s in-memory catalog.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("CatalogAPIDemo") \
.enableHiveSupport() \
.getOrCreate()
catalog = spark.catalog
# Get current database
current_db = catalog.currentDatabase()
print(f"Current database: {current_db}")
# List all databases
databases = catalog.listDatabases()
for db in databases:
print(f"Database: {db.name}, Location: {db.locationUri}")
The catalog object provides methods that return Dataset-like structures containing metadata. Each method returns a collection of objects with specific attributes relevant to that metadata type.
Database Operations
Managing databases programmatically allows dynamic pipeline routing and multi-tenant data architectures. The catalog API provides methods to list, switch, and inspect database properties.
# Create test databases for demonstration
spark.sql("CREATE DATABASE IF NOT EXISTS analytics")
spark.sql("CREATE DATABASE IF NOT EXISTS staging")
# Switch between databases
catalog.setCurrentDatabase("analytics")
print(f"Switched to: {catalog.currentDatabase()}")
# List all databases with filtering
all_dbs = catalog.listDatabases()
analytics_dbs = [db for db in all_dbs if 'analytics' in db.name]
for db in analytics_dbs:
print(f"Name: {db.name}")
print(f"Description: {db.description}")
print(f"Location: {db.locationUri}")
This pattern is essential for applications that need to process data across multiple logical partitions or environments without hardcoding database names.
Table Discovery and Metadata Inspection
The listTables() method returns comprehensive table metadata including table type (managed vs external), database association, and whether the table is temporary.
# Create sample tables
spark.sql("""
CREATE TABLE IF NOT EXISTS analytics.sales (
transaction_id STRING,
amount DECIMAL(10,2),
transaction_date DATE
) USING parquet
""")
# List tables in specific database
tables = catalog.listTables("analytics")
for table in tables:
print(f"Table: {table.name}")
print(f"Database: {table.database}")
print(f"Type: {table.tableType}")
print(f"Is Temporary: {table.isTemporary}")
print("---")
# Filter for specific table types
permanent_tables = [t for t in tables if not t.isTemporary]
print(f"Found {len(permanent_tables)} permanent tables")
For schema introspection, the listColumns() method provides detailed column metadata including data types, nullability, and whether columns are partition keys.
# Inspect column metadata
columns = catalog.listColumns("analytics.sales")
for col in columns:
print(f"Column: {col.name}")
print(f"Type: {col.dataType}")
print(f"Nullable: {col.nullable}")
print(f"Is Partition: {col.isPartition}")
print(f"Is Bucket: {col.isBucket}")
print("---")
# Build dynamic schema validation
required_columns = {'transaction_id', 'amount', 'transaction_date'}
actual_columns = {col.name for col in columns}
if not required_columns.issubset(actual_columns):
missing = required_columns - actual_columns
raise ValueError(f"Missing required columns: {missing}")
Working with Temporary Views
Temporary views are session-scoped virtual tables that don’t persist to the metastore. The catalog API provides methods to create, list, and drop these views programmatically.
# Create sample DataFrame
data = [
("2024-01-01", 1000.50),
("2024-01-02", 1500.75),
("2024-01-03", 2000.25)
]
df = spark.createDataFrame(data, ["date", "revenue"])
# Create temporary view
df.createOrReplaceTempView("daily_revenue")
# List all tables including temporary views
all_tables = catalog.listTables()
temp_views = [t for t in all_tables if t.isTemporary]
print("Temporary views in session:")
for view in temp_views:
print(f" - {view.name}")
# Drop temporary view
catalog.dropTempView("daily_revenue")
# Create global temporary view (cross-session)
df.createOrReplaceGlobalTempView("global_revenue")
global_views = catalog.listTables("global_temp")
print(f"Global temporary views: {[v.name for v in global_views]}")
Global temporary views persist across SparkSessions within the same application and reside in the global_temp database.
Function Management
The catalog API exposes both built-in and user-defined functions, enabling dynamic function discovery and validation.
# List all functions in current database
functions = catalog.listFunctions()
print(f"Total functions available: {len(functions)}")
# Filter for specific function types
udfs = [f for f in functions if not f.isTemporary]
temp_functions = [f for f in functions if f.isTemporary]
print(f"Permanent functions: {len(udfs)}")
print(f"Temporary functions: {len(temp_functions)}")
# Search for specific functions
date_functions = [f for f in functions if 'date' in f.name.lower()]
for func in date_functions[:5]: # Show first 5
print(f"Function: {func.name}, Class: {func.className}")
# Check if specific function exists
def function_exists(func_name: str) -> bool:
try:
catalog.functionExists(func_name)
return True
except:
return False
if function_exists("current_date"):
print("current_date function is available")
Cache Management
The catalog API provides direct control over table caching, which is critical for optimizing iterative algorithms and frequently accessed datasets.
# Create and populate a table for caching demo
spark.sql("""
CREATE TABLE IF NOT EXISTS analytics.reference_data (
id INT,
category STRING,
value DOUBLE
) USING parquet
""")
# Cache table in memory
catalog.cacheTable("analytics.reference_data")
# Check if table is cached
is_cached = catalog.isCached("analytics.reference_data")
print(f"Table cached: {is_cached}")
# List all cached tables
cached_tables = [t.name for t in catalog.listTables()
if catalog.isCached(f"{t.database}.{t.name")]
print(f"Cached tables: {cached_tables}")
# Uncache specific table
catalog.uncacheTable("analytics.reference_data")
# Clear all cached data
catalog.clearCache()
Caching should be used judiciously as it consumes memory. The catalog API allows you to programmatically manage cache lifecycle based on runtime conditions.
Table Existence and Recovery
Before performing operations on tables, verify their existence to avoid exceptions in production pipelines.
def safe_table_operation(database: str, table: str):
"""Safely perform operations on tables with existence checks"""
full_table_name = f"{database}.{table}"
# Check database exists
if not catalog.databaseExists(database):
print(f"Database {database} does not exist")
return None
# Check table exists
if not catalog.tableExists(full_table_name):
print(f"Table {full_table_name} does not exist")
return None
# Get table metadata
columns = catalog.listColumns(full_table_name)
return {
'table': full_table_name,
'columns': [col.name for col in columns],
'column_count': len(columns)
}
# Usage
result = safe_table_operation("analytics", "sales")
if result:
print(f"Table: {result['table']}")
print(f"Columns: {', '.join(result['columns'])}")
Practical Pattern: Metadata-Driven ETL
Combine catalog operations to build self-documenting, schema-aware data pipelines.
def process_all_tables_in_database(database: str, filter_prefix: str = None):
"""Process all tables matching criteria in a database"""
if not catalog.databaseExists(database):
raise ValueError(f"Database {database} not found")
catalog.setCurrentDatabase(database)
tables = catalog.listTables(database)
if filter_prefix:
tables = [t for t in tables if t.name.startswith(filter_prefix)]
results = []
for table in tables:
if table.isTemporary:
continue
full_name = f"{database}.{table.name}"
columns = catalog.listColumns(full_name)
# Get row count
df = spark.table(full_name)
row_count = df.count()
results.append({
'table': table.name,
'columns': len(columns),
'rows': row_count,
'type': table.tableType
})
return results
# Execute metadata-driven processing
stats = process_all_tables_in_database("analytics", "sales_")
for stat in stats:
print(f"{stat['table']}: {stat['rows']} rows, {stat['columns']} columns")
This pattern enables building data quality frameworks, automated documentation generators, and dynamic pipeline routers that adapt to metadata changes without code modifications.