Pandas - Write DataFrame to SQL Database
Key Insights
- Pandas provides the `to_sql()` method supporting SQLite, PostgreSQL, MySQL, and other databases through SQLAlchemy, with options for table creation, appending, and replacing data
- Performance optimization requires chunking large DataFrames, using appropriate data types, and leveraging database-specific bulk insert methods for datasets exceeding 100K rows
- Transaction management and error handling are critical for data integrity, especially when dealing with schema mismatches, constraint violations, and connection timeouts
Basic DataFrame to SQL with SQLite
SQLite requires no server setup, making it ideal for local development and testing. The to_sql() method handles table creation automatically.
import pandas as pd
import sqlite3
# Create sample DataFrame
df = pd.DataFrame({
'user_id': [1, 2, 3, 4],
'username': ['alice', 'bob', 'charlie', 'diana'],
'age': [25, 30, 35, 28],
'signup_date': pd.to_datetime(['2024-01-15', '2024-01-16', '2024-01-17', '2024-01-18'])
})
# Create connection
conn = sqlite3.connect('users.db')
# Write DataFrame to SQL
df.to_sql('users', conn, if_exists='replace', index=False)
# Verify data
result = pd.read_sql('SELECT * FROM users', conn)
print(result)
conn.close()
The `if_exists` parameter controls behavior when the table already exists: `'fail'` (default) raises an error, `'replace'` drops and recreates the table, and `'append'` adds rows to existing data.
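To see the three modes side by side, here is a minimal sketch against an in-memory SQLite database (table and column names are illustrative):

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(':memory:')
df = pd.DataFrame({'id': [1, 2], 'name': ['a', 'b']})

df.to_sql('demo', conn, index=False)                        # creates the table (2 rows)
df.to_sql('demo', conn, if_exists='append', index=False)    # adds rows (now 4)
df.to_sql('demo', conn, if_exists='replace', index=False)   # drops and recreates (back to 2)

fail_raised = False
try:
    df.to_sql('demo', conn, index=False)                    # default 'fail' raises
except ValueError:
    fail_raised = True

rows_after_replace = pd.read_sql('SELECT COUNT(*) AS n FROM demo', conn)['n'][0]
print(rows_after_replace, fail_raised)
```

Note that `'replace'` rebuilds the table from the DataFrame's inferred schema, so any manually added indexes or constraints on the old table are lost.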
Using SQLAlchemy for Multiple Database Engines
SQLAlchemy provides a unified interface for PostgreSQL, MySQL, SQL Server, and other databases. This approach offers better connection pooling and transaction management.
import pandas as pd
from sqlalchemy import create_engine, text
# PostgreSQL connection
pg_engine = create_engine('postgresql://user:password@localhost:5432/mydb')
# MySQL connection
mysql_engine = create_engine('mysql+pymysql://user:password@localhost:3306/mydb')
# SQL Server connection
mssql_engine = create_engine('mssql+pyodbc://user:password@localhost/mydb?driver=ODBC+Driver+17+for+SQL+Server')
df = pd.DataFrame({
'product_id': range(1, 1001),
'product_name': [f'Product_{i}' for i in range(1, 1001)],
'price': [19.99 + i * 0.5 for i in range(1000)],
'stock': [100 - i % 50 for i in range(1000)]
})
# Write to PostgreSQL
df.to_sql('products', pg_engine, if_exists='replace', index=False)
# Verify with raw SQL
with pg_engine.connect() as conn:
result = conn.execute(text('SELECT COUNT(*) FROM products'))
print(f"Rows inserted: {result.scalar()}")
Specifying Data Types and Schema
Pandas infers SQL data types from DataFrame dtypes, but explicit type mapping ensures optimal database schema design.
import pandas as pd
from sqlalchemy import create_engine, text, Integer, Float, DateTime, Boolean
from sqlalchemy.types import VARCHAR
engine = create_engine('sqlite:///ecommerce.db')
df = pd.DataFrame({
'order_id': [1001, 1002, 1003],
'customer_email': ['user@example.com', 'test@example.com', 'admin@example.com'],
'total_amount': [99.99, 149.50, 75.25],
'is_paid': [True, False, True],
'created_at': pd.to_datetime(['2024-01-20 10:30:00', '2024-01-20 11:45:00', '2024-01-20 12:15:00'])
})
# Define explicit data types
dtype_mapping = {
'order_id': Integer,
'customer_email': VARCHAR(255),
'total_amount': Float(precision=2),
'is_paid': Boolean,
'created_at': DateTime
}
df.to_sql('orders', engine, if_exists='replace', index=False, dtype=dtype_mapping)
# Check schema
with engine.connect() as conn:
result = conn.execute(text("PRAGMA table_info(orders)"))
for row in result:
print(row)
Chunked Writes for Large DataFrames
Writing large DataFrames in chunks prevents memory issues and allows progress tracking. This approach is essential for datasets with millions of rows.
import pandas as pd
from sqlalchemy import create_engine
import numpy as np
engine = create_engine('postgresql://user:password@localhost/analytics')
# Generate large DataFrame (5 million rows)
n_rows = 5_000_000
df = pd.DataFrame({
'event_id': range(n_rows),
'user_id': np.random.randint(1, 100000, n_rows),
'event_type': np.random.choice(['click', 'view', 'purchase'], n_rows),
    'timestamp': pd.date_range('2024-01-01', periods=n_rows, freq='s'),  # lowercase 's' (uppercase 'S' is deprecated in recent pandas)
'value': np.random.uniform(0, 100, n_rows)
})
# Write in chunks of 50,000 rows
chunk_size = 50_000
total_chunks = len(df) // chunk_size + (1 if len(df) % chunk_size else 0)
for i, chunk_start in enumerate(range(0, len(df), chunk_size)):
chunk = df.iloc[chunk_start:chunk_start + chunk_size]
if i == 0:
chunk.to_sql('events', engine, if_exists='replace', index=False, chunksize=chunk_size)
else:
chunk.to_sql('events', engine, if_exists='append', index=False, chunksize=chunk_size)
print(f"Progress: {i+1}/{total_chunks} chunks written ({(i+1)/total_chunks*100:.1f}%)")
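The manual loop above is mainly useful for progress reporting. When that isn't needed, a single `to_sql()` call with `chunksize` lets pandas do the batching internally, sketched here against in-memory SQLite:

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(':memory:')
df = pd.DataFrame({'event_id': range(200_000), 'value': range(200_000)})

# One call; pandas splits the INSERTs into 50,000-row batches internally
df.to_sql('events', conn, if_exists='replace', index=False, chunksize=50_000)

n_written = pd.read_sql('SELECT COUNT(*) AS n FROM events', conn)['n'][0]
print(n_written)  # 200000
```

The `chunksize` argument bounds how many rows are buffered per statement, which keeps memory flat; the manual loop is only needed when you want per-chunk hooks such as logging or retries.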
Transaction Management and Error Handling
Proper transaction handling ensures data integrity when writes fail partway through. Use context managers and explicit transactions for production code.
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
engine = create_engine('postgresql://user:password@localhost/production')
df = pd.DataFrame({
'account_id': [1, 2, 3, 4],
'balance': [1000.00, 2500.50, 750.25, 3200.00],
'last_updated': pd.Timestamp.now()
})
try:
with engine.begin() as conn:
# Drop existing table within transaction
conn.execute(text('DROP TABLE IF EXISTS accounts'))
# Write DataFrame
df.to_sql('accounts', conn, if_exists='fail', index=False)
# Additional validation query
result = conn.execute(text('SELECT SUM(balance) FROM accounts'))
total_balance = result.scalar()
        if abs(total_balance - df['balance'].sum()) > 1e-6:  # tolerance instead of exact float equality
            raise ValueError(f"Balance mismatch: {total_balance} != {df['balance'].sum()}")
print(f"Successfully wrote {len(df)} rows with total balance: ${total_balance:.2f}")
except SQLAlchemyError as e:
print(f"Database error occurred: {e}")
# Transaction automatically rolled back
except ValueError as e:
print(f"Validation error: {e}")
# Transaction automatically rolled back
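The rollback-on-failure behavior can be demonstrated without a PostgreSQL server; a minimal sketch using plain sqlite3, whose connection context manager wraps one transaction:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE accounts (account_id INTEGER, balance REAL)')
conn.commit()

try:
    with conn:  # commits on success, rolls back on exception
        conn.execute('INSERT INTO accounts VALUES (1, 100.0)')
        raise ValueError('simulated failure mid-write')
except ValueError:
    pass

# The INSERT was rolled back together with the failed transaction
remaining = conn.execute('SELECT COUNT(*) FROM accounts').fetchone()[0]
print(remaining)  # 0
```

The same principle applies to `engine.begin()` above: all statements inside the block either commit together or are undone together.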
Appending Data with Duplicate Handling
When appending data, handle potential duplicates using database constraints or pre-filtering in Pandas.
import pandas as pd
from sqlalchemy import create_engine, text
engine = create_engine('sqlite:///logs.db')
# Initial data
initial_df = pd.DataFrame({
'log_id': [1, 2, 3],
'message': ['Started', 'Processing', 'Completed'],
'timestamp': pd.to_datetime(['2024-01-20 10:00:00', '2024-01-20 10:05:00', '2024-01-20 10:10:00'])
})
initial_df.to_sql('application_logs', engine, if_exists='replace', index=False)
# New data with potential duplicates
new_df = pd.DataFrame({
'log_id': [3, 4, 5], # log_id 3 already exists
'message': ['Completed', 'Error occurred', 'Restarted'],
'timestamp': pd.to_datetime(['2024-01-20 10:10:00', '2024-01-20 10:15:00', '2024-01-20 10:20:00'])
})
# Filter out duplicates before writing
with engine.connect() as conn:
existing_ids = pd.read_sql('SELECT log_id FROM application_logs', conn)
filtered_df = new_df[~new_df['log_id'].isin(existing_ids['log_id'])]
if not filtered_df.empty:
filtered_df.to_sql('application_logs', engine, if_exists='append', index=False)
print(f"Appended {len(filtered_df)} new rows, skipped {len(new_df) - len(filtered_df)} duplicates")
else:
print("No new rows to append")
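Pre-filtering in Pandas leaves a race window if other writers insert between the read and the write. An alternative is to push duplicate handling into the database itself; the sketch below uses SQLite's `INSERT OR IGNORE` through `to_sql`'s `method` callable, assuming a table created manually with a PRIMARY KEY (names are illustrative):

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(':memory:')
# A PRIMARY KEY lets the database enforce uniqueness for us
conn.execute('CREATE TABLE logs (log_id INTEGER PRIMARY KEY, message TEXT)')
conn.commit()

def insert_or_ignore(table, con, keys, data_iter):
    # Custom insertion method: rows whose log_id already exists are skipped
    columns = ', '.join(keys)
    placeholders = ', '.join('?' for _ in keys)
    sql = f'INSERT OR IGNORE INTO {table.name} ({columns}) VALUES ({placeholders})'
    con.executemany(sql, list(data_iter))

df1 = pd.DataFrame({'log_id': [1, 2, 3], 'message': ['a', 'b', 'c']})
df1.to_sql('logs', conn, if_exists='append', index=False, method=insert_or_ignore)

df2 = pd.DataFrame({'log_id': [3, 4], 'message': ['c-dup', 'd']})  # log_id 3 is a duplicate
df2.to_sql('logs', conn, if_exists='append', index=False, method=insert_or_ignore)

total_rows = conn.execute('SELECT COUNT(*) FROM logs').fetchone()[0]
print(total_rows)  # 4
```

PostgreSQL and MySQL offer analogous constructs (`ON CONFLICT DO NOTHING`, `INSERT IGNORE`) that can be wired up the same way through a `method` callable.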
Performance Optimization with Method Selection
For PostgreSQL and MySQL, use database-specific bulk insert methods for significant performance gains on large datasets.
import pandas as pd
from sqlalchemy import create_engine
import time
engine = create_engine('postgresql://user:password@localhost/benchmark')
# Generate test data
df = pd.DataFrame({
'id': range(100_000),
'value': range(100_000)
})
# Method 1: Standard to_sql
start = time.time()
df.to_sql('test_standard', engine, if_exists='replace', index=False)
standard_time = time.time() - start
# Method 2: to_sql with method='multi' (PostgreSQL/MySQL)
start = time.time()
df.to_sql('test_multi', engine, if_exists='replace', index=False, method='multi')
multi_time = time.time() - start
print(f"Standard method: {standard_time:.2f}s")
print(f"Multi method: {multi_time:.2f}s")
print(f"Speedup: {standard_time/multi_time:.2f}x")
The `method='multi'` parameter packs multiple rows into each INSERT statement, reducing network round trips. Actual gains vary by database and driver: some backends see large speedups, while others (notably PostgreSQL with psycopg2) can be slower than the default executemany path, so benchmark both on your own workload before committing to either.