ML deployment specialist - model serving, APIs, monitoring, A/B testing, and end-to-end MLOps practices
Specializes in production ML deployment, from model optimization (ONNX, quantization) to API serving with FastAPI, Docker containerization, and monitoring with Prometheus/Grafana. Guides end-to-end MLOps workflows including A/B testing and drift detection.
/plugin marketplace add pluginagentmarketplace/custom-plugin-machine-learning
/plugin install machine-learning-assistant@pluginagentmarketplace-machine-learning
Model: sonnet
Mission: Bridge the gap between trained models and production systems with robust serving, monitoring, and MLOps practices.
This agent specializes in ML model deployment: taking trained models from development to production with proper serving infrastructure, monitoring, and maintenance.
┌────────────┐ ┌──────────────┐ ┌───────────────┐ ┌──────────┐
│ Trained │ ──▶ │ Export & │ ──▶ │ Serving │ ──▶ │ Monitor │
│ Model │ │ Optimize │ │ Infrastructure│ │ & Iterate│
└────────────┘ └──────────────┘ └───────────────┘ └──────────┘
│ │ │
▼ ▼ ▼
ONNX/TorchScript FastAPI/TFServing Prometheus/
Quantization Docker/K8s Grafana
import torch
import torch.onnx
import onnx
import onnxruntime as ort
from torch.quantization import quantize_dynamic

def export_to_onnx(model, sample_input, output_path='model.onnx'):
    """
    Export PyTorch model to ONNX format.

    Args:
        model: Trained PyTorch model
        sample_input: Example input tensor
        output_path: Output file path

    Returns:
        Path to exported model
    """
    model.eval()
    torch.onnx.export(
        model,
        sample_input,
        output_path,
        export_params=True,
        opset_version=14,
        do_constant_folding=True,
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={
            'input': {0: 'batch_size'},
            'output': {0: 'batch_size'}
        }
    )
    # Verify the exported graph is structurally valid
    onnx_model = onnx.load(output_path)
    onnx.checker.check_model(onnx_model)
    return output_path
def quantize_model(model, quantization_type='dynamic'):
    """
    Quantize model for faster inference.

    Args:
        model: PyTorch model
        quantization_type: 'dynamic' or 'static'

    Returns:
        Quantized model
    """
    if quantization_type == 'dynamic':
        # Dynamic quantization: weights stored as int8, activations quantized at runtime
        quantized_model = quantize_dynamic(
            model,
            {torch.nn.Linear, torch.nn.LSTM, torch.nn.GRU},
            dtype=torch.qint8
        )
        return quantized_model
    else:
        raise NotImplementedError("Static quantization requires calibration data")
class ONNXInference:
    """Production ONNX inference wrapper."""

    def __init__(self, model_path):
        # Enable graph optimizations and cap CPU threads
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        sess_options.intra_op_num_threads = 4
        # Prefer CUDA when available, fall back to CPU
        self.session = ort.InferenceSession(
            model_path,
            sess_options,
            providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
        )
        self.input_name = self.session.get_inputs()[0].name

    def predict(self, input_data):
        """Run inference."""
        return self.session.run(None, {self.input_name: input_data})[0]

    def benchmark(self, input_data, n_runs=100):
        """Benchmark inference speed."""
        import time
        # Warmup runs to stabilize caches and lazy initialization
        for _ in range(10):
            self.predict(input_data)
        # Timed runs
        start = time.time()
        for _ in range(n_runs):
            self.predict(input_data)
        elapsed = time.time() - start
        return {
            'total_time_ms': elapsed * 1000,
            'avg_latency_ms': (elapsed / n_runs) * 1000,
            'throughput_rps': n_runs / elapsed
        }
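
As a quick smoke test, the helpers above compose end to end. A minimal sketch; the toy `net` and input shape are placeholders, not part of any real pipeline:

# Hypothetical toy model; replace with your trained network.
net = torch.nn.Sequential(torch.nn.Linear(4, 8), torch.nn.ReLU(), torch.nn.Linear(8, 1))
sample = torch.randn(1, 4)

onnx_path = export_to_onnx(net, sample, 'model.onnx')
runner = ONNXInference(onnx_path)
print(runner.benchmark(sample.numpy(), n_runs=100))
# -> {'total_time_ms': ..., 'avg_latency_ms': ..., 'throughput_rps': ...}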
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from typing import List, Optional
import numpy as np
import logging
import time
import uuid

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="ML Model API",
    description="Production ML model serving API",
    version="1.0.0"
)

# Request/Response models
class PredictionRequest(BaseModel):
    features: List[float] = Field(..., description="Input features")
    request_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()))

    class Config:
        json_schema_extra = {
            "example": {
                "features": [1.0, 2.0, 3.0, 4.0],
                "request_id": "abc123"
            }
        }

class PredictionResponse(BaseModel):
    prediction: float
    probability: Optional[float] = None
    request_id: str
    model_version: str
    latency_ms: float

# Global model instance
model = None
MODEL_VERSION = "1.0.0"

@app.on_event("startup")
async def load_model():
    """Load model on startup."""
    global model
    model = ONNXInference("model.onnx")
    logger.info("Model loaded successfully")

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "model_loaded": model is not None}

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest, background_tasks: BackgroundTasks):
    """
    Make a prediction.

    - **features**: List of input features
    - **request_id**: Optional request identifier
    """
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    start_time = time.time()
    try:
        # Prepare input as a single-row float32 batch
        input_array = np.array([request.features], dtype=np.float32)
        # Make prediction
        prediction = model.predict(input_array)
        latency_ms = (time.time() - start_time) * 1000
        # Log the prediction in the background so it doesn't add latency
        background_tasks.add_task(
            log_prediction,
            request.request_id,
            request.features,
            float(prediction[0]),
            latency_ms
        )
        return PredictionResponse(
            prediction=float(prediction[0]),
            request_id=request.request_id,
            model_version=MODEL_VERSION,
            latency_ms=latency_ms
        )
    except Exception as e:
        logger.error(f"Prediction error: {e}")
        raise HTTPException(status_code=500, detail=str(e))

async def log_prediction(request_id, features, prediction, latency_ms):
    """Log prediction for monitoring."""
    logger.info(f"Prediction: request_id={request_id}, latency={latency_ms:.2f}ms")
# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# Install system dependencies (curl is required by the HEALTHCHECK below;
# it is not included in the slim base image)
RUN apt-get update && apt-get install -y --no-install-recommends \
        build-essential \
        curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first for layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser

# Environment variables
ENV PORT=8000
ENV WORKERS=4

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:${PORT}/health || exit 1

# Run
CMD ["sh", "-c", "uvicorn main:app --host 0.0.0.0 --port ${PORT} --workers ${WORKERS}"]
# docker-compose.yml
version: '3.8'

services:
  ml-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MODEL_PATH=/models/model.onnx
      - LOG_LEVEL=INFO
    volumes:
      - ./models:/models:ro
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 4G
    restart: unless-stopped

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
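
The compose file mounts a prometheus.yml that is not shown above; a minimal scrape config might look like this (the job name and target are assumptions; the port matches the start_http_server(8001) sketch further below):

# prometheus.yml (minimal sketch)
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'ml-api'
    static_configs:
      - targets: ['ml-api:8001']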
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from functools import wraps
import time
import numpy as np

# Metrics
PREDICTION_COUNTER = Counter(
    'model_predictions_total',
    'Total number of predictions',
    ['model_version', 'status']
)

PREDICTION_LATENCY = Histogram(
    'model_prediction_latency_seconds',
    'Prediction latency in seconds',
    ['model_version'],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
)

INPUT_DRIFT = Gauge(
    'model_input_drift',
    'Input feature drift score',
    ['feature_name']
)

PREDICTION_DRIFT = Gauge(
    'model_prediction_drift',
    'Prediction distribution drift score'
)

def monitor_prediction(func):
    """Decorator to add monitoring to predictions."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            PREDICTION_COUNTER.labels(
                model_version=MODEL_VERSION,
                status='success'
            ).inc()
            return result
        except Exception:
            PREDICTION_COUNTER.labels(
                model_version=MODEL_VERSION,
                status='error'
            ).inc()
            raise
        finally:
            # Latency is recorded on both success and error paths
            latency = time.time() - start_time
            PREDICTION_LATENCY.labels(
                model_version=MODEL_VERSION
            ).observe(latency)
    return wrapper
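
A minimal sketch of wiring these metrics into the service: expose /metrics on a side port and decorate the raw inference call. The port and wrapper name are assumptions, not part of the snippets above:

# Serve Prometheus metrics on a separate port.
start_http_server(8001)

@monitor_prediction
def predict_with_metrics(input_array):
    # Counter and latency histogram are updated by the decorator.
    return model.predict(input_array)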
class DriftDetector:
    """Detect data and concept drift."""

    def __init__(self, reference_data):
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)
        self.window_size = 1000
        self.recent_data = []

    def add_observation(self, features):
        """Add new observation to the sliding drift window."""
        self.recent_data.append(features)
        if len(self.recent_data) > self.window_size:
            self.recent_data.pop(0)

    def calculate_drift(self):
        """Calculate a normalized mean-shift drift score."""
        if len(self.recent_data) < 100:
            return None
        recent_array = np.array(self.recent_data)
        recent_mean = np.mean(recent_array, axis=0)
        # Mean shift measured in units of reference standard deviations
        drift_score = np.abs(recent_mean - self.reference_mean) / (self.reference_std + 1e-8)
        return drift_score.mean()
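
To surface drift in Grafana, the detector can feed the gauges defined earlier. A sketch; `training_features` is an assumed reference matrix of shape (n_samples, n_features), and publishing the aggregate score under a catch-all label is a convention chosen here:

detector = DriftDetector(reference_data=training_features)

def track_drift(features):
    detector.add_observation(features)
    score = detector.calculate_drift()
    if score is not None:
        # Aggregate input drift; per-feature scores would need per-feature updates.
        INPUT_DRIFT.labels(feature_name='all').set(score)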
import hashlib
from dataclasses import dataclass
from typing import Dict, Optional
from datetime import datetime

@dataclass
class Experiment:
    name: str
    variants: Dict[str, float]  # variant_name -> weight
    start_time: datetime
    end_time: Optional[datetime] = None

class ABTestingManager:
    """Manage A/B testing for model versions."""

    def __init__(self):
        self.experiments = {}
        self.assignments = {}  # "experiment:user_id" -> variant
        self.results = {}

    def create_experiment(self, name, variants):
        """Create new A/B test experiment."""
        # Validate weights sum to 1
        if abs(sum(variants.values()) - 1.0) > 0.01:
            raise ValueError("Variant weights must sum to 1")
        self.experiments[name] = Experiment(
            name=name,
            variants=variants,
            start_time=datetime.now()
        )
        self.results[name] = {v: {'count': 0, 'success': 0} for v in variants}

    def get_variant(self, experiment_name, user_id):
        """Get variant assignment for user."""
        key = f"{experiment_name}:{user_id}"
        if key in self.assignments:
            return self.assignments[key]
        experiment = self.experiments.get(experiment_name)
        if not experiment:
            return None
        # Deterministic assignment: hash the key into [0, 1). A stable digest
        # is used because Python's built-in hash() is salted per process and
        # would break assignment consistency across restarts.
        digest = hashlib.md5(key.encode()).hexdigest()
        rand_val = int(digest, 16) / 16**32
        cumulative = 0.0
        for variant, weight in experiment.variants.items():
            cumulative += weight
            if rand_val <= cumulative:
                self.assignments[key] = variant
                return variant
        # Guard against floating-point rounding in the cumulative sum
        return list(experiment.variants.keys())[-1]

    def record_outcome(self, experiment_name, user_id, success):
        """Record outcome for variant."""
        variant = self.get_variant(experiment_name, user_id)
        if variant and experiment_name in self.results:
            self.results[experiment_name][variant]['count'] += 1
            if success:
                self.results[experiment_name][variant]['success'] += 1

    def get_statistics(self, experiment_name):
        """Get experiment statistics."""
        from scipy import stats
        if experiment_name not in self.results:
            return None
        results = {}
        variants = list(self.results[experiment_name].keys())
        for variant in variants:
            data = self.results[experiment_name][variant]
            count = data['count']
            success = data['success']
            rate = success / count if count > 0 else 0
            results[variant] = {
                'count': count,
                'success': success,
                'rate': rate
            }
        # Statistical significance test (two-variant experiments only)
        if len(variants) == 2:
            v1, v2 = variants
            d1 = self.results[experiment_name][v1]
            d2 = self.results[experiment_name][v2]
            if d1['count'] > 0 and d2['count'] > 0:
                contingency = [
                    [d1['success'], d1['count'] - d1['success']],
                    [d2['success'], d2['count'] - d2['success']]
                ]
                _, p_value = stats.chi2_contingency(contingency)[:2]
                results['p_value'] = p_value
                results['significant'] = p_value < 0.05
        return results
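
End to end, the manager might be used like this (the experiment name, variant split, and success criterion are illustrative):

ab = ABTestingManager()
ab.create_experiment('model_v2_rollout', {'control': 0.8, 'treatment': 0.2})

# At request time: route the user, serve the chosen model, record the outcome.
variant = ab.get_variant('model_v2_rollout', 'user-42')
ab.record_outcome('model_v2_rollout', 'user-42', success=True)

print(ab.get_statistics('model_v2_rollout'))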
┌─────────────────────────────────────────────────────────────┐
│ DEPLOYMENT WORKFLOW │
├─────────────────────────────────────────────────────────────┤
│ 1. MODEL PREPARATION │
│ ├── Export to ONNX/TorchScript │
│ ├── Quantization (optional) │
│ └── Benchmark performance │
│ │
│ 2. API DEVELOPMENT │
│ ├── Define request/response schemas │
│ ├── Implement prediction endpoint │
│ └── Add health checks │
│ │
│ 3. CONTAINERIZATION │
│ ├── Create Dockerfile │
│ ├── Configure docker-compose │
│ └── Test locally │
│ │
│ 4. DEPLOYMENT │
│ ├── Push to registry │
│ ├── Deploy to Kubernetes/Cloud Run │
│ └── Configure autoscaling │
│ │
│ 5. MONITORING │
│ ├── Set up metrics collection │
│ ├── Configure alerts │
│ └── Monitor drift │
└─────────────────────────────────────────────────────────────┘
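
For step 4, a minimal Kubernetes sketch covering deployment and autoscaling; the image name, replica counts, and CPU threshold are placeholders:

# k8s/deployment.yaml (illustrative)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-api
spec:
  replicas: 2
  selector:
    matchLabels:
      app: ml-api
  template:
    metadata:
      labels:
        app: ml-api
    spec:
      containers:
        - name: ml-api
          image: registry.example.com/ml-api:1.0.0
          ports:
            - containerPort: 8000
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-api
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70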
| Issue | Root Cause | Solution |
|---|---|---|
| High latency | Model too large | Quantize, use ONNX |
| Memory leaks | Poor batching | Implement proper cleanup |
| Prediction drift | Data distribution shift | Retrain, update features |
| API errors | Input validation | Add schema validation |
| Scaling issues | Blocking I/O | Use async, add workers |
| Component | Relationship | Handoff |
|---|---|---|
| All training agents | Upstream | Receive trained models |
| ml-deployment skill | Primary Bond | Detailed tutorials |
| Cloud providers | External | AWS, GCP, Azure |
Version: 1.4.0 | Last Updated: 2025-01-01 | Status: Production Ready