From claude-starter-kit
Builds scalable data infrastructure from prototype to production, covering caching (Redis), read replicas, sharding, message queues, ETL pipelines, and high-throughput planning.
npx claudepluginhub sunnypatneedi/claude-starter-kit

This skill uses the workspace's default tool permissions.
Designs scalable database architectures from scratch or re-architects existing systems. Selects SQL/NoSQL/TimeSeries technologies, models schemas, applies normalization, and plans migrations for performance.
Guides scaling systems from startup (0-10K users) to enterprise (1M+), with stage architectures, metrics, bottlenecks diagnosis, and capacity planning.
Build data infrastructure that grows from MVP to millions of users without rewrites.
Use this skill when:
Architecture:
┌─────────┐
│ App │
└────┬────┘
│
┌────▼────┐
│ DB │
└─────────┘
Characteristics:
When to graduate: Response time > 200ms, database CPU > 60%
Architecture:
┌─────────┐
│ App │
└─┬────┬──┘
│ │
│ ┌─▼──────┐
│ │ Cache │
│ └────────┘
│
┌─▼────┐
│ DB │
└──────┘
Add: a caching layer (e.g. Redis) between the app and the database.
Pattern: Cache-Aside
def get_user(user_id):
    # Try cache first
    user = cache.get(f"user:{user_id}")
    if user:
        return user
    # Miss: fetch from DB
    user = db.query("SELECT * FROM users WHERE id = ?", user_id)
    # Populate cache
    cache.set(f"user:{user_id}", user, ttl=300)
    return user
When to graduate: Database read load > 70%, cache hit rate < 80%
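Hit rate itself is simple to compute from the counters most caches expose (Redis, for example, reports `keyspace_hits` and `keyspace_misses` in `INFO stats`); a minimal sketch, with the function name being ours:

```python
# Sketch: cache hit rate from hit/miss counters.
# With Redis, feed in keyspace_hits / keyspace_misses from INFO stats.

def cache_hit_rate(hits: int, misses: int) -> float:
    """Return hit rate as a fraction between 0.0 and 1.0."""
    total = hits + misses
    if total == 0:
        return 0.0
    return hits / total

# Below ~0.80, the cache is undersized, TTLs are too short,
# or the workload has poor locality.
print(cache_hit_rate(9_200, 800))  # 0.92
```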
Architecture:
┌─────────┐
│ App │
└─┬────┬──┘
│ │
│ ┌─▼──────┐
│ │ Cache │
│ └────────┘
│
│ ┌─────────┐
├──► Primary │ (writes)
│ └────┬────┘
│ │ replication
│ ┌────▼────┐
└──► Replica │ (reads)
└─────────┘
Add: a read replica; writes go to the primary, reads to the replica.
Pattern: Write-Primary, Read-Replica
def get_user(user_id):
    return db_replica.query("SELECT * FROM users WHERE id = ?", user_id)

def update_user(user_id, data):
    # Must use primary for writes
    db_primary.execute("UPDATE users SET ... WHERE id = ?", data, user_id)
    # Invalidate cache
    cache.delete(f"user:{user_id}")
Gotcha: Replication Lag
# Problem: Read-your-own-writes fails
def create_post(user_id, content):
    post_id = db_primary.insert("INSERT INTO posts ...", user_id, content)
    # Reads from replica, but replication hasn't caught up yet!
    post = db_replica.query("SELECT * FROM posts WHERE id = ?", post_id)
    # post might be None!

# Fix: Read from primary immediately after write
def create_post(user_id, content):
    post_id = db_primary.insert("INSERT INTO posts ...", user_id, content)
    post = db_primary.query("SELECT * FROM posts WHERE id = ?", post_id)
    return post
When to graduate: Single database can't handle write load, need horizontal scaling
Architecture:
┌─────────┐
│ App │
└────┬────┘
│
┌────▼─────────┐
│ Shard Router │
└─┬──────────┬─┘
│ │
┌─▼──────────┐ ┌─▼──────────┐
│  Shard 1   │ │  Shard 2   │ (each shard has replicas)
│ users 0-5M │ │users 5M-10M│
└────────────┘ └────────────┘
Sharding Strategies:
1. Hash-based (even distribution):
NUM_SHARDS = 4

def get_shard(user_id):
    return user_id % NUM_SHARDS

# User 123 → Shard 3
# User 456 → Shard 0
2. Range-based (logical grouping):
def get_shard(user_id):
    if user_id < 5_000_000:
        return 'shard_1'
    elif user_id < 10_000_000:
        return 'shard_2'
    else:
        return 'shard_3'
3. Geographic (data locality):
def get_shard(user_region):
    if user_region in ['US', 'CA', 'MX']:
        return 'shard_americas'
    elif user_region in ['UK', 'DE', 'FR']:
        return 'shard_europe'
    else:
        return 'shard_asia'
Challenges:
- Cross-shard queries and JOINs become expensive
- Distributed transactions span multiple shards
- Rebalancing when a shard fills up
- Hotspots from skewed keys
When to graduate: Need multi-datacenter, global scale
Architecture:
┌──────────────────────────────────┐
│ Load Balancer (Global) │
└────────┬────────────────┬────────┘
│ │
┌────▼────┐ ┌────▼────┐
│ US-DC │ │ EU-DC │
└────┬────┘ └────┬────┘
│ │
┌────▼────────────────▼────┐
│ Distributed Database │
│ (CockroachDB, Spanner) │
└──────────────────────────┘
Add: a global load balancer and a distributed SQL database spanning datacenters.
Databases for this stage:
Start here:
│
├─ Need ACID transactions?
│ ├─ Yes → SQL (PostgreSQL, MySQL)
│ └─ No → Continue
│
├─ Need complex queries (JOINs, aggregations)?
│ ├─ Yes → SQL
│ └─ No → Continue
│
├─ Need flexible schema?
│ ├─ Yes → Document DB (MongoDB, Firestore)
│ └─ No → Continue
│
├─ Write-heavy, time-series?
│ ├─ Yes → Wide-column (Cassandra, ClickHouse)
│ └─ No → Continue
│
├─ Need ultra-low latency?
│ ├─ Yes → In-memory (Redis, Memcached)
│ └─ No → Key-Value (DynamoDB, RocksDB)
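The tree above can be sketched as a first-match function (the flag names are illustrative, not a formal API):

```python
def pick_database(needs_acid=False, complex_queries=False,
                  flexible_schema=False, write_heavy_timeseries=False,
                  ultra_low_latency=False):
    """Walk the decision tree top-down; first matching branch wins."""
    if needs_acid or complex_queries:
        return "SQL (PostgreSQL, MySQL)"
    if flexible_schema:
        return "Document DB (MongoDB, Firestore)"
    if write_heavy_timeseries:
        return "Wide-column (Cassandra, ClickHouse)"
    if ultra_low_latency:
        return "In-memory (Redis, Memcached)"
    return "Key-Value (DynamoDB, RocksDB)"

print(pick_database(needs_acid=True))              # SQL (PostgreSQL, MySQL)
print(pick_database(write_heavy_timeseries=True))  # Wide-column (Cassandra, ClickHouse)
```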
| Database | Best For | Throughput | Consistency | Cost |
|---|---|---|---|---|
| PostgreSQL | Transactional, complex queries | Medium | Strong | Low |
| MySQL | Read-heavy, replication | High reads | Strong | Low |
| MongoDB | Flexible schema, rapid iteration | Medium | Eventual | Medium |
| Cassandra | Write-heavy, multi-DC | Very high writes | Tunable | Medium |
| Redis | Caching, sessions, pub/sub | Very high | Eventual | Low (memory) |
| DynamoDB | Serverless, predictable latency | High | Strong/Eventual | Pay-per-request |
| ClickHouse | Analytics, OLAP | Very high (columnar) | Eventual | Medium |
| Elasticsearch | Full-text search, logs | High | Eventual | Medium-High |
Pattern: Application manages cache
def get_product(product_id):
    # Check cache
    product = cache.get(f"product:{product_id}")
    if product:
        return product
    # Cache miss: Load from DB
    product = db.query("SELECT * FROM products WHERE id = ?", product_id)
    # Store in cache
    cache.set(f"product:{product_id}", product, ttl=3600)
    return product
Pros: Simple; cache failures don't break the app.
Cons: Cache-miss penalty; stale data is possible.
Pattern: Update cache and database together
def update_product(product_id, data):
    # Update DB
    db.execute("UPDATE products SET ... WHERE id = ?", data, product_id)
    # Update cache
    cache.set(f"product:{product_id}", data, ttl=3600)
Pros: Cache is always fresh.
Cons: Write latency (two operations).
Pattern: Update cache first, DB asynchronously
def update_product(product_id, data):
    # Update cache immediately
    cache.set(f"product:{product_id}", data)
    # Queue DB update for later
    queue.enqueue('update_product_db', product_id, data)
Pros: Low write latency.
Cons: Risk of data loss if the cache fails before the DB write.
Pattern: Refresh cache before expiration
def get_product(product_id):
    product = cache.get(f"product:{product_id}")
    if product is None:
        # Fall back to cache-aside on a miss
        product = db.query("SELECT * FROM products WHERE id = ?", product_id)
        cache.set(f"product:{product_id}", product, ttl=3600)
    ttl = cache.ttl(f"product:{product_id}")
    # Refresh asynchronously if TTL < 10% of original
    if ttl < 360:  # 10% of 3600s
        queue.enqueue('refresh_product_cache', product_id)
    return product
Pros: Reduces cache misses.
Cons: More complex; refresh overhead.
Problem: "There are only two hard things in Computer Science: cache invalidation and naming things."
Strategies:
1. TTL-based (simplest):
cache.set("user:123", user_data, ttl=300) # 5 minutes
# Pros: Simple, automatic cleanup
# Cons: May serve stale data for up to 5 minutes
2. Event-based:
def update_user(user_id, data):
    db.execute("UPDATE users SET ... WHERE id = ?", data, user_id)
    cache.delete(f"user:{user_id}")  # Invalidate immediately
    # Also invalidate derived caches
    cache.delete(f"user:{user_id}:posts")
    cache.delete(f"user:{user_id}:followers")
3. Version-based:
def get_user(user_id):
    version = db.query("SELECT version FROM users WHERE id = ?", user_id)
    cache_key = f"user:{user_id}:v{version}"
    user = cache.get(cache_key)
    if not user:
        user = db.query("SELECT * FROM users WHERE id = ?", user_id)
        cache.set(cache_key, user, ttl=3600)
    return user

def update_user(user_id, data):
    db.execute("UPDATE users SET version = version + 1, ... WHERE id = ?", data, user_id)
    # Old cache keys naturally expire, no need to delete them
Use queues for: background work that shouldn't block a request (emails, inventory updates, notifications, third-party API calls).
| Queue | Best For | Guarantees | Throughput |
|---|---|---|---|
| Redis | Simple queues, fast | At-most-once | Very high |
| RabbitMQ | Complex routing, reliability | At-least-once | High |
| Apache Kafka | Event streaming, replay | Exactly-once (with transactions) | Very high |
| AWS SQS | Serverless, simple | At-least-once | High |
| Google Pub/Sub | GCP, fan-out | At-least-once | High |
# Producer (web app)
@app.post('/orders')
def create_order(order_data):
    # Save to DB
    order_id = db.insert("INSERT INTO orders ...", order_data)
    # Enqueue async tasks
    queue.enqueue('send_confirmation_email', order_id)
    queue.enqueue('update_inventory', order_id)
    queue.enqueue('notify_warehouse', order_id)
    # Return immediately
    return {'order_id': order_id, 'status': 'processing'}

# Consumer (worker)
def send_confirmation_email(order_id):
    order = db.query("SELECT * FROM orders WHERE id = ?", order_id)
    email_service.send(order.email, "Order confirmed", template)
Benefits:
- API responds immediately; slow work happens in the background
- Failed tasks can be retried without failing the request
- Workers scale independently of the web tier
# Multiple consumers for the same event
event_bus.publish('order.created', {
    'order_id': 123,
    'user_id': 456,
    'total': 99.99
})

# Subscriber 1: Email service
@event_bus.subscribe('order.created')
def send_email(event):
    email_service.send(...)

# Subscriber 2: Analytics
@event_bus.subscribe('order.created')
def track_analytics(event):
    analytics.track('Order Created', event)

# Subscriber 3: Inventory
@event_bus.subscribe('order.created')
def update_inventory(event):
    inventory.decrement(...)
Benefits:
- Producers don't know about consumers (loose coupling)
- New subscribers can be added without touching the publisher
- Each subscriber fails and retries independently
# Exponential backoff with max retries
@queue.job(retry=5, backoff='exponential')
def send_email(order_id):
    # Retry delays: 1s, 2s, 4s, 8s, 16s
    email_service.send(...)

# Dead letter queue for failed jobs
@queue.job(retry=3, on_failure='move_to_dlq')
def process_payment(order_id):
    # After 3 failures, move to DLQ for manual review
    payment_service.charge(...)
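The `@queue.job` decorators above are framework-style shorthand; the retry logic underneath is a plain loop. A self-contained sketch (the function names are ours, and `sleep` is injectable so the backoff schedule can be observed without waiting):

```python
import time

def retry_with_backoff(task, max_retries=5, base_delay=1.0, sleep=time.sleep):
    """Run task(); on failure wait 1s, 2s, 4s, 8s, 16s between attempts.
    Re-raises the last exception once retries are exhausted (a real
    worker would move the job to a dead letter queue at that point)."""
    for attempt in range(max_retries):
        try:
            return task()
        except Exception:
            if attempt == max_retries - 1:
                raise
            sleep(base_delay * 2 ** attempt)

# Observe the schedule by injecting a fake sleep:
delays = []
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 4:
        raise RuntimeError("transient failure")
    return "ok"

result = retry_with_backoff(flaky, sleep=delays.append)
print(result, delays)  # ok [1.0, 2.0, 4.0]
```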
Use when: Batch processing, data warehouse
┌─────────┐ ┌───────────┐ ┌────────────┐
│ Sources │────►│ Transform │────►│ Target │
│(DB, API)│ │ (Python) │ │(Warehouse) │
└─────────┘ └───────────┘ └────────────┘
Example: Daily sales report
# Extract
orders = db.query("SELECT * FROM orders WHERE created_at >= ?", yesterday)
users = db.query("SELECT * FROM users WHERE id IN (?)", order_user_ids)
users_by_id = {user.id: user for user in users}

# Transform
sales_data = []
for order in orders:
    user = users_by_id[order.user_id]
    sales_data.append({
        'date': order.created_at.date(),
        'user_region': user.region,
        'total': order.total,
        'category': categorize(order.items)
    })

# Load
warehouse.bulk_insert('daily_sales', sales_data)
Tools: Apache Airflow, dbt, Luigi
Use when: Cloud data warehouses with compute power
┌─────────┐ ┌────────────┐ ┌───────────┐
│ Sources │────►│ Target │────►│ Transform │
│(DB, API)│ │(Warehouse) │ │ (SQL) │
└─────────┘ └────────────┘ └───────────┘
Example:
-- 1. Load raw data (just copy)
COPY orders FROM 's3://bucket/orders.csv';

-- 2. Transform in warehouse (fast!)
CREATE TABLE sales_summary AS
SELECT
    DATE(o.created_at) AS date,
    u.region,
    SUM(o.total) AS total_sales,
    COUNT(*) AS order_count
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE o.created_at >= CURRENT_DATE - 1
GROUP BY 1, 2;
Benefit: Leverage warehouse compute, transform at scale
Tools: Snowflake, BigQuery, Redshift, dbt
Use when: Need low-latency processing
┌─────────┐ ┌────────┐ ┌───────────┐
│ Sources │────►│ Kafka │────►│ Processor │
│(Events) │ │(Buffer)│ │ (Flink) │
└─────────┘ └────────┘ └───────────┘
Example: Real-time fraud detection
# Stream processor
stream = kafka.subscribe('transactions')
for transaction in stream:
    # Process each transaction in real time
    risk_score = fraud_model.predict(transaction)
    if risk_score > 0.8:
        # Flag for review
        alert_service.notify('High risk transaction', transaction)
        db.execute("UPDATE transactions SET status = 'review' WHERE id = ?",
                   transaction.id)
Tools: Apache Kafka, Flink, Spark Streaming, AWS Kinesis
Database: query latency (p95), replication lag, CPU, connection count
Queues: queue depth, processing rate, failure rate
Application: response times, error rates, cache hit rate
# Example alert rules
alerts:
  - name: HighDatabaseLatency
    condition: db.query.p95 > 200ms for 5 minutes
    severity: warning
  - name: HighReplicationLag
    condition: db.replication_lag > 10s for 2 minutes
    severity: critical
  - name: LowCacheHitRate
    condition: cache.hit_rate < 70% for 10 minutes
    severity: warning
  - name: QueueBacklog
    condition: queue.depth > 10000 for 5 minutes
    severity: warning
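The rules above alert on p95 latency. A minimal sketch of how that percentile is computed from raw samples using only Python's standard library (the alert wiring itself is whatever your monitoring stack provides; the sample values are made up):

```python
import statistics

def p95(samples):
    """95th percentile: quantiles(n=20) returns 19 cut points,
    and index 18 is the 95th-percentile cut."""
    return statistics.quantiles(samples, n=20)[18]

# One minute of (hypothetical) query latencies in ms
latencies_ms = [12, 15, 18, 22, 25, 30, 35, 40, 55, 80,
                14, 16, 19, 23, 27, 33, 38, 45, 60, 250]

if p95(latencies_ms) > 200:
    print("HighDatabaseLatency would fire (after 5 sustained minutes)")
```

Note that a single 250 ms outlier drags p95 over the threshold even though the median is healthy, which is exactly why these alerts use p95 rather than the mean.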
1. Dual Writes:
# Phase 1: Write to both old and new DB
def create_user(data):
    user_id = old_db.insert("INSERT INTO users ...", data)
    new_db.insert("INSERT INTO users ...", data)  # Also write to new
    return user_id

# Phase 2: Backfill historical data (run in background)
for user in old_db.query("SELECT * FROM users"):
    new_db.insert("INSERT INTO users ...", user)

# Phase 3: Read from new DB, verify against old
def get_user(user_id):
    new_user = new_db.query("SELECT * FROM users WHERE id = ?", user_id)
    old_user = old_db.query("SELECT * FROM users WHERE id = ?", user_id)
    if new_user != old_user:
        logger.error("Data mismatch!", user_id=user_id)
    return new_user

# Phase 4: Stop writing to old DB
# Phase 5: Decommission old DB
Problem: Shard capacity full, need to split
Strategy: Virtual shards
# Instead of:
shard = user_id % 4 # Hard to change!
# Use:
virtual_shard = user_id % 1024 # Many virtual shards
physical_shard = shard_map[virtual_shard] # Map to physical shards
# shard_map initially:
# {0-255: 'shard_1', 256-511: 'shard_2', ...}
# Later, easily rebalance:
# {0-127: 'shard_1', 128-255: 'shard_5', ...}
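A runnable sketch of that mapping (the 1024 virtual-shard count and shard names follow the snippet above; the helper functions are illustrative):

```python
NUM_VIRTUAL = 1024

def build_shard_map(physical_shards):
    """Assign contiguous ranges of virtual shards to physical shards,
    e.g. 4 shards -> virtuals 0-255 on shard_1, 256-511 on shard_2, ..."""
    per_shard = NUM_VIRTUAL // len(physical_shards)
    return {v: physical_shards[min(v // per_shard, len(physical_shards) - 1)]
            for v in range(NUM_VIRTUAL)}

def route(user_id, shard_map):
    return shard_map[user_id % NUM_VIRTUAL]

shard_map = build_shard_map(['shard_1', 'shard_2', 'shard_3', 'shard_4'])
print(route(123, shard_map))  # shard_1 (virtual shard 123)

# Rebalancing is just editing the map and copying the moved virtual
# shards' data; the routing code never changes.
for v in range(128):
    shard_map[v] = 'shard_5'  # hand virtuals 0-127 to a new node
print(route(123, shard_map))  # shard_5
```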
When helping with infrastructure scaling:
## Current Architecture Analysis
### Bottlenecks Identified
- [Specific bottleneck 1]
- [Specific bottleneck 2]
### Recommended Architecture
[Architecture diagram in text]
### Components to Add
1. [Component]: [Why + how]
2. [Component]: [Why + how]
### Migration Plan
Phase 1: [Description]
- Week 1: [Tasks]
- Week 2: [Tasks]
Phase 2: [Description]
...
### Cost Impact
- Current: $[X]/month
- Projected: $[Y]/month
- ROI: [Justification]
### Risk Mitigation
- Risk 1: [Mitigation]
- Risk 2: [Mitigation]
Works with: