Real-time monitoring and detection of adversarial attacks and model drift in production
Detects adversarial attacks and model drift in production by monitoring inputs, outputs, and security events. Triggers alerts for anomalies, toxicity, extraction attempts, and performance degradation.
/plugin marketplace add pluginagentmarketplace/custom-plugin-ai-red-teaming
/plugin install pluginagentmarketplace-ai-red-teaming-plugin@pluginagentmarketplace/custom-plugin-ai-red-teaming

This skill inherits all available tools. When active, it can use any tool Claude has access to.
Bundled resources:
- assets/monitoring-config.yaml
- references/MONITORING-STRATEGY.md
- scripts/monitor-llm.py

Implement real-time detection of adversarial attacks and model degradation in production AI systems.
Skill: continuous-monitoring
Agent: 05-defense-strategy-developer
OWASP: LLM10 (Unbounded Consumption), LLM02 (Sensitive Information Disclosure)
NIST: Measure, Manage
Use Case: Detect attacks and drift in production
```
User Input → [Input Monitor] → [Model] → [Output Monitor] → Response
                  ↓                             ↓
           [Anomaly Detection]           [Quality Check]
                  ↓                             ↓
             [Alert System] ←←←←←←←←←←←←←←←←←←←←←
                  ↓
           [Incident Response]
```
Category: input_anomaly
Latency Impact: 10-20ms
Detection Rate: 85-95%
```python
import re
import numpy as np

class InputAnomalyDetector:
    def __init__(self, training_distribution):
        self.mean = training_distribution.mean
        self.cov = training_distribution.covariance
        self.cov_inv = np.linalg.inv(self.cov)  # precompute; inverting per request is wasteful
        self.threshold = 3.0  # standard deviations

    def detect(self, input_embedding):
        # Mahalanobis distance from the training distribution
        diff = input_embedding - self.mean
        distance = np.sqrt(diff.T @ self.cov_inv @ diff)
        if distance > self.threshold:
            return AnomalyAlert(
                type="out_of_distribution",
                score=distance,
                severity=self._classify_severity(distance),
            )
        return None

    def detect_injection(self, text_input):
        # Pattern-based prompt-injection detection
        injection_patterns = [
            r'ignore\s+(previous|all)\s+instructions',
            r'system\s*:\s*',
            r'(admin|developer)\s+mode',
        ]
        for pattern in injection_patterns:
            if re.search(pattern, text_input, re.I):
                return AnomalyAlert(type="injection_attempt", severity="HIGH")
        return None
```
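A minimal usage sketch, assuming request embeddings come from the same encoder that produced the training embeddings. `TrainingDistribution`, `train_embeddings.npy`, and `embed()` are illustrative placeholders, not part of the skill's API:

```python
import numpy as np
from dataclasses import dataclass

@dataclass
class TrainingDistribution:
    mean: np.ndarray
    covariance: np.ndarray

# Fit the reference distribution from embeddings of known-good traffic.
train_embeddings = np.load("train_embeddings.npy")  # shape (n_samples, dim)
detector = InputAnomalyDetector(TrainingDistribution(
    mean=train_embeddings.mean(axis=0),
    covariance=np.cov(train_embeddings, rowvar=False),
))

alert = detector.detect(embed("What is our refund policy?"))  # embed() is a stand-in
if alert:
    print(alert.type, alert.score)
```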
Category: output_quality
Metrics: [confidence, coherence, toxicity, latency]
```python
class OutputQualityMonitor:
    def __init__(self, config, toxicity_classifier, coherence_scorer):
        # Scorers are injected callables: text -> float in [0, 1]
        self.toxicity_classifier = toxicity_classifier
        self.coherence_scorer = coherence_scorer
        self.confidence_threshold = config.get('confidence', 0.5)
        self.toxicity_threshold = config.get('toxicity', 0.1)
        self.latency_threshold_ms = config.get('latency', 5000)
        self.coherence_threshold = config.get('coherence', 0.7)

    def check(self, response, metadata):
        alerts = []
        # Low confidence check
        if metadata.confidence < self.confidence_threshold:
            alerts.append(Alert("low_confidence", metadata.confidence))
        # Toxicity check
        toxicity_score = self.toxicity_classifier(response)
        if toxicity_score > self.toxicity_threshold:
            alerts.append(Alert("high_toxicity", toxicity_score))
        # Latency check
        if metadata.latency_ms > self.latency_threshold_ms:
            alerts.append(Alert("high_latency", metadata.latency_ms))
        # Coherence check
        coherence = self.coherence_scorer(response)
        if coherence < self.coherence_threshold:
            alerts.append(Alert("low_coherence", coherence))
        return alerts
```
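The injected scorers are deployment choices. One possible wiring, assuming the open-source `detoxify` package for toxicity (its `predict` returns a dict of scores, including an overall `toxicity` head); the coherence scorer here is a stub assumption:

```python
from detoxify import Detoxify

_tox_model = Detoxify("original")

def toxicity_classifier(text):
    # Use the overall 'toxicity' score from Detoxify's prediction dict.
    return float(_tox_model.predict(text)["toxicity"])

def coherence_scorer(text):
    # Placeholder: real deployments might use perplexity or an NLI model.
    return 1.0

monitor = OutputQualityMonitor(
    config={"confidence": 0.5, "toxicity": 0.1, "latency": 5000},
    toxicity_classifier=toxicity_classifier,
    coherence_scorer=coherence_scorer,
)
```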
Category: model_drift
Types: [data_drift, concept_drift, prediction_drift]
```python
class DriftDetector:
    def __init__(self, baseline_window=1000):
        self.baseline_window = baseline_window
        self.baseline_inputs = []
        self.baseline_outputs = []

    def detect_data_drift(self, current_inputs):
        """Detect drift in the input distribution."""
        # Keep collecting until the baseline window is full
        if len(self.baseline_inputs) < self.baseline_window:
            self.baseline_inputs.extend(current_inputs)
            return None
        # KL divergence between baseline and current distributions
        baseline_dist = self._estimate_distribution(self.baseline_inputs)
        current_dist = self._estimate_distribution(current_inputs)
        kl_div = self._kl_divergence(baseline_dist, current_dist)
        if kl_div > 0.1:
            return DriftAlert("data_drift", kl_div)
        return None

    def detect_concept_drift(self, predictions, ground_truth):
        """Detect drift in model performance."""
        # Track accuracy over sliding windows
        recent_accuracy = self._compute_accuracy(predictions, ground_truth)
        baseline_accuracy = self._baseline_accuracy()
        if baseline_accuracy - recent_accuracy > 0.05:  # 5% accuracy drop
            return DriftAlert("concept_drift", recent_accuracy)
        return None
```
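The `_estimate_distribution` and `_kl_divergence` helpers are not shown above. A minimal histogram-based sketch; the bin count, value range, and smoothing constant are assumptions to tune for your embedding or feature space:

```python
import numpy as np

def _estimate_distribution(values, bins=20, value_range=(0.0, 1.0)):
    # Normalized histogram as a discrete probability estimate.
    hist, _ = np.histogram(values, bins=bins, range=value_range)
    return hist / max(hist.sum(), 1)

def _kl_divergence(p, q, eps=1e-9):
    # KL(P || Q) with additive smoothing to avoid division by zero.
    p = np.asarray(p, dtype=float) + eps
    q = np.asarray(q, dtype=float) + eps
    p, q = p / p.sum(), q / q.sum()
    return float(np.sum(p * np.log(p / q)))
```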
Category: security_events
Events: [extraction_attempt, jailbreak, rate_abuse]
```python
import time
from collections import defaultdict

class SecurityMonitor:
    def __init__(self):
        self.query_history = defaultdict(list)
        self.extraction_patterns = []

    def detect_extraction(self, user_id, queries):
        """Detect model extraction attempts."""
        history = self.query_history[user_id]
        history.extend(queries)
        # Systematic high-volume, high-diversity querying suggests extraction
        if len(history) > 1000:  # high volume
            diversity = self._query_diversity(history)
            if diversity > 0.9:  # very diverse
                return SecurityAlert("extraction_attempt", user_id)
        return None

    def detect_abuse(self, user_id, request_timestamps):
        """Detect rate-limit abuse."""
        window = 60  # seconds (1 minute)
        recent = [t for t in request_timestamps if time.time() - t < window]
        if len(recent) > 100:  # too many requests per window
            return SecurityAlert("rate_abuse", user_id, len(recent))
        return None
```
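Per the architecture diagram above, these detectors sit on both sides of the model call. A sketch of the request path, assuming `model` returns `(response, metadata)` and that `embed` and `alert_sink` are supplied by the deployment; all names here are illustrative:

```python
import time

def monitored_inference(user_id, user_input, *, model, embed,
                        input_detector, quality_monitor,
                        security_monitor, alert_sink):
    alerts = []

    # Input-side checks before the model is invoked.
    for alert in (input_detector.detect(embed(user_input)),
                  input_detector.detect_injection(user_input),
                  security_monitor.detect_extraction(user_id, [user_input])):
        if alert:
            alerts.append(alert)

    start = time.time()
    response, metadata = model(user_input)
    metadata.latency_ms = (time.time() - start) * 1000

    # Output-side checks after the model responds.
    alerts.extend(quality_monitor.check(response, metadata))

    for alert in alerts:
        alert_sink(alert)  # route to the alert system / incident response
    return response
```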
Alert Thresholds:

```yaml
input_anomaly:
  warning: 2.5        # standard deviations
  critical: 4.0
output_toxicity:
  warning: 0.3
  critical: 0.7
model_drift:
  warning: 0.05       # 5% accuracy drop
  critical: 0.10
extraction_queries:
  warning: 500        # queries per hour
  critical: 1000
```
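A sketch of loading these thresholds from `assets/monitoring-config.yaml` and mapping a raw score to a severity. The file's exact key layout is an assumption, and rate-based thresholds (queries per hour) need their own windowed counting:

```python
import yaml

with open("assets/monitoring-config.yaml") as f:
    thresholds = yaml.safe_load(f)

def classify_severity(category, score):
    levels = thresholds[category]
    if score >= levels["critical"]:
        return "CRITICAL"
    if score >= levels["warning"]:
        return "WARNING"
    return None

classify_severity("input_anomaly", 3.1)    # -> "WARNING"
classify_severity("output_toxicity", 0.8)  # -> "CRITICAL"
```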
```
┌──────────────────────────────────────────────────────────┐
│              REAL-TIME MONITORING DASHBOARD              │
├──────────────────────────────────────────────────────────┤
│ Input Anomalies (1hr):  ████░░░░  12 (2.4%)              │
│ Output Toxicity (1hr):  █░░░░░░░   3 (0.6%)              │
│ Model Latency P99:      ████████  2.3s                   │
│ Drift Score:            ██░░░░░░  0.02 (OK)              │
│ Security Alerts:        ░░░░░░░░   0                     │
└──────────────────────────────────────────────────────────┘
```
Troubleshooting:

| Issue | Solution |
|---|---|
| Too many false-positive alerts | Tune thresholds, add allowlists, improve the baseline |
| Missed attack detections | Expand detection patterns, lower thresholds |
| High monitoring latency | Use sampling, async processing, optimized detectors (see sketch below) |
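For the latency issue, a sketch of sampled, off-hot-path checking; the 10% sample rate is an assumption to tune against your alerting budget:

```python
import asyncio
import random

SAMPLE_RATE = 0.1  # monitor 10% of traffic (assumption; tune per budget)

async def maybe_monitor(response, metadata, quality_monitor, alert_sink):
    # Sample a fraction of responses and run checks off the request path.
    if random.random() > SAMPLE_RATE:
        return
    alerts = await asyncio.to_thread(quality_monitor.check, response, metadata)
    for alert in alerts:
        alert_sink(alert)
```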
| Component | Purpose |
|---|---|
| Agent 05 | Configures monitoring |
| Agent 08 | CI/CD integration |
| /report | Monitoring reports |
| Prometheus/Grafana | Metrics visualization |
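For the Prometheus/Grafana row, a minimal export sketch using the official `prometheus_client` package; the metric names and port are assumptions:

```python
from prometheus_client import Counter, Gauge, start_http_server

ALERTS_TOTAL = Counter(
    "llm_monitor_alerts_total", "Monitoring alerts raised", ["type", "severity"]
)
DRIFT_SCORE = Gauge("llm_monitor_drift_score", "Latest KL-divergence drift score")

start_http_server(9100)  # exposes /metrics on :9100 for Prometheus to scrape

def alert_sink(alert):
    # Count every alert by type and severity for Grafana dashboards.
    severity = getattr(alert, "severity", "NA")
    ALERTS_TOTAL.labels(type=alert.type, severity=severity).inc()
```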
Detect attacks and drift with real-time AI monitoring.