Scan API for security vulnerabilities
Performs comprehensive security scanning to identify OWASP API vulnerabilities and misconfigurations.
/plugin marketplace add jeremylongshore/claude-code-plugins-plus-skills/plugin install api-test-automation@claude-code-plugins-plusPerform comprehensive automated security scanning to identify OWASP API Security Top 10 vulnerabilities, misconfigurations, and potential attack vectors with detailed remediation guidance.
Use /scan-api-security when you need to:
DON'T use this when:
This command implements OWASP ZAP + Custom Scanners as the primary approach because:
Alternative considered: Burp Suite
Alternative considered: Manual testing only
Before running this command:
Set up OWASP ZAP or similar tools with API-specific rules and authentication.
Run comprehensive automated scans for known vulnerability patterns.
Verify critical findings and test business logic vulnerabilities.
Review findings, eliminate false positives, and prioritize by severity.
Create detailed report with findings, evidence, and remediation steps.
The command generates:
security-report.html - Executive summary with chartsvulnerabilities.json - Machine-readable findingsevidence/ - Screenshots and request/response logsremediation-guide.md - Fix recommendations by prioritysecurity-tests.py - Regression tests for found issuescompliance-checklist.md - Standards compliance status// security-scanner.js
const ZAPClient = require('zaproxy');
const axios = require('axios');
const jwt = require('jsonwebtoken');
const { createHash } = require('crypto');
class APISecurityScanner {
constructor(apiUrl, options = {}) {
this.apiUrl = apiUrl;
this.zapOptions = {
proxy: options.zapProxy || 'http://localhost:8080',
apiKey: options.zapApiKey || 'your-zap-api-key'
};
this.zap = new ZAPClient(this.zapOptions);
this.findings = [];
this.credentials = options.credentials || {};
}
async runComprehensiveScan() {
console.log('Starting comprehensive API security scan...');
try {
// Phase 1: Authentication Testing
await this.testAuthentication();
// Phase 2: Authorization Testing
await this.testAuthorization();
// Phase 3: Input Validation
await this.testInputValidation();
// Phase 4: Data Exposure
await this.testDataExposure();
// Phase 5: Rate Limiting
await this.testRateLimiting();
// Phase 6: Security Headers
await this.testSecurityHeaders();
// Phase 7: OWASP Top 10 API
await this.testOWASPTop10();
// Generate report
return this.generateReport();
} catch (error) {
console.error('Security scan failed:', error);
throw error;
}
}
async testAuthentication() {
console.log('Testing authentication mechanisms...');
const tests = [
// Test 1: Broken Authentication
{
name: 'JWT Algorithm Confusion',
test: async () => {
const token = jwt.sign({ user: 'admin' }, 'secret', { algorithm: 'HS256' });
const modifiedToken = token.replace('HS256', 'none');
try {
const response = await axios.get(`${this.apiUrl}/protected`, {
headers: { Authorization: `Bearer ${modifiedToken}` }
});
if (response.status === 200) {
this.addFinding({
severity: 'CRITICAL',
category: 'Authentication',
title: 'JWT Algorithm Confusion Vulnerability',
description: 'API accepts JWT tokens with "none" algorithm',
evidence: { token: modifiedToken, response: response.data },
remediation: 'Explicitly verify JWT algorithm, reject "none"'
});
}
} catch (error) {
// Expected behavior - authentication should fail
}
}
},
// Test 2: Weak Password Policy
{
name: 'Weak Password Policy',
test: async () => {
const weakPasswords = ['123456', 'password', 'admin'];
for (const password of weakPasswords) {
try {
const response = await axios.post(`${this.apiUrl}/register`, {
username: 'testuser',
password: password
});
if (response.status === 201) {
this.addFinding({
severity: 'HIGH',
category: 'Authentication',
title: 'Weak Password Policy',
description: `API accepts weak password: "${password}"`,
evidence: { password, response: response.status },
remediation: 'Implement strong password requirements'
});
}
} catch (error) {
// Good - weak password rejected
}
}
}
},
// Test 3: Session Fixation
{
name: 'Session Fixation',
test: async () => {
const fixedSession = 'fixed-session-id-12345';
try {
// Try to set a fixed session ID
const response = await axios.post(`${this.apiUrl}/login`,
{ username: 'user', password: 'pass' },
{ headers: { 'Cookie': `sessionId=${fixedSession}` } }
);
const setCookie = response.headers['set-cookie'];
if (setCookie && setCookie.includes(fixedSession)) {
this.addFinding({
severity: 'HIGH',
category: 'Authentication',
title: 'Session Fixation Vulnerability',
description: 'API accepts client-provided session IDs',
evidence: { providedSession: fixedSession, setCookie },
remediation: 'Always generate new session IDs on login'
});
}
} catch (error) {
// Expected - login might fail
}
}
}
];
for (const test of tests) {
try {
await test.test();
} catch (error) {
console.error(`Test "${test.name}" failed:`, error.message);
}
}
}
async testAuthorization() {
console.log('Testing authorization controls...');
// Test IDOR vulnerabilities
const userTokens = {
user1: await this.getAuthToken('user1', 'password1'),
user2: await this.getAuthToken('user2', 'password2')
};
// Try to access user2's data with user1's token
try {
const response = await axios.get(`${this.apiUrl}/users/user2/profile`, {
headers: { Authorization: `Bearer ${userTokens.user1}` }
});
if (response.status === 200) {
this.addFinding({
severity: 'CRITICAL',
category: 'Authorization',
title: 'Insecure Direct Object Reference (IDOR)',
description: 'User can access other users\' private data',
evidence: {
authenticatedAs: 'user1',
accessedData: 'user2/profile',
response: response.data
},
remediation: 'Implement proper authorization checks for all resources'
});
}
} catch (error) {
// Good - access denied
}
// Test privilege escalation
try {
const response = await axios.post(`${this.apiUrl}/admin/users`,
{ role: 'admin' },
{ headers: { Authorization: `Bearer ${userTokens.user1}` } }
);
if (response.status === 200) {
this.addFinding({
severity: 'CRITICAL',
category: 'Authorization',
title: 'Privilege Escalation',
description: 'Regular user can perform admin actions',
evidence: { endpoint: '/admin/users', response: response.status },
remediation: 'Implement role-based access control (RBAC)'
});
}
} catch (error) {
// Expected - should be forbidden
}
}
async testInputValidation() {
console.log('Testing input validation...');
const injectionPayloads = {
sql: ["' OR '1'='1", "admin'--", "1; DROP TABLE users--"],
nosql: ['{"$gt": ""}', '{"$ne": null}', '{"$regex": ".*"}'],
command: ['| ls -la', '; cat /etc/passwd', '`whoami`'],
xxe: ['<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/passwd">]>'],
xss: ['<script>alert(1)</script>', 'javascript:alert(1)', '<img src=x onerror=alert(1)>']
};
for (const [type, payloads] of Object.entries(injectionPayloads)) {
for (const payload of payloads) {
try {
const response = await axios.post(`${this.apiUrl}/search`, {
query: payload
});
// Check if payload appears unescaped in response
if (response.data && JSON.stringify(response.data).includes(payload)) {
this.addFinding({
severity: 'CRITICAL',
category: 'Injection',
title: `${type.toUpperCase()} Injection Vulnerability`,
description: `API vulnerable to ${type} injection`,
evidence: { payload, response: response.data },
remediation: `Implement proper input validation and parameterized queries`
});
}
} catch (error) {
// Error might indicate successful injection prevention
}
}
}
}
async testDataExposure() {
console.log('Testing for excessive data exposure...');
try {
const response = await axios.get(`${this.apiUrl}/users`);
if (response.data && Array.isArray(response.data)) {
const sensitiveFields = ['password', 'ssn', 'creditCard', 'apiKey', 'secret'];
const exposedFields = [];
response.data.forEach(user => {
sensitiveFields.forEach(field => {
if (user[field] !== undefined) {
exposedFields.push(field);
}
});
});
if (exposedFields.length > 0) {
this.addFinding({
severity: 'HIGH',
category: 'Data Exposure',
title: 'Sensitive Data Exposure',
description: 'API returns sensitive fields in responses',
evidence: { exposedFields: [...new Set(exposedFields)] },
remediation: 'Filter sensitive fields from API responses'
});
}
}
} catch (error) {
console.error('Data exposure test failed:', error.message);
}
}
async testRateLimiting() {
console.log('Testing rate limiting...');
const endpoint = `${this.apiUrl}/login`;
const requests = [];
const requestCount = 100;
// Send rapid requests
for (let i = 0; i < requestCount; i++) {
requests.push(
axios.post(endpoint, {
username: 'test',
password: `attempt${i}`
}).catch(err => ({ status: err.response?.status }))
);
}
const responses = await Promise.all(requests);
const successfulRequests = responses.filter(r => r.status !== 429).length;
if (successfulRequests === requestCount) {
this.addFinding({
severity: 'HIGH',
category: 'Rate Limiting',
title: 'Missing Rate Limiting',
description: 'API endpoints lack rate limiting protection',
evidence: {
endpoint,
requestsSent: requestCount,
successfulRequests
},
remediation: 'Implement rate limiting on all endpoints'
});
}
}
async testSecurityHeaders() {
console.log('Testing security headers...');
try {
const response = await axios.get(this.apiUrl);
const headers = response.headers;
const requiredHeaders = {
'x-content-type-options': 'nosniff',
'x-frame-options': 'DENY',
'x-xss-protection': '1; mode=block',
'strict-transport-security': 'max-age=31536000',
'content-security-policy': null // Just check existence
};
const missingHeaders = [];
for (const [header, expectedValue] of Object.entries(requiredHeaders)) {
if (!headers[header]) {
missingHeaders.push(header);
} else if (expectedValue && headers[header] !== expectedValue) {
missingHeaders.push(`${header} (incorrect value)`);
}
}
if (missingHeaders.length > 0) {
this.addFinding({
severity: 'MEDIUM',
category: 'Security Headers',
title: 'Missing Security Headers',
description: 'Important security headers are missing',
evidence: { missingHeaders },
remediation: 'Add all recommended security headers'
});
}
} catch (error) {
console.error('Security headers test failed:', error.message);
}
}
async testOWASPTop10() {
console.log('Running OWASP API Security Top 10 tests...');
// Use ZAP for comprehensive scanning
await this.zap.core.newSession('api-security-scan', true);
await this.zap.core.setMode('attack');
// Configure context
const contextId = await this.zap.context.newContext('API Context');
await this.zap.context.includeInContext(contextId, `${this.apiUrl}.*`);
// Run active scan
const scanId = await this.zap.ascan.scan(this.apiUrl, true, true);
// Wait for scan completion
let progress = 0;
while (progress < 100) {
progress = await this.zap.ascan.status(scanId);
await this.delay(5000);
console.log(`Scan progress: ${progress}%`);
}
// Get results
const alerts = await this.zap.core.alerts(this.apiUrl);
alerts.forEach(alert => {
this.addFinding({
severity: this.mapZAPSeverity(alert.risk),
category: 'OWASP Scan',
title: alert.name,
description: alert.description,
evidence: {
url: alert.url,
param: alert.param,
attack: alert.attack,
evidence: alert.evidence
},
remediation: alert.solution
});
});
}
addFinding(finding) {
this.findings.push({
...finding,
timestamp: new Date().toISOString(),
id: createHash('md5').update(JSON.stringify(finding)).digest('hex')
});
}
mapZAPSeverity(risk) {
const mapping = {
'High': 'CRITICAL',
'Medium': 'HIGH',
'Low': 'MEDIUM',
'Informational': 'LOW'
};
return mapping[risk] || 'MEDIUM';
}
async getAuthToken(username, password) {
try {
const response = await axios.post(`${this.apiUrl}/login`, {
username,
password
});
return response.data.token;
} catch (error) {
console.error(`Failed to authenticate ${username}`);
return null;
}
}
delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
generateReport() {
const report = {
scanDate: new Date().toISOString(),
targetAPI: this.apiUrl,
summary: {
total: this.findings.length,
critical: this.findings.filter(f => f.severity === 'CRITICAL').length,
high: this.findings.filter(f => f.severity === 'HIGH').length,
medium: this.findings.filter(f => f.severity === 'MEDIUM').length,
low: this.findings.filter(f => f.severity === 'LOW').length
},
findings: this.findings,
recommendations: this.generateRecommendations()
};
// Save report
require('fs').writeFileSync(
'security-report.json',
JSON.stringify(report, null, 2)
);
console.log('\nSecurity Scan Complete!');
console.log(`Total findings: ${report.summary.total}`);
console.log(`Critical: ${report.summary.critical}`);
console.log(`High: ${report.summary.high}`);
return report;
}
generateRecommendations() {
const recommendations = [];
if (this.findings.some(f => f.category === 'Authentication')) {
recommendations.push({
priority: 1,
title: 'Strengthen Authentication',
actions: [
'Implement MFA for sensitive operations',
'Use secure session management',
'Enforce strong password policies'
]
});
}
if (this.findings.some(f => f.category === 'Authorization')) {
recommendations.push({
priority: 1,
title: 'Implement Proper Authorization',
actions: [
'Use role-based access control (RBAC)',
'Validate user permissions for each request',
'Implement resource-level authorization'
]
});
}
return recommendations;
}
}
// Usage
const scanner = new APISecurityScanner('https://api.example.com', {
credentials: {
admin: { username: 'admin', password: 'admin123' },
user: { username: 'user', password: 'user123' }
},
zapProxy: 'http://localhost:8080',
zapApiKey: 'your-zap-api-key'
});
scanner.runComprehensiveScan()
.then(report => {
console.log('Security scan completed successfully');
// Send report via email or integrate with issue tracker
})
.catch(error => {
console.error('Security scan failed:', error);
process.exit(1);
});
# api_security_scanner.py
import requests
import json
import hashlib
import time
from typing import Dict, List, Any
from dataclasses import dataclass, asdict
from enum import Enum
import jwt
import base64
from urllib.parse import urlparse
class Severity(Enum):
CRITICAL = "CRITICAL"
HIGH = "HIGH"
MEDIUM = "MEDIUM"
LOW = "LOW"
INFO = "INFO"
@dataclass
class SecurityFinding:
severity: Severity
category: str
title: str
description: str
evidence: Dict[str, Any]
remediation: str
cwe_id: str = None
owasp_category: str = None
class APISecurityTester:
def __init__(self, base_url: str, auth_token: str = None):
self.base_url = base_url
self.session = requests.Session()
if auth_token:
self.session.headers['Authorization'] = f'Bearer {auth_token}'
self.findings: List[SecurityFinding] = []
def run_security_tests(self):
"""Run comprehensive security test suite."""
print("Starting API Security Testing...")
test_suites = [
self.test_broken_authentication,
self.test_broken_authorization,
self.test_excessive_data_exposure,
self.test_lack_of_resources_rate_limiting,
self.test_security_misconfiguration,
self.test_injection_vulnerabilities,
self.test_improper_assets_management,
self.test_insufficient_logging
]
for test_suite in test_suites:
try:
test_suite()
except Exception as e:
print(f"Test suite failed: {e}")
return self.generate_report()
def test_injection_vulnerabilities(self):
"""Test for various injection vulnerabilities."""
print("Testing for injection vulnerabilities...")
# SQL Injection payloads
sql_payloads = [
"' OR '1'='1",
"admin'--",
"' UNION SELECT * FROM users--",
"1' AND '1' = '1"
]
# Test each endpoint with injection payloads
endpoints = ['/search', '/users', '/products']
for endpoint in endpoints:
for payload in sql_payloads:
try:
response = self.session.get(
f"{self.base_url}{endpoint}",
params={'q': payload}
)
# Check for SQL error messages in response
error_indicators = [
'SQL syntax',
'mysql_fetch',
'ORA-01',
'PostgreSQL',
'SQLite'
]
response_text = response.text.lower()
for indicator in error_indicators:
if indicator.lower() in response_text:
self.add_finding(
severity=Severity.CRITICAL,
category="Injection",
title="SQL Injection Vulnerability",
description=f"Endpoint {endpoint} vulnerable to SQL injection",
evidence={
"endpoint": endpoint,
"payload": payload,
"indicator": indicator
},
remediation="Use parameterized queries",
cwe_id="CWE-89"
)
break
except Exception as e:
pass
def test_broken_authentication(self):
"""Test for authentication vulnerabilities."""
print("Testing authentication mechanisms...")
# Test JWT vulnerabilities
test_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
# Try token with 'none' algorithm
header = base64.urlsafe_b64encode(
json.dumps({"alg": "none", "typ": "JWT"}).encode()
).decode().rstrip('=')
payload = test_token.split('.')[1]
none_token = f"{header}.{payload}."
response = self.session.get(
f"{self.base_url}/profile",
headers={'Authorization': f'Bearer {none_token}'}
)
if response.status_code == 200:
self.add_finding(
severity=Severity.CRITICAL,
category="Authentication",
title="JWT None Algorithm Vulnerability",
description="API accepts JWT tokens with 'none' algorithm",
evidence={"token": none_token[:50] + "..."},
remediation="Explicitly verify JWT algorithm",
cwe_id="CWE-347"
)
def add_finding(self, **kwargs):
"""Add a security finding to the results."""
finding = SecurityFinding(**kwargs)
self.findings.append(finding)
print(f" Found: {finding.title} ({finding.severity.value})")
def generate_report(self) -> Dict[str, Any]:
"""Generate comprehensive security report."""
report = {
"scan_date": time.strftime("%Y-%m-%d %H:%M:%S"),
"target_api": self.base_url,
"total_findings": len(self.findings),
"severity_breakdown": {
"CRITICAL": len([f for f in self.findings if f.severity == Severity.CRITICAL]),
"HIGH": len([f for f in self.findings if f.severity == Severity.HIGH]),
"MEDIUM": len([f for f in self.findings if f.severity == Severity.MEDIUM]),
"LOW": len([f for f in self.findings if f.severity == Severity.LOW])
},
"findings": [asdict(f) for f in self.findings]
}
# Save report
with open("security_report.json", "w") as f:
json.dump(report, f, indent=2)
print(f"\nSecurity scan complete. Found {len(self.findings)} issues.")
return report
# Usage example
if __name__ == "__main__":
scanner = APISecurityTester("https://api.example.com")
report = scanner.run_security_tests()
print(f"Report saved to security_report.json")
| Error | Cause | Solution |
|---|---|---|
| "Connection refused to ZAP" | ZAP proxy not running | Start ZAP daemon on configured port |
| "Permission denied" | No authorization for security testing | Obtain written permission before scanning |
| "Rate limited during scan" | Too many requests | Reduce scan speed, add delays |
| "False positive findings" | Overly aggressive rules | Manually verify and tune scanner rules |
| "Incomplete scan results" | Scan timeout | Increase timeout, scan in phases |
Scan Modes
passive: Non-intrusive scanning onlyactive: Full vulnerability testingtargeted: Focus on specific vulnerabilitiescompliance: Check against standardsAuthentication Types
bearer: JWT/OAuth tokensbasic: Username/passwordapikey: API key authenticationcertificate: Client certificatesDO:
DON'T:
OWASP API Security Top 10 (2023)
/api-authentication-builder - Implement secure authentication/api-rate-limiter - Add rate limiting protection/api-monitoring-dashboard - Monitor security events/api-response-validator - Validate API responses