SQL query optimization, schema design, indexing strategies, and relational database mastery for production data systems
Design production SQL queries with window functions, CTEs, and indexing strategies. Claude uses this when optimizing slow queries, analyzing EXPLAIN plans, or designing partitioned schemas for PostgreSQL/MySQL.
/plugin marketplace add pluginagentmarketplace/custom-plugin-data-engineer
/plugin install data-engineer-development-assistant@pluginagentmarketplace-data-engineer
This skill inherits all available tools. When active, it can use any tool Claude has access to.
assets/config.yaml
assets/schema.json
references/GUIDE.md
references/PATTERNS.md
scripts/validate.py
Production-grade SQL skills for designing, querying, and optimizing relational databases in data engineering workflows.
-- Modern PostgreSQL 16+ query with window functions and CTEs
WITH daily_metrics AS (
SELECT
DATE_TRUNC('day', created_at) AS metric_date,
category,
COUNT(*) AS event_count,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount
FROM events
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY 1, 2
),
ranked_categories AS (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY metric_date ORDER BY total_amount DESC) AS daily_rank,
LAG(total_amount) OVER (PARTITION BY category ORDER BY metric_date) AS prev_day_amount
FROM daily_metrics
)
SELECT
metric_date,
category,
event_count,
total_amount,
ROUND(100.0 * (total_amount - prev_day_amount) / NULLIF(prev_day_amount, 0), 2) AS day_over_day_pct
FROM ranked_categories
WHERE daily_rank <= 5
ORDER BY metric_date DESC, daily_rank;
-- Always analyze before optimizing
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT u.id, u.email, COUNT(o.id) AS order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at >= '2024-01-01'
GROUP BY u.id, u.email
HAVING COUNT(o.id) > 10;
-- Reading EXPLAIN output:
-- Seq Scan = Full table scan (fine on small tables; on large tables it usually signals a missing index)
-- Index Scan = Uses an index to locate rows (good for selective predicates)
-- Index Only Scan = Answers the query from the index alone, skipping the heap (best case)
-- Bitmap Index Scan = Combines one or more index conditions before fetching rows (acceptable)
-- Hash Join = Builds an in-memory hash table (efficient for large joins, memory intensive)
-- Nested Loop = Row-by-row join (fast when the outer side is small and the inner side is indexed; slow otherwise)
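If the plan for the query above shows sequential scans, indexes on the filter and join columns are the usual first fix; a minimal sketch, assuming neither index exists yet:
-- Hypothetical supporting indexes for the EXPLAIN'd query above
CREATE INDEX idx_users_created_at ON users(created_at);
CREATE INDEX idx_orders_user_id ON orders(user_id);
-- Re-run EXPLAIN (ANALYZE, BUFFERS) to confirm the plan changed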
-- B-tree index (default, most common)
CREATE INDEX idx_users_email ON users(email);
-- Partial index (smaller, faster for specific queries)
CREATE INDEX idx_active_users ON users(created_at)
WHERE status = 'active';
-- Composite index (column order matters!)
CREATE INDEX idx_orders_user_date ON orders(user_id, created_at DESC);
-- Covering index (includes all columns query needs)
CREATE INDEX idx_orders_covering ON orders(user_id)
INCLUDE (total_amount, status);
-- Expression index (for computed values)
CREATE INDEX idx_users_lower_email ON users(LOWER(email));
-- GIN index for JSONB
CREATE INDEX idx_metadata_gin ON events USING GIN(metadata jsonb_path_ops);
-- Check index usage
SELECT
schemaname, relname, indexrelname,
idx_scan, idx_tup_read, idx_tup_fetch
FROM pg_stat_user_indexes
ORDER BY idx_scan DESC;
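On busy production tables, index builds and drops are usually done with CONCURRENTLY so writes are not blocked (it cannot run inside a transaction block); the index and column names below are illustrative:
-- Build without locking out writes (slower, but non-blocking)
CREATE INDEX CONCURRENTLY idx_orders_status ON orders(status);
-- Drop an index that pg_stat_user_indexes shows is never scanned
DROP INDEX CONCURRENTLY idx_orders_status;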
-- Running totals and moving averages
SELECT
order_date,
amount,
-- Running total
SUM(amount) OVER (ORDER BY order_date) AS running_total,
-- 7-day moving average
AVG(amount) OVER (
ORDER BY order_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS moving_avg_7d,
-- Rank within partition
RANK() OVER (PARTITION BY customer_id ORDER BY amount DESC) AS customer_rank,
-- Percent of total within partition
100.0 * amount / SUM(amount) OVER (PARTITION BY customer_id) AS pct_of_customer_total
FROM orders;
-- Gap and island detection (consecutive-day streaks; assumes at most one row per event_date)
WITH numbered AS (
SELECT
event_date,
ROW_NUMBER() OVER (ORDER BY event_date) AS rn,
event_date - (ROW_NUMBER() OVER (ORDER BY event_date) * INTERVAL '1 day') AS grp
FROM events
)
SELECT
MIN(event_date) AS streak_start,
MAX(event_date) AS streak_end,
COUNT(*) AS streak_length
FROM numbered
GROUP BY grp
ORDER BY streak_start;
-- Read Committed (default in PostgreSQL)
BEGIN;
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- Each statement sees data committed before that statement started
COMMIT;
-- Repeatable Read (for consistent snapshots)
BEGIN;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- Same query returns same results within transaction
COMMIT;
-- Serializable (strongest, may cause retries)
BEGIN;
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- Full isolation, but prepare for serialization failures
COMMIT;
-- Optimistic locking pattern
UPDATE products
SET stock = stock - 1, version = version + 1
WHERE id = $1 AND version = $2;
-- Check rows affected; if 0, concurrent modification occurred
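When contention is high and retry loops are undesirable, a pessimistic alternative takes the row lock up front; a minimal sketch:
-- Pessimistic locking: block concurrent writers until COMMIT
BEGIN;
SELECT stock FROM products WHERE id = $1 FOR UPDATE;
-- decide in application code, then:
UPDATE products SET stock = stock - 1 WHERE id = $1;
COMMIT;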
-- Range partitioning (time-series data)
CREATE TABLE events (
id BIGSERIAL,
event_type TEXT NOT NULL,
payload JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (created_at);
-- Create partitions
CREATE TABLE events_2024_q1 PARTITION OF events
FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');
CREATE TABLE events_2024_q2 PARTITION OF events
FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');
-- Auto-partition with pg_partman (production setup)
-- Note: this call matches pg_partman 4.x; pg_partman 5+ drops the p_type argument
-- (declarative partitioning only) and changes some parameter formats
SELECT partman.create_parent(
p_parent_table := 'public.events',
p_control := 'created_at',
p_type := 'native',
p_interval := 'monthly',
p_premake := 3
);
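pg_partman only pre-creates future partitions when its maintenance runs, so schedule it (pg_cron, cron, or the background worker); the manual call looks like:
-- Create missing future partitions and apply retention settings
SELECT partman.run_maintenance();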
-- List partitioning (categorical data)
CREATE TABLE orders (
id BIGSERIAL,
region TEXT NOT NULL,
total NUMERIC(10,2)
) PARTITION BY LIST (region);
CREATE TABLE orders_us PARTITION OF orders FOR VALUES IN ('US', 'CA');
CREATE TABLE orders_eu PARTITION OF orders FOR VALUES IN ('UK', 'DE', 'FR');
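Rows whose region is not in any list would otherwise fail to insert; a DEFAULT partition (PostgreSQL 11+) catches them:
-- Catch-all partition for regions not explicitly listed
CREATE TABLE orders_other PARTITION OF orders DEFAULT;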
| Tool | Purpose | Version (2025) |
|---|---|---|
| PostgreSQL | Primary OLTP database | 16+ |
| MySQL | Alternative OLTP | 8.0+ |
| DBeaver | Universal SQL client | Latest |
| pgAdmin | PostgreSQL admin | 8+ |
| pgcli | CLI with autocomplete | Latest |
| pg_stat_statements | Query performance | Built-in |
| pgBouncer | Connection pooling | 1.21+ |
| Flyway/Liquibase | Schema migrations | Latest |
Week 1: SELECT, WHERE, JOINs, GROUP BY, ORDER BY
Week 2: Aggregations, HAVING, subqueries, UNION
Week 3: Window functions, CTEs, CASE expressions
Week 4: Index fundamentals, EXPLAIN basics
Week 5: Query optimization, execution plans
Week 6: Transactions, locking, isolation levels
Week 7: Partitioning, schema design patterns
Week 8: Connection pooling, high availability
Week 9: Migrations, versioning, rollback strategies
Week 10: Monitoring, alerting, performance tuning
-- Fact table (measures, metrics); the dimension tables it references must exist first
CREATE TABLE fact_sales (
sale_id BIGSERIAL,
date_key INT NOT NULL REFERENCES dim_date(date_key),
product_key INT NOT NULL REFERENCES dim_product(product_key),
customer_key INT NOT NULL REFERENCES dim_customer(customer_key),
quantity INT NOT NULL,
unit_price NUMERIC(10,2) NOT NULL,
total_amount NUMERIC(12,2) NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
-- On a partitioned table, the primary key must include the partition key
PRIMARY KEY (sale_id, date_key)
) PARTITION BY RANGE (date_key);
-- Dimension table (descriptive attributes)
CREATE TABLE dim_product (
product_key SERIAL PRIMARY KEY,
product_id VARCHAR(50) NOT NULL,
product_name VARCHAR(255) NOT NULL,
category VARCHAR(100),
subcategory VARCHAR(100),
-- SCD Type 2 fields
valid_from DATE NOT NULL DEFAULT CURRENT_DATE,
valid_to DATE DEFAULT '9999-12-31',
is_current BOOLEAN DEFAULT TRUE,
UNIQUE(product_id, valid_from)
);
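Loading changes into an SCD Type 2 dimension means closing the current row and inserting a new version. A minimal sketch, assuming incoming attributes land in a hypothetical staging_product table with matching column names:
-- 1) Close the current version of products whose attributes changed
UPDATE dim_product d
SET valid_to = CURRENT_DATE, is_current = FALSE
FROM staging_product s
WHERE d.product_id = s.product_id
AND d.is_current
AND (d.product_name, d.category, d.subcategory)
IS DISTINCT FROM (s.product_name, s.category, s.subcategory);
-- 2) Insert a new current version for changed and brand-new products
INSERT INTO dim_product (product_id, product_name, category, subcategory, valid_from)
SELECT s.product_id, s.product_name, s.category, s.subcategory, CURRENT_DATE
FROM staging_product s
LEFT JOIN dim_product d ON d.product_id = s.product_id AND d.is_current
WHERE d.product_id IS NULL;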
-- Enable pg_stat_statements (requires shared_preload_libraries = 'pg_stat_statements'
-- in postgresql.conf and a server restart)
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
-- Top 10 slowest queries
SELECT
ROUND((total_exec_time / 1000)::numeric, 2) AS total_secs,
calls,
ROUND((mean_exec_time / 1000)::numeric, 4) AS mean_secs,
ROUND((stddev_exec_time / 1000)::numeric, 4) AS stddev_secs,
rows,
query
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;
-- Reset stats after optimization
SELECT pg_stat_statements_reset();
# pgbouncer.ini
[databases]
mydb = host=localhost port=5432 dbname=mydb
[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
auth_type = scram-sha-256
auth_file = /etc/pgbouncer/userlist.txt
# Pool modes:
# session = 1 connection per client session
# transaction = 1 connection per transaction (recommended; session state such as
#   SET, LISTEN/NOTIFY, temp tables, and advisory locks does not carry across queries)
# statement = 1 connection per statement (limited use)
pool_mode = transaction
default_pool_size = 20
max_client_conn = 1000
reserve_pool_size = 5
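Clients then connect to port 6432 instead of PostgreSQL directly, and pool health can be inspected from pgBouncer's admin console (e.g. psql -p 6432 pgbouncer):
-- pgBouncer admin console commands
SHOW POOLS;   -- clients waiting vs. server connections per pool
SHOW STATS;   -- per-database query and traffic counters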
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| Slow Query | Query > 1s | Missing index, bad plan | EXPLAIN ANALYZE, add index |
| Lock Wait | Queries hang | Long transaction, deadlock | Check pg_locks, kill blocking |
| Connection Exhausted | "Too many connections" | No pooling, connection leak | Use pgbouncer, fix app |
| Disk Full | Write errors | Table bloat, WAL growth | VACUUM FULL, archive WAL |
| Replication Lag | Stale reads | Slow replica, network | Check pg_stat_replication |
-- 1. Check active queries
SELECT pid, now() - pg_stat_activity.query_start AS duration, query, state
FROM pg_stat_activity
WHERE state != 'idle'
ORDER BY duration DESC;
-- 2. Find blocking queries
SELECT
blocked.pid AS blocked_pid,
blocked.query AS blocked_query,
blocking.pid AS blocking_pid,
blocking.query AS blocking_query
FROM pg_stat_activity blocked
JOIN pg_locks blocked_locks ON blocked.pid = blocked_locks.pid
JOIN pg_locks blocking_locks ON blocked_locks.locktype = blocking_locks.locktype
AND blocked_locks.relation = blocking_locks.relation
AND blocked_locks.pid != blocking_locks.pid
JOIN pg_stat_activity blocking ON blocking_locks.pid = blocking.pid
WHERE NOT blocked_locks.granted;
-- 3. Check table bloat
SELECT
schemaname, relname,
pg_size_pretty(pg_total_relation_size(relid)) AS total_size,
n_dead_tup AS dead_rows,
n_live_tup AS live_rows,
ROUND(100.0 * n_dead_tup / NULLIF(n_live_tup + n_dead_tup, 0), 2) AS dead_pct
FROM pg_stat_user_tables
ORDER BY n_dead_tup DESC
LIMIT 10;
-- 4. Check index usage
SELECT
schemaname, relname, indexrelname,
idx_scan AS times_used,
pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
FROM pg_stat_user_indexes
ORDER BY idx_scan ASC -- Least used first
LIMIT 20;
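The troubleshooting table above also lists replication lag; a quick check from the primary, assuming streaming replication is configured (PostgreSQL 10+ column names):
-- 5. Check replication lag on the primary
SELECT
client_addr,
state,
pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_lag_bytes,
replay_lag
FROM pg_stat_replication;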
-- Graceful cancel (SIGINT)
SELECT pg_cancel_backend(pid);
-- Force terminate (SIGTERM)
SELECT pg_terminate_backend(pid);
-- Kill all queries older than 5 minutes
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE state = 'active'
AND query_start < NOW() - INTERVAL '5 minutes'
AND pid != pg_backend_pid();
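Rather than killing runaway sessions by hand, server-side timeouts cap them automatically; a sketch with illustrative values (mydb as in the pgBouncer config above):
-- Abort statements after 30s and idle-in-transaction sessions after 5 minutes
ALTER DATABASE mydb SET statement_timeout = '30s';
ALTER DATABASE mydb SET idle_in_transaction_session_timeout = '5min';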
import pytest
from sqlalchemy import create_engine, text
from testcontainers.postgres import PostgresContainer


@pytest.fixture(scope="module")
def postgres_container():
    """Spin up real PostgreSQL for integration tests."""
    with PostgresContainer("postgres:16-alpine") as postgres:
        yield postgres


@pytest.fixture
def db_connection(postgres_container):
    """Create connection with transaction rollback."""
    engine = create_engine(postgres_container.get_connection_url())
    with engine.connect() as conn:
        trans = conn.begin()
        yield conn
        trans.rollback()


class TestQueryOptimization:
    def test_index_is_used(self, db_connection):
        # Arrange
        db_connection.execute(text("""
            CREATE TABLE test_users (
                id SERIAL PRIMARY KEY,
                email VARCHAR(255) NOT NULL
            );
            CREATE INDEX idx_email ON test_users(email);
            INSERT INTO test_users (email)
            SELECT 'user' || i || '@test.com' FROM generate_series(1, 10000) i;
            ANALYZE test_users;
        """))

        # Act
        result = db_connection.execute(text("""
            EXPLAIN (FORMAT JSON)
            SELECT * FROM test_users WHERE email = 'user5000@test.com'
        """)).fetchone()
        plan = result[0][0]

        # Assert
        assert "Index Scan" in str(plan) or "Index Only Scan" in str(plan)

    def test_window_function_correctness(self, db_connection):
        # Arrange
        db_connection.execute(text("""
            CREATE TABLE test_sales (id SERIAL, amount NUMERIC);
            INSERT INTO test_sales (amount) VALUES (100), (200), (300);
        """))

        # Act
        result = db_connection.execute(text("""
            SELECT amount, SUM(amount) OVER (ORDER BY id) AS running_total
            FROM test_sales ORDER BY id
        """)).fetchall()

        # Assert
        assert result[0][1] == 100  # First running total
        assert result[1][1] == 300  # 100 + 200
        assert result[2][1] == 600  # 100 + 200 + 300
-- ✅ DO: Use explicit JOINs
SELECT u.name, o.total
FROM users u
INNER JOIN orders o ON u.id = o.user_id;
-- ❌ DON'T: Use implicit joins
SELECT u.name, o.total
FROM users u, orders o
WHERE u.id = o.user_id;
-- ✅ DO: Use EXISTS for existence checks
SELECT * FROM users u
WHERE EXISTS (SELECT 1 FROM orders o WHERE o.user_id = u.id);
-- ❌ DON'T: Use IN/NOT IN subqueries for existence checks (NOT IN matches nothing if the subquery returns a NULL)
SELECT * FROM users
WHERE id IN (SELECT user_id FROM orders);
-- ✅ DO: Paginate with keyset (cursor)
SELECT * FROM events
WHERE id > $last_seen_id
ORDER BY id
LIMIT 100;
-- ❌ DON'T: Use OFFSET for deep pagination
SELECT * FROM events
ORDER BY id
LIMIT 100 OFFSET 1000000; -- Slow!
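If the sort column is not unique, keyset pagination needs a tiebreaker; a sketch using a row-value comparison, which PostgreSQL can satisfy with an index on (created_at, id):
-- Keyset pagination with a tiebreaker on a non-unique sort column
SELECT * FROM events
WHERE (created_at, id) > ($last_created_at, $last_id)
ORDER BY created_at, id
LIMIT 100;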
-- ✅ DO: Use appropriate data types
CREATE TABLE metrics (
id BIGSERIAL PRIMARY KEY, -- Use BIGSERIAL for high-volume
value NUMERIC(10,2), -- Precise decimal
created_at TIMESTAMPTZ -- Always use timezone-aware
);
-- ✅ DO: Add constraints
ALTER TABLE orders
ADD CONSTRAINT chk_positive_amount CHECK (amount > 0);
-- ✅ DO: Use JSONB for flexible schema
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
event_type TEXT NOT NULL,
payload JSONB NOT NULL DEFAULT '{}'
);
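Containment filters (@>) on JSONB pair with a GIN jsonb_path_ops index like the one shown earlier, assuming such an index exists on payload; the key and value below are illustrative:
-- Containment query that a GIN (jsonb_path_ops) index can serve
SELECT id, payload
FROM events
WHERE payload @> '{"source": "checkout"}';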
After mastering SQL databases:
- data-warehousing - Snowflake, BigQuery, dimensional modeling
- etl-tools - Build pipelines with Airflow
- nosql-databases - MongoDB, Redis, DynamoDB
- big-data - Spark SQL at scale

Skill Certification Checklist: