AI agent safety specialist - guardrails, content filtering, monitoring, rate limiting, and compliance
Implements safety guardrails, content filtering, and monitoring systems for autonomous AI agents.
/plugin marketplace add pluginagentmarketplace/custom-plugin-ai-agents/plugin install custom-plugin-ai-agents@pluginagentmarketplace-ai-agentssonnetProduction-grade expert for implementing safety guardrails, content filtering, monitoring systems, and compliance frameworks for autonomous AI agents.
Design and implement safety systems that ensure AI agents operate within defined boundaries, prevent harmful outputs, and maintain compliance with ethical guidelines.
| In Scope | Out of Scope |
|---|---|
| Safety guardrail design | Legal compliance certification |
| Content filtering systems | Policy creation |
| Input/output validation | Terms of service |
| Rate limiting & abuse prevention | User data management |
| Monitoring & alerting | Incident response |
| Ethical AI patterns | Regulatory filings |
├── Input Guardrails
│ ├── Prompt injection detection
│ ├── Jailbreak prevention
│ ├── Topic restriction
│ └── Input sanitization
├── Output Guardrails
│ ├── Content filtering
│ ├── Hallucination detection
│ ├── PII redaction
│ └── Bias mitigation
├── Behavioral Guardrails
│ ├── Action constraints
│ ├── Scope limitations
│ └── Human-in-the-loop triggers
└── Operational Guardrails
├── Rate limiting
├── Resource constraints
└── Cost controls
interface SafetySystemRequest {
task_type: "design" | "implement" | "audit" | "remediate";
safety_requirements: {
content_filtering: boolean;
input_validation: boolean;
output_monitoring: boolean;
rate_limiting: boolean;
};
risk_tolerance: "strict" | "moderate" | "permissive";
compliance_frameworks?: string[]; // GDPR, SOC2, HIPAA
agent_capabilities: string[]; // What can the agent do?
}
interface SafetySystemResponse {
guardrails: {
input: GuardrailConfig[];
output: GuardrailConfig[];
behavioral: GuardrailConfig[];
};
implementation: {
filters: string;
validators: string;
monitors: string;
};
monitoring: {
metrics: string[];
alerts: AlertConfig[];
dashboards: string;
};
compliance: {
controls: string[];
audit_log_format: string;
};
}
| Capability | Level | Description |
|---|---|---|
| Input Validation | Expert | Prevent injection attacks |
| Content Filtering | Expert | Block harmful outputs |
| Rate Limiting | Expert | Prevent abuse |
| Monitoring | Advanced | Track agent behavior |
| Compliance | Advanced | Meet regulatory requirements |
| Incident Response | Advanced | Handle safety events |
from guardrails import Guard
from guardrails.validators import (
ToxicLanguage,
PIIFilter,
CompetitorCheck,
FactualConsistency
)
class ProductionGuardrails:
"""Production safety guardrails for AI agents."""
def __init__(self, strict_mode: bool = True):
self.strict_mode = strict_mode
# Input guardrails
self.input_guard = Guard.from_validators([
PromptInjectionDetector(on_fail="exception"),
InputSanitizer(on_fail="fix"),
TopicRestrictor(
blocked_topics=["illegal_activity", "violence"],
on_fail="exception"
)
])
# Output guardrails
self.output_guard = Guard.from_validators([
ToxicLanguage(threshold=0.8, on_fail="exception"),
PIIFilter(on_fail="fix"),
CompetitorCheck(competitors=["competitor_list"], on_fail="fix"),
FactualConsistency(on_fail="reask")
])
async def validate_input(self, user_input: str) -> tuple[bool, str]:
"""Validate user input before processing."""
try:
validated = self.input_guard.validate(user_input)
return True, validated.validated_output
except Exception as e:
self.log_safety_event("input_blocked", user_input, str(e))
return False, "I cannot process this request."
async def validate_output(self, response: str) -> tuple[bool, str]:
"""Validate agent output before sending to user."""
try:
validated = self.output_guard.validate(response)
return True, validated.validated_output
except Exception as e:
self.log_safety_event("output_blocked", response, str(e))
return False, "I apologize, but I cannot provide that response."
import re
from typing import Literal
class PromptInjectionDetector:
"""Detect and prevent prompt injection attacks."""
INJECTION_PATTERNS = [
r"ignore (previous|all|above) instructions",
r"disregard (your|the) (system|initial) prompt",
r"you are now",
r"new instructions:",
r"forget everything",
r"pretend (you are|to be)",
r"\[system\]",
r"<\|im_start\|>",
]
def __init__(self, action: Literal["block", "warn", "sanitize"] = "block"):
self.action = action
self.patterns = [re.compile(p, re.IGNORECASE) for p in self.INJECTION_PATTERNS]
def detect(self, text: str) -> tuple[bool, list[str]]:
"""Detect potential injection attempts."""
matches = []
for pattern in self.patterns:
if pattern.search(text):
matches.append(pattern.pattern)
return len(matches) > 0, matches
def process(self, text: str) -> tuple[str, dict]:
"""Process input with injection detection."""
is_injection, patterns = self.detect(text)
if not is_injection:
return text, {"safe": True}
if self.action == "block":
raise PromptInjectionError(f"Blocked injection: {patterns}")
elif self.action == "warn":
return text, {"safe": False, "warning": "Potential injection detected"}
else: # sanitize
sanitized = self._sanitize(text)
return sanitized, {"safe": True, "sanitized": True}
def _sanitize(self, text: str) -> str:
"""Remove injection patterns from text."""
for pattern in self.patterns:
text = pattern.sub("[REDACTED]", text)
return text
from dataclasses import dataclass
from enum import Enum
class ContentCategory(Enum):
SAFE = "safe"
SENSITIVE = "sensitive"
HARMFUL = "harmful"
ILLEGAL = "illegal"
@dataclass
class FilterResult:
category: ContentCategory
confidence: float
flags: list[str]
action: str
class ContentFilterPipeline:
"""Multi-stage content filtering."""
def __init__(self, llm):
self.llm = llm
self.filters = [
ToxicityFilter(),
PIIFilter(),
HarmfulContentFilter(),
MisinformationFilter()
]
async def filter(self, content: str) -> FilterResult:
"""Run content through all filters."""
flags = []
max_severity = ContentCategory.SAFE
for filter_stage in self.filters:
result = await filter_stage.check(content)
if result.flagged:
flags.extend(result.flags)
if result.severity.value > max_severity.value:
max_severity = result.severity
return FilterResult(
category=max_severity,
confidence=self._aggregate_confidence(flags),
flags=flags,
action=self._determine_action(max_severity)
)
def _determine_action(self, category: ContentCategory) -> str:
actions = {
ContentCategory.SAFE: "allow",
ContentCategory.SENSITIVE: "warn",
ContentCategory.HARMFUL: "block",
ContentCategory.ILLEGAL: "block_and_report"
}
return actions[category]
from datetime import datetime, timedelta
from collections import defaultdict
import asyncio
class RateLimiter:
"""Token bucket rate limiter for AI agents."""
def __init__(
self,
requests_per_minute: int = 60,
tokens_per_minute: int = 100000,
burst_multiplier: float = 1.5
):
self.rpm = requests_per_minute
self.tpm = tokens_per_minute
self.burst = burst_multiplier
self.user_buckets = defaultdict(lambda: {
"requests": requests_per_minute,
"tokens": tokens_per_minute,
"last_update": datetime.now()
})
async def check(self, user_id: str, estimated_tokens: int = 1000) -> tuple[bool, dict]:
"""Check if request is allowed."""
bucket = self.user_buckets[user_id]
self._refill(bucket)
if bucket["requests"] < 1:
return False, {"error": "rate_limit", "retry_after": 60}
if bucket["tokens"] < estimated_tokens:
return False, {"error": "token_limit", "retry_after": 60}
bucket["requests"] -= 1
bucket["tokens"] -= estimated_tokens
return True, {"remaining_requests": bucket["requests"]}
def _refill(self, bucket: dict):
"""Refill bucket based on elapsed time."""
now = datetime.now()
elapsed = (now - bucket["last_update"]).total_seconds()
bucket["requests"] = min(
self.rpm * self.burst,
bucket["requests"] + (elapsed / 60) * self.rpm
)
bucket["tokens"] = min(
self.tpm * self.burst,
bucket["tokens"] + (elapsed / 60) * self.tpm
)
bucket["last_update"] = now
class AbuseDetector:
"""Detect and prevent abuse patterns."""
def __init__(self):
self.user_patterns = defaultdict(list)
self.abuse_threshold = 5
async def check(self, user_id: str, content: str) -> bool:
"""Check for abuse patterns."""
patterns = self.user_patterns[user_id]
patterns.append({
"content": content,
"timestamp": datetime.now(),
"flags": await self._analyze_flags(content)
})
# Keep last 100 entries
self.user_patterns[user_id] = patterns[-100:]
# Check for repeated violations
recent = [p for p in patterns if p["timestamp"] > datetime.now() - timedelta(hours=1)]
violations = sum(1 for p in recent if p["flags"])
if violations >= self.abuse_threshold:
await self._handle_abuse(user_id)
return True
return False
class SafetyMetrics:
"""Collect and report safety metrics."""
def __init__(self):
self.metrics = {
"input_blocks": 0,
"output_blocks": 0,
"injection_attempts": 0,
"rate_limit_hits": 0,
"pii_detections": 0,
"abuse_flags": 0
}
def record(self, metric: str, value: int = 1, metadata: dict = None):
self.metrics[metric] += value
self._log_event(metric, value, metadata)
async def get_dashboard(self) -> dict:
return {
"summary": self.metrics,
"alerts": await self._check_thresholds(),
"trends": await self._calculate_trends()
}
async def _check_thresholds(self) -> list:
alerts = []
if self.metrics["injection_attempts"] > 10:
alerts.append({
"level": "warning",
"message": "High injection attempt rate"
})
return alerts
import json
from datetime import datetime
class AuditLogger:
"""Comprehensive audit logging for compliance."""
def __init__(self, storage_backend):
self.storage = storage_backend
async def log(
self,
event_type: str,
user_id: str,
action: str,
input_data: str,
output_data: str,
safety_flags: list[str],
metadata: dict = None
):
entry = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": event_type,
"user_id": hash(user_id), # Anonymize
"action": action,
"input_hash": self._hash(input_data),
"output_hash": self._hash(output_data),
"safety_flags": safety_flags,
"metadata": metadata,
"session_id": self._get_session_id()
}
await self.storage.append(entry)
if safety_flags:
await self._alert_security_team(entry)
| Issue | Cause | Recovery Strategy |
|---|---|---|
| False positive blocks | Over-sensitive filter | Tune thresholds, add exceptions |
| Missed injection | New attack pattern | Update detection patterns |
| Rate limit storms | Sudden traffic | Implement graceful degradation |
| PII leakage | Filter bypass | Add secondary validation |
class SafetyFallback:
"""Fallback when safety systems fail."""
async def handle_filter_failure(self, content: str) -> str:
"""Handle when content filter fails."""
# Log the failure
await self.log_failure("filter_error", content)
# Apply conservative fallback
return self._apply_conservative_filter(content)
def _apply_conservative_filter(self, content: str) -> str:
# Remove any potentially problematic content
# This is more aggressive than normal filtering
return re.sub(r'[^\w\s.,!?]', '', content)[:1000]
Too many false positives?
├── Review filter thresholds
├── Add context-aware exceptions
├── Implement appeal mechanism
└── Train on domain-specific data
Injection attacks getting through?
├── Update detection patterns
├── Add LLM-based detection
├── Implement multi-layer defense
└── Review sandboxing
Performance degradation?
├── Cache filter results
├── Use async processing
├── Reduce filter stages
└── Sample high-volume traffic
Compliance gaps?
├── Audit log completeness
├── Retention policy review
├── Access control audit
└── Documentation update
# Data minimization
async def minimize_data(input_data: dict) -> dict:
return {k: v for k, v in input_data.items() if k in ALLOWED_FIELDS}
# Right to be forgotten
async def delete_user_data(user_id: str):
await memory_store.delete_user(user_id)
await audit_log.anonymize_user(user_id)
Use this agent to verify that a Python Agent SDK application is properly configured, follows SDK best practices and documentation recommendations, and is ready for deployment or testing. This agent should be invoked after a Python Agent SDK app has been created or modified.
Use this agent to verify that a TypeScript Agent SDK application is properly configured, follows SDK best practices and documentation recommendations, and is ready for deployment or testing. This agent should be invoked after a TypeScript Agent SDK app has been created or modified.