Spark SQL - Create Database and Tables

Key Insights

  • Spark SQL supports both managed and external tables, with managed tables storing data in the Spark warehouse while external tables reference data in custom locations
  • Database creation in Spark SQL is straightforward but requires understanding of storage locations and metadata management through the Hive metastore
  • Table creation offers multiple methods including SQL DDL, DataFrame API, and programmatic approaches, each suited for different use cases and data sources

Understanding Spark SQL Databases

Spark SQL databases are logical namespaces that organize tables and views. By default, Spark creates a default database, but production applications require proper database organization for better data governance and access control.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DatabaseManagement") \
    .enableHiveSupport() \
    .getOrCreate()

# Create a database
spark.sql("CREATE DATABASE IF NOT EXISTS sales_db")

# Create database with custom location
spark.sql("""
    CREATE DATABASE IF NOT EXISTS analytics_db
    LOCATION '/user/hive/warehouse/analytics'
    COMMENT 'Analytics database for reporting'
""")

# Show all databases
spark.sql("SHOW DATABASES").show()

# Use a specific database
spark.sql("USE sales_db")

In CREATE DATABASE, the LOCATION clause sets the directory under which the database's managed tables are stored. Without it, Spark places the database under the default warehouse directory defined by spark.sql.warehouse.dir.
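To make the default concrete, the sketch below computes where a database and its managed tables would conventionally land when no LOCATION is given. It assumes the usual Hive-style layout of <warehouse>/<database>.db/<table>. The helper names and the example warehouse path are hypothetical, and DESCRIBE DATABASE EXTENDED remains the authoritative way to check a real deployment.

```python
import posixpath


def default_database_location(warehouse_dir: str, database: str) -> str:
    """Conventional directory for a database created without LOCATION."""
    # Assumption: Hive-style layout, where each database gets a "<name>.db" folder.
    return posixpath.join(warehouse_dir, f"{database.lower()}.db")


def default_table_location(warehouse_dir: str, database: str, table: str) -> str:
    """Conventional directory for a managed table inside that database."""
    return posixpath.join(
        default_database_location(warehouse_dir, database), table.lower()
    )


# Hypothetical warehouse path, standing in for the value of spark.sql.warehouse.dir
warehouse = "/user/hive/warehouse"
print(default_database_location(warehouse, "sales_db"))
print(default_table_location(warehouse, "sales_db", "customers"))
```

A database created with an explicit LOCATION, such as analytics_db above, bypasses this convention and uses the given directory instead.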

Creating Managed Tables

Managed tables are fully controlled by Spark. When you drop a managed table, Spark deletes both metadata and underlying data files.

# Create managed table using SQL
spark.sql("""
    CREATE TABLE IF NOT EXISTS sales_db.customers (
        customer_id INT,
        name STRING,
        email STRING,
        signup_date DATE,
        account_balance DECIMAL(10,2)
    )
    USING parquet
    PARTITIONED BY (signup_date)
    TBLPROPERTIES (
        'created_by' = 'data_engineering',
        'version' = '1.0'
    )
""")

# Create table from DataFrame
data = [
    (1, "John Doe", "john@example.com", "2024-01-15", 1500.50),
    (2, "Jane Smith", "jane@example.com", "2024-01-16", 2300.75),
    (3, "Bob Johnson", "bob@example.com", "2024-01-17", 890.25)
]

columns = ["customer_id", "name", "email", "signup_date", "account_balance"]
df = spark.createDataFrame(data, columns)

df.write \
    .mode("overwrite") \
    .format("parquet") \
    .saveAsTable("sales_db.customer_snapshot")

The USING clause specifies the data source format. Spark has built-in support for parquet, orc, json, and csv; avro requires the external spark-avro module, and delta requires the Delta Lake library. Parquet is recommended for analytical workloads due to columnar storage and compression.

Creating External Tables

External tables reference data stored outside Spark’s warehouse directory. Dropping an external table removes only metadata, preserving the underlying data.

# Create external table pointing to existing data.
# The data source (USING) syntax does not accept the EXTERNAL keyword;
# specifying LOCATION is what makes the table external (unmanaged).
spark.sql("""
    CREATE TABLE IF NOT EXISTS sales_db.transactions (
        transaction_id BIGINT,
        customer_id INT,
        product_id INT,
        amount DECIMAL(10,2),
        transaction_timestamp TIMESTAMP,
        transaction_date DATE
    )
    USING parquet
    PARTITIONED BY (transaction_date)
    LOCATION '/data/raw/transactions/'
""")

# Add partition metadata for external table
spark.sql("""
    ALTER TABLE sales_db.transactions
    ADD IF NOT EXISTS PARTITION (transaction_date='2024-01-15')
    LOCATION '/data/raw/transactions/date=2024-01-15'
""")

# Discover partitions automatically
spark.sql("MSCK REPAIR TABLE sales_db.transactions")

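Use MSCK REPAIR TABLE when partition directories exist but aren't registered in the metastore, which commonly happens when external processes write partitioned data. It only discovers directories that follow the Hive naming convention column=value (here, transaction_date=2024-01-15); a directory named differently, such as date=2024-01-15 above, must be registered explicitly with ALTER TABLE ... ADD PARTITION ... LOCATION.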
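The helper below is a minimal sketch of how a writer process might build directory names that MSCK REPAIR TABLE can discover, assuming the Hive-style column=value convention. The function name is hypothetical, and it does not escape special characters in partition values.

```python
import posixpath
from datetime import date


def partition_dir(table_root: str, **partition_values) -> str:
    """Build a Hive-style partition directory such as <root>/<column>=<value>."""
    # One path segment per partition column, in the order the columns are given.
    segments = [f"{column}={value}" for column, value in partition_values.items()]
    return posixpath.join(table_root, *segments)


path = partition_dir("/data/raw/transactions", transaction_date=date(2024, 1, 16))
print(path)
```

Writing Parquet files under the resulting directory and then running MSCK REPAIR TABLE should register the new partition without a separate ALTER TABLE statement.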

Creating Tables with Complex Data Types

Spark SQL supports arrays, maps, and structs for modeling complex data structures.

spark.sql("""
    CREATE TABLE IF NOT EXISTS sales_db.orders (
        order_id BIGINT,
        customer_id INT,
        order_items ARRAY<STRUCT<
            product_id: INT,
            quantity: INT,
            unit_price: DECIMAL(10,2)
        >>,
        shipping_address STRUCT<
            street: STRING,
            city: STRING,
            state: STRING,
            zip: STRING
        >,
        metadata MAP<STRING, STRING>,
        created_at TIMESTAMP
    )
    USING parquet
""")

# Insert complex data
spark.sql("""
    INSERT INTO sales_db.orders VALUES (
        1001,
        1,
        ARRAY(
            STRUCT(101, 2, 29.99),
            STRUCT(102, 1, 49.99)
        ),
        STRUCT('123 Main St', 'Springfield', 'IL', '62701'),
        MAP('source', 'web', 'campaign', 'spring_sale'),
        CURRENT_TIMESTAMP()
    )
""")

Complex types eliminate the need for separate normalized tables in many scenarios, improving query performance by reducing joins.
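Reading nested data back uses dot notation for struct fields, bracket notation for map keys, and explode to turn array elements into rows. The sketch below builds such a query for the orders table defined above. The function names are hypothetical, and the second function expects an existing SparkSession to be passed in.

```python
def order_line_items_query(table: str = "sales_db.orders") -> str:
    """Build a query that flattens order_items into one row per line item."""
    return f"""
        SELECT
            o.order_id,
            o.shipping_address.city AS city,
            o.metadata['source'] AS source,
            item.product_id,
            item.quantity * item.unit_price AS line_total
        FROM {table} o
        LATERAL VIEW explode(o.order_items) exploded AS item
    """


def show_order_line_items(spark, table: str = "sales_db.orders") -> None:
    """Run the query on an existing SparkSession, such as the one created earlier."""
    spark.sql(order_line_items_query(table)).show(truncate=False)
```

Calling show_order_line_items(spark) after the INSERT above should list one row per element of order_items.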

Creating Tables from Query Results (CTAS)

CREATE TABLE AS SELECT (CTAS) creates tables populated with query results.

# Create table from query
spark.sql("""
    CREATE TABLE sales_db.high_value_customers
    USING parquet
    AS
    SELECT 
        c.customer_id,
        c.name,
        c.email,
        SUM(t.amount) as total_spent,
        COUNT(t.transaction_id) as transaction_count
    FROM sales_db.customers c
    JOIN sales_db.transactions t ON c.customer_id = t.customer_id
    WHERE t.transaction_date >= '2024-01-01'
    GROUP BY c.customer_id, c.name, c.email
    HAVING SUM(t.amount) > 5000
""")

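CTAS is an efficient way to create derived tables, such as precomputed summaries that you refresh by rerunning the query. It does not accept an explicit column list: the schema comes from the query's output, so use CAST and column aliases in the SELECT to control types and names.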

Programmatic Table Creation with DataFrames

The DataFrame API provides programmatic control over table creation.

from decimal import Decimal

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType

# Define schema explicitly
schema = StructType([
    StructField("product_id", IntegerType(), False),
    StructField("product_name", StringType(), False),
    StructField("category", StringType(), True),
    StructField("price", DecimalType(10,2), False)
])

# Create DataFrame with schema
# DecimalType columns require decimal.Decimal values; Python floats fail schema verification
products_data = [
    (101, "Laptop", "Electronics", Decimal("999.99")),
    (102, "Mouse", "Electronics", Decimal("29.99")),
    (103, "Desk Chair", "Furniture", Decimal("199.99"))
]

products_df = spark.createDataFrame(products_data, schema)

# Write with partitioning and bucketing
products_df.write \
    .mode("overwrite") \
    .format("parquet") \
    .partitionBy("category") \
    .bucketBy(10, "product_id") \
    .sortBy("product_id") \
    .option("compression", "snappy") \
    .saveAsTable("sales_db.products")

Bucketing distributes rows across a fixed number of buckets based on a hash of the specified columns. When both tables are bucketed on the join keys with the same number of buckets, Spark can join them without shuffling either side.
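The following pure-Python sketch illustrates why bucketing helps joins. It is a simplification: Spark hashes keys with Murmur3 before taking the modulo, whereas this example uses a plain modulo as a stand-in, and the second list of keys is a hypothetical table of order lines.

```python
from collections import defaultdict

NUM_BUCKETS = 10  # matches bucketBy(10, "product_id") in the example above


def bucket_for(key: int, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a key to a bucket number in the range [0, num_buckets)."""
    # Simplification: a plain modulo stands in for Spark's Murmur3-based hash.
    return key % num_buckets


def assign_buckets(keys, num_buckets: int = NUM_BUCKETS) -> dict:
    """Group keys by bucket number."""
    buckets = defaultdict(list)
    for key in keys:
        buckets[bucket_for(key, num_buckets)].append(key)
    return dict(buckets)


product_ids = [101, 102, 103, 111]
order_line_product_ids = [101, 101, 103, 111, 102]  # hypothetical second table

product_buckets = assign_buckets(product_ids)
order_line_buckets = assign_buckets(order_line_product_ids)

# Matching keys always share a bucket number, so bucket i of one table
# only ever needs to be joined with bucket i of the other.
for bucket in sorted(product_buckets):
    print(bucket, product_buckets[bucket], order_line_buckets.get(bucket, []))
```

Because the bucket number depends only on the key and the bucket count, the guarantee breaks if the two tables use different bucket counts or different bucketing columns.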

Temporary and Global Temporary Views

Temporary views exist only for the current session, while global temporary views are shared across all sessions of the same Spark application and disappear when the application terminates.

# Create temporary view
df = spark.read.parquet("/data/staging/recent_orders")
df.createOrReplaceTempView("recent_orders")

# Query temporary view
spark.sql("SELECT * FROM recent_orders WHERE amount > 100").show()

# Create global temporary view
df.createOrReplaceGlobalTempView("global_recent_orders")

# Access global temporary view (requires global_temp prefix)
spark.sql("SELECT * FROM global_temp.global_recent_orders").show()

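Temporary views are ideal for multi-step transformations within a single application because they name an intermediate result without writing it to disk. A view stores only the query plan, so it is recomputed each time it is queried unless the underlying DataFrame is cached.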

Table Metadata and Management

Understanding table metadata helps with troubleshooting and optimization.

# Describe table schema
spark.sql("DESCRIBE EXTENDED sales_db.customers").show(truncate=False)

# Show table properties
spark.sql("SHOW TBLPROPERTIES sales_db.customers").show()

# Show create table statement
spark.sql("SHOW CREATE TABLE sales_db.customers").show(truncate=False)

# Get table statistics
spark.sql("ANALYZE TABLE sales_db.customers COMPUTE STATISTICS")
spark.sql("DESCRIBE EXTENDED sales_db.customers").show(truncate=False)

# Drop table (managed table deletes data, external preserves it)
spark.sql("DROP TABLE IF EXISTS sales_db.customer_snapshot")

# Drop database (CASCADE removes all tables)
spark.sql("DROP DATABASE IF EXISTS analytics_db CASCADE")

The ANALYZE TABLE command computes statistics that the query optimizer uses for better execution plans. Run this after significant data changes.

Practical Considerations

Storage format selection impacts performance significantly. Parquet offers excellent compression and columnar access for analytical queries. ORC provides similar benefits with better compression for certain workloads. Delta Lake adds ACID transactions and time travel capabilities.

Partitioning improves query performance when filtering on partition columns, but too many small partitions add metadata and file-listing overhead. As a rule of thumb, aim for partition sizes between 1GB and 10GB. Bucketing works well for tables frequently joined on specific columns.
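The sizing guideline translates into simple arithmetic. The sketch below estimates how many partitions keep each one within the 1GB to 10GB range, assuming data is spread evenly across partitions. The function names and the 730 GB table size are hypothetical.

```python
import math


def partition_count_range(table_size_gb: float,
                          min_partition_gb: float = 1.0,
                          max_partition_gb: float = 10.0) -> tuple:
    """Return (fewest, most) partitions that keep each one within the target size."""
    fewest = max(1, math.ceil(table_size_gb / max_partition_gb))
    most = max(1, math.floor(table_size_gb / min_partition_gb))
    return fewest, most


def average_partition_gb(table_size_gb: float, partition_count: int) -> float:
    """Average partition size, assuming evenly distributed data."""
    return table_size_gb / partition_count


table_size_gb = 730  # hypothetical: two years of data at roughly 1 GB per day
fewest, most = partition_count_range(table_size_gb)
print(f"Target partition count: {fewest} to {most}")

daily = average_partition_gb(table_size_gb, 730)        # one partition per day
hourly = average_partition_gb(table_size_gb, 730 * 24)  # one partition per hour
print(f"Daily partitions:  {daily:.2f} GB each")
print(f"Hourly partitions: {hourly:.3f} GB each")
```

In this scenario daily partitions sit at the lower edge of the range, while hourly partitions fall well below it, which suggests partitioning by date rather than by hour.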

Always use IF NOT EXISTS and IF EXISTS clauses in production code so that DDL statements are idempotent and can be rerun safely. Enable Hive support when creating the SparkSession to persist metadata in the Hive metastore, ensuring tables survive application restarts.
