Detects shadow API endpoints running outside documented OpenAPI specs via traffic analysis, code scanning, and discovery tools. Useful for API security audits and inventory.
npx claudepluginhub killvxk/cybersecurity-skills-zh
This skill uses the workspace's default tool permissions.
影子API(Shadow API)是在组织环境中运行但未被追踪、记录或保护的API端点。它们来源于快速开发周期、被遗忘的测试环境、仍在运行的已废弃API版本、第三方集成,或未经治理部署的开发人员个人项目。影子API绕过认证和监控控制,为攻击者创造隐藏入口。研究表明,大型组织中多达30%的API端点未被记录,使得影子API检测成为API安全态势管理(API Security Posture Management)的关键组成部分。
Detects shadow API endpoints outside documented specs via traffic analysis against OpenAPI, code scanning, and discovery platforms. For API security audits and threat hunting.
Discovers shadow API endpoints operating outside documented specs using traffic analysis against OpenAPI, code scanning, and discovery platforms. For API security audits and attack surface management.
Discovers all API endpoints including documented, undocumented, shadow, zombie, and deprecated ones via passive traffic analysis, active scanning, DNS enumeration, JavaScript parsing, and cloud resource inventory. Triggers on API discovery, shadow API detection, inventory audit, or attack surface mapping requests.
Share bugs, ideas, or general feedback.
影子API(Shadow API)是在组织环境中运行但未被追踪、记录或保护的API端点。它们来源于快速开发周期、被遗忘的测试环境、仍在运行的已废弃API版本、第三方集成,或未经治理部署的开发人员个人项目。影子API绕过认证和监控控制,为攻击者创造隐藏入口。研究表明,大型组织中多达30%的API端点未被记录,使得影子API检测成为API安全态势管理(API Security Posture Management)的关键组成部分。
将实时API流量与已记录的OpenAPI规范进行对比,识别未记录的端点:
#!/usr/bin/env python3
"""影子API端点检测器
将观察到的API流量模式与已记录的OpenAPI规范进行对比,
以识别未记录的(影子)端点。
"""
import json
import re
import yaml
import sys
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Set, Tuple, Optional
from dataclasses import dataclass, field
@dataclass
class DiscoveredEndpoint:
    """Aggregated observations for one (HTTP method, normalized path) pair."""

    # HTTP verb and placeholder-normalized path that identify the endpoint.
    method: str
    path_pattern: str
    # Timestamps are kept as the raw strings taken from the log source.
    first_seen: str
    last_seen: str
    # Number of log entries attributed to this endpoint.
    request_count: int
    # Distinct client addresses and response codes seen for the endpoint;
    # default_factory gives every instance its own set.
    source_ips: Set[str] = field(default_factory=set)
    status_codes: Set[int] = field(default_factory=set)
    # True once any request carrying authentication evidence has been seen.
    has_auth_header: bool = False
    # True when the endpoint matches an entry of a loaded specification.
    documented: bool = False
class ShadowAPIDetector:
    """Compare endpoints observed in access logs with OpenAPI specifications.

    Typical use: call ``load_openapi_spec`` for every specification, then
    ``process_access_log`` for every log file, then ``generate_report``.

    NOTE(review): spec paths are registered exactly as written under
    ``paths``; a ``servers``/``basePath`` prefix is not applied.
    """

    # Ordered (pattern, placeholder) pairs used to collapse dynamic path
    # segments.  The UUID pattern must run before the numeric one, otherwise
    # a UUID that starts with digits loses its prefix to "/{id}".  Each
    # pattern is anchored to the end of a segment so that only whole
    # segments are replaced ("/123abc" is left untouched).
    PARAM_PATTERNS = [
        (re.compile(
            r'/[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}'
            r'-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}(?=/|$)'), '/{uuid}'),
        (re.compile(r'/\d+(?=/|$)'), '/{id}'),
        (re.compile(r'/[a-zA-Z0-9]{20,40}(?=/|$)'), '/{token}'),
    ]

    # Any "{name}" placeholder, whether it comes from a spec or from
    # PARAM_PATTERNS.
    _PLACEHOLDER_RE = re.compile(r'\{[^}]+\}')

    # Keys of an OpenAPI path item that denote operations.
    _HTTP_METHODS = frozenset(
        ('GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'HEAD', 'OPTIONS'))

    def __init__(self):
        # (METHOD, path) pairs with every placeholder rewritten to "{id}".
        self.documented_endpoints: Set[Tuple[str, str]] = set()
        # Keyed by (METHOD, normalized observed path).
        self.discovered_endpoints: Dict[Tuple[str, str], DiscoveredEndpoint] = {}

    @classmethod
    def _canonical_path(cls, path: str) -> str:
        """Rewrite every ``{placeholder}`` in *path* to ``{id}``."""
        return cls._PLACEHOLDER_RE.sub('{id}', path)

    def _is_documented(self, key: Tuple[str, str]) -> bool:
        """Return True when *key* matches a loaded spec entry.

        Observed paths may contain ``{uuid}`` or ``{token}`` while spec
        paths are stored with ``{id}`` only, so the observed path is
        canonicalised before the lookup.
        """
        method, path = key
        return (method, self._canonical_path(path)) in self.documented_endpoints

    def load_openapi_spec(self, spec_path: str):
        """Register the operations listed in an OpenAPI document.

        Files ending in ``.json`` are parsed as JSON, everything else as
        YAML.  A document without a usable ``paths`` mapping contributes
        nothing.  Prints the running total of documented endpoints.

        Args:
            spec_path: Path of the specification file.
        """
        with open(spec_path, 'r', encoding='utf-8') as f:
            if spec_path.endswith('.json'):
                spec = json.load(f)
            else:
                spec = yaml.safe_load(f)

        # An empty file parses to None and "paths:" may be null.
        paths = spec.get('paths') if isinstance(spec, dict) else None
        if not isinstance(paths, dict):
            paths = {}

        for path, methods in paths.items():
            # Skip null or otherwise malformed path items.
            if not isinstance(methods, dict):
                continue
            normalized_path = self._canonical_path(str(path))
            for method in methods:
                verb = str(method).upper()
                # Path items also hold non-operation keys such as
                # "parameters"; only HTTP verbs are endpoints.
                if verb in self._HTTP_METHODS:
                    self.documented_endpoints.add((verb, normalized_path))
        print(f"已从 {spec_path} 加载 {len(self.documented_endpoints)} 个已记录端点")

    def normalize_path(self, path: str) -> str:
        """Drop the query string and replace dynamic segments by placeholders.

        Args:
            path: Request target as it appears in a log entry.

        Returns:
            The path with whole numeric, UUID and long alphanumeric segments
            replaced by ``{id}``, ``{uuid}`` and ``{token}`` respectively.
        """
        path = path.partition('?')[0]
        for pattern, replacement in self.PARAM_PATTERNS:
            path = pattern.sub(replacement, path)
        return path

    @staticmethod
    def _parse_json_line(line: str) -> Optional[Tuple[str, str, int, str, str, bool]]:
        """Extract request fields from one JSON log line, or None to skip it."""
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            return None
        # Valid JSON that is not an object (a list, a number, ...) has no fields.
        if not isinstance(entry, dict):
            return None
        method = entry.get('method', entry.get('http_method', ''))
        path = entry.get('path', entry.get('uri', ''))
        if not isinstance(method, str) or not isinstance(path, str):
            return None
        try:
            status = int(entry.get('status', entry.get('status_code', 0)))
        except (TypeError, ValueError):
            # Keep the observation; 0 is also the value used for a missing status.
            status = 0
        ip = entry.get('remote_addr', entry.get('client_ip', ''))
        timestamp = entry.get('timestamp', entry.get('@timestamp', ''))
        has_auth = bool(entry.get('authorization', entry.get('auth_header', '')))
        return method, path, status, ip, timestamp, has_auth

    @staticmethod
    def _parse_text_line(line: str, line_re) -> Optional[Tuple[str, str, int, str, str, bool]]:
        """Extract request fields from one regex-matched log line, or None."""
        match = line_re.match(line)
        if not match:
            return None
        # Only true when the log line itself contains the header name,
        # i.e. for formats that were extended to log request headers.
        has_auth = 'Authorization' in line
        return (match.group('method'), match.group('path'),
                int(match.group('status')), match.group('ip'),
                match.group('time'), has_auth)

    def process_access_log(self, log_file: str, log_format: str = "common"):
        """Aggregate the requests of an access log into discovered endpoints.

        Only paths starting with ``/api`` or ``/v`` are considered.  Lines
        that cannot be parsed are skipped.

        Args:
            log_file: Path of the log file.
            log_format: ``"common"`` for the regex-parsed text format or
                ``"json"`` for one JSON object per line.

        Raises:
            KeyError: If *log_format* is not a known format.
        """
        patterns = {
            "common": re.compile(
                r'(?P<ip>[\d.]+)\s+\S+\s+\S+\s+\[(?P<time>[^\]]+)\]\s+'
                r'"(?P<method>\w+)\s+(?P<path>\S+)\s+\S+"\s+(?P<status>\d+)'
            ),
            "json": None  # JSON logs are parsed separately
        }
        line_re = patterns[log_format]

        # Logs can contain arbitrary request bytes; never abort on decoding.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                if log_format == "json":
                    parsed = self._parse_json_line(line)
                else:
                    parsed = self._parse_text_line(line, line_re)
                if parsed is None:
                    continue
                method, path, status, ip, timestamp, has_auth = parsed

                # Only API paths are of interest.
                if not path.startswith(('/api', '/v')):
                    continue

                key = (method.upper(), self.normalize_path(path))
                endpoint = self.discovered_endpoints.get(key)
                if endpoint is None:
                    endpoint = DiscoveredEndpoint(
                        method=key[0],
                        path_pattern=key[1],
                        first_seen=timestamp,
                        last_seen=timestamp,
                        request_count=0,
                        documented=self._is_documented(key)
                    )
                    self.discovered_endpoints[key] = endpoint

                endpoint.request_count += 1
                endpoint.last_seen = timestamp
                endpoint.source_ips.add(ip)
                endpoint.status_codes.add(status)
                if has_auth:
                    endpoint.has_auth_header = True

    def identify_shadow_apis(self) -> "List[DiscoveredEndpoint]":
        """Return the discovered endpoints that match no loaded spec entry.

        The ``documented`` flag of each endpoint is refreshed first, so
        specifications loaded after the logs were processed are honoured.

        Returns:
            Undocumented endpoints, busiest first.
        """
        shadows = []
        for key, endpoint in self.discovered_endpoints.items():
            if not endpoint.documented and self._is_documented(key):
                endpoint.documented = True
            if not endpoint.documented:
                shadows.append(endpoint)
        # Most active shadow endpoints first.
        shadows.sort(key=lambda e: e.request_count, reverse=True)
        return shadows

    def classify_risk(self, endpoint: "DiscoveredEndpoint") -> str:
        """Score an endpoint and map the score to a risk level.

        Args:
            endpoint: The endpoint to rate.

        Returns:
            ``"CRITICAL"`` (score >= 8), ``"HIGH"`` (>= 5), ``"MEDIUM"``
            (>= 3) or ``"LOW"``.
        """
        risk_score = 0

        # No authentication evidence was observed.
        if not endpoint.has_auth_header:
            risk_score += 3

        # Traffic volume.
        if endpoint.request_count > 1000:
            risk_score += 2
        elif endpoint.request_count > 100:
            risk_score += 1

        # Many distinct clients mean broader exposure.
        if len(endpoint.source_ips) > 10:
            risk_score += 2

        # Successful responses show the endpoint is functional.
        if 200 in endpoint.status_codes or 201 in endpoint.status_codes:
            risk_score += 1

        # Write operations carry more risk.
        if endpoint.method in ('POST', 'PUT', 'DELETE', 'PATCH'):
            risk_score += 2

        # Sensitive keywords anywhere in the path; counted once.
        sensitive_patterns = ['admin', 'internal', 'debug', 'test', 'backup',
                              'config', 'health', 'metrics', 'graphql', 'console']
        lowered = endpoint.path_pattern.lower()
        if any(pattern in lowered for pattern in sensitive_patterns):
            risk_score += 3

        if risk_score >= 8:
            return "CRITICAL"
        if risk_score >= 5:
            return "HIGH"
        if risk_score >= 3:
            return "MEDIUM"
        return "LOW"

    def generate_report(self) -> dict:
        """Build the shadow-API report as a JSON-serialisable dictionary.

        Returns:
            A dict with ``scan_date``, a ``summary`` of counts and one
            ``shadow_endpoints`` entry per undocumented endpoint.
        """
        shadows = self.identify_shadow_apis()
        total_documented = len(self.documented_endpoints)
        total_discovered = len(self.discovered_endpoints)

        report = {
            "scan_date": datetime.now().isoformat(),
            "summary": {
                "documented_endpoints": total_documented,
                "total_discovered_endpoints": total_discovered,
                "shadow_endpoints": len(shadows),
                # max(..., 1) avoids dividing by zero when nothing was seen.
                "shadow_ratio": f"{len(shadows)/max(total_discovered,1)*100:.1f}%",
            },
            "shadow_endpoints": []
        }

        for endpoint in shadows:
            report["shadow_endpoints"].append({
                "method": endpoint.method,
                "path": endpoint.path_pattern,
                "risk_level": self.classify_risk(endpoint),
                "request_count": endpoint.request_count,
                "unique_sources": len(endpoint.source_ips),
                "authenticated": endpoint.has_auth_header,
                "status_codes": sorted(endpoint.status_codes),
                "first_seen": endpoint.first_seen,
                "last_seen": endpoint.last_seen,
            })

        return report
def main(argv=None, log_path="/var/log/api/access.log",
         report_path="shadow_api_report.json"):
    """Run a shadow-API scan and write the report to disk.

    Args:
        argv: Specification files to load.  Defaults to the command-line
            arguments, or ``openapi.yaml`` when none are given.  Only names
            ending in ``.yaml``, ``.yml`` or ``.json`` are loaded.
        log_path: Access log to analyse.
        report_path: Destination of the JSON report.
    """
    detector = ShadowAPIDetector()

    # Load the documented API specifications.
    args = sys.argv[1:] if argv is None else list(argv)
    spec_files = args or ["openapi.yaml"]
    for spec in spec_files:
        if spec.endswith(('.yaml', '.yml', '.json')):
            detector.load_openapi_spec(spec)
        else:
            # Tell the user instead of dropping the argument silently.
            print(f"警告: 已跳过非规范文件 {spec}", file=sys.stderr)

    # Process the access log.
    detector.process_access_log(log_path)

    report = detector.generate_report()
    summary = report['summary']

    print(f"\n{'='*60}")
    print("影子API发现报告")
    print(f"{'='*60}")
    print(f"已记录: {summary['documented_endpoints']}")
    print(f"已发现: {summary['total_discovered_endpoints']}")
    print(f"影子端点: {summary['shadow_endpoints']} ({summary['shadow_ratio']})")
    print()

    # Built once rather than on every loop iteration.
    risk_marker = {"CRITICAL": "[!!!]", "HIGH": "[!!]", "MEDIUM": "[!]", "LOW": "[.]"}
    for ep in report["shadow_endpoints"]:
        print(f"  {risk_marker.get(ep['risk_level'], '[?]')} {ep['method']} {ep['path']}")
        print(f"       风险: {ep['risk_level']} | 请求数: {ep['request_count']} | 认证: {ep['authenticated']}")

    # Save the full report.
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, default=str)
    print(f"\n完整报告已保存至 {report_path}")


if __name__ == "__main__":
    main()
# AWS: list API Gateway REST APIs so they can be checked against the documentation
aws apigateway get-rest-apis --query 'items[*].[name,id]' --output table
# List the HTTP/WebSocket APIs, then every route of each API
aws apigatewayv2 get-apis --query 'Items[*].[Name,ApiId,ProtocolType]' --output table
for api_id in $(aws apigatewayv2 get-apis --query 'Items[*].ApiId' --output text); do
  aws apigatewayv2 get-routes --api-id "$api_id" \
    --query 'Items[*].[RouteKey,Target]' --output table
done
# AWS Lambda function URLs (potential shadow APIs).
# --function-name expects one real function name, not a wildcard, so iterate
# over the functions instead of hiding the resulting error.
for fn in $(aws lambda list-functions --query 'Functions[*].FunctionName' --output text); do
  aws lambda list-function-url-configs --function-name "$fn" \
    --query 'FunctionUrlConfigs[*].[FunctionUrl,AuthType]' --output text
done
# Find ALB listener rules that route to undocumented backends
aws elbv2 describe-rules --listener-arn "$LISTENER_ARN" \
  --query 'Rules[*].[Priority,Conditions[0].Values[0],Actions[0].TargetGroupArn]'
# Search the source code for route definitions that may be undocumented
# Express.js routes
grep -rn "app\.\(get\|post\|put\|delete\|patch\)" --include="*.js" --include="*.ts" src/
# Flask/Django routes
grep -rn "@app\.route\|@api\.route\|path(" --include="*.py" src/
# Spring Boot endpoints
grep -rn "@\(Get\|Post\|Put\|Delete\|Patch\)Mapping\|@RequestMapping" --include="*.java" src/
# Compare the routes found in code with the OpenAPI specification.
# Quotes are stripped from the grep output and yq prints raw strings, so both
# sides are bare paths; otherwise no line could ever match.
diff <(grep -rohE "['\"]/api/[^'\"]*['\"]" src/ | tr -d "'\"" | sort -u) \
     <(yq -r '.paths | keys | .[]' openapi.yaml | sort -u)
# Kong plugin configuration - reject routes that are not registered
plugins:
  - name: request-validator
    config:
      allowed_content_types:
        - application/json
      # NOTE(review): confirm that a null body_schema is accepted by the
      # request-validator version in use.
      body_schema: null
  - name: pre-function
    config:
      access:
        - |
          -- Block requests to unregistered endpoints
          local registered, err = kong.cache:get("registered_endpoints")
          if err then
            kong.log.err("registered_endpoints lookup failed: ", err)
          end
          local path = kong.request.get_path()
          local method = kong.request.get_method()
          local key = method .. ":" .. path

          -- A cache miss yields nil; indexing it would raise a Lua error,
          -- so a missing registry is treated as "nothing registered".
          if type(registered) ~= "table" or not registered[key] then
            kong.log.warn("Shadow API access attempt: ", key)
            return kong.response.exit(404, {error = "Endpoint not registered"})
          end