Implements API Security Posture Management to discover, classify, risk-score APIs based on security controls, and enforce policies across the API lifecycle. Useful for continuous API monitoring.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
API安全态势管理(API Security Posture Management,API-SPM)通过自动发现、分类和风险评分组织内所有API(包括内部、外部、合作伙伴和影子端点),持续监控API攻击面。与点时间测试工具不同,API-SPM持续运行,检测配置漂移(Configuration Drift)、策略违规、缺失安全控制、敏感数据暴露和合规差距。它从DAST、SAST、SCA和运行时监控工具聚合发现结果,提供整个组织API风险态势的统一视图。
Implements API Security Posture Management: discovers, classifies, risk-scores APIs; assesses security controls. Includes Python engine for inventory and continuous monitoring with gateway/SIEM integrations.
Implements API Security Posture Management to continuously discover, classify, risk-score APIs, and enforce security policies across the lifecycle. For API inventory, monitoring, and compliance.
Discovers all API endpoints including documented, undocumented, shadow, zombie, and deprecated ones via passive traffic analysis, active scanning, DNS enumeration, JavaScript parsing, and cloud resource inventory. Triggers on API discovery, shadow API detection, inventory audit, or attack surface mapping requests.
Share bugs, ideas, or general feedback.
API安全态势管理(API Security Posture Management,API-SPM)通过自动发现、分类和风险评分组织内所有API(包括内部、外部、合作伙伴和影子端点),持续监控API攻击面。与点时间测试工具不同,API-SPM持续运行,检测配置漂移(Configuration Drift)、策略违规、缺失安全控制、敏感数据暴露和合规差距。它从DAST、SAST、SCA和运行时监控工具聚合发现结果,提供整个组织API风险态势的统一视图。
#!/usr/bin/env python3
"""API安全态势管理引擎
持续发现、分类和风险评分API,
以维护全面的安全态势清单。
"""
import json
import re
import hashlib
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
from enum import Enum
class APIClassification(Enum):
EXTERNAL = "external" # 外部API
INTERNAL = "internal" # 内部API
PARTNER = "partner" # 合作伙伴API
SHADOW = "shadow" # 影子API
DEPRECATED = "deprecated" # 已弃用API
class RiskLevel(Enum):
CRITICAL = 4 # 严重
HIGH = 3 # 高
MEDIUM = 2 # 中
LOW = 1 # 低
INFO = 0 # 信息
@dataclass
class SecurityControl:
name: str
present: bool
required: bool
severity: RiskLevel
details: str = ""
@dataclass
class APIEndpoint:
api_id: str
method: str
path: str
service_name: str
classification: APIClassification
owner: Optional[str] = None
version: Optional[str] = None
first_discovered: str = ""
last_seen: str = ""
documented: bool = False
security_controls: List[SecurityControl] = field(default_factory=list)
risk_score: float = 0.0
sensitive_data_types: Set[str] = field(default_factory=set)
compliance_tags: Set[str] = field(default_factory=set)
traffic_volume_daily: int = 0
class APIPostureManager:
SENSITIVE_PATTERNS = {
"ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
"credit_card": re.compile(r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b'),
"email": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
"api_key": re.compile(r'\b[A-Za-z0-9]{32,}\b'),
"jwt": re.compile(r'eyJ[A-Za-z0-9_-]+\.eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+'),
"phone": re.compile(r'\b\+?1?\d{10,15}\b'),
}
def __init__(self):
self.inventory: Dict[str, APIEndpoint] = {}
self.policy_rules: List[dict] = []
def generate_api_id(self, method: str, path: str, service: str) -> str:
raw = f"{service}:{method}:{path}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
def register_api(self, method: str, path: str, service_name: str,
classification: APIClassification,
documented: bool = False, owner: str = None) -> APIEndpoint:
api_id = self.generate_api_id(method, path, service_name)
now = datetime.now().isoformat()
if api_id in self.inventory:
endpoint = self.inventory[api_id]
endpoint.last_seen = now
return endpoint
endpoint = APIEndpoint(
api_id=api_id,
method=method,
path=path,
service_name=service_name,
classification=classification,
owner=owner,
first_discovered=now,
last_seen=now,
documented=documented
)
self.inventory[api_id] = endpoint
return endpoint
def assess_security_controls(self, endpoint: APIEndpoint,
traffic_sample: dict) -> List[SecurityControl]:
"""评估API端点上存在的安全控制措施。"""
controls = []
# 认证检查
has_auth = any(h in traffic_sample.get('request_headers', {})
for h in ['Authorization', 'X-API-Key', 'Cookie'])
controls.append(SecurityControl(
name="authentication",
present=has_auth,
required=True,
severity=RiskLevel.CRITICAL,
details="未检测到认证机制" if not has_auth else "认证已存在"
))
# TLS/HTTPS检查
is_https = traffic_sample.get('scheme', '').lower() == 'https'
controls.append(SecurityControl(
name="transport_encryption",
present=is_https,
required=True,
severity=RiskLevel.CRITICAL,
details="API通过无TLS的HTTP访问" if not is_https else "HTTPS已强制执行"
))
# 速率限制检查
has_rate_limit = any(h.startswith('X-RateLimit') or h == 'Retry-After'
for h in traffic_sample.get('response_headers', {}).keys())
controls.append(SecurityControl(
name="rate_limiting",
present=has_rate_limit,
required=True,
severity=RiskLevel.HIGH,
details="未检测到速率限制头" if not has_rate_limit else "速率限制已启用"
))
# CORS策略检查
cors_origin = traffic_sample.get('response_headers', {}).get('Access-Control-Allow-Origin', '')
has_strict_cors = cors_origin and cors_origin != '*'
controls.append(SecurityControl(
name="cors_policy",
present=has_strict_cors,
required=endpoint.classification == APIClassification.EXTERNAL,
severity=RiskLevel.HIGH if cors_origin == '*' else RiskLevel.MEDIUM,
details=f"CORS来源: {cors_origin}" if cors_origin else "无CORS头"
))
# 安全响应头检查
sec_headers = traffic_sample.get('response_headers', {})
required_headers = {
'X-Content-Type-Options': 'nosniff',
'Strict-Transport-Security': None,
'X-Frame-Options': None,
'Cache-Control': 'no-store',
}
missing = [h for h in required_headers if h not in sec_headers]
controls.append(SecurityControl(
name="security_headers",
present=len(missing) == 0,
required=True,
severity=RiskLevel.MEDIUM,
details=f"缺少响应头: {', '.join(missing)}" if missing else "所有安全响应头均已存在"
))
# 输入验证检查(从日志检测Schema验证错误)
has_validation = traffic_sample.get('has_schema_validation', False)
controls.append(SecurityControl(
name="input_validation",
present=has_validation,
required=True,
severity=RiskLevel.HIGH,
details="未检测到Schema验证" if not has_validation else "输入验证已启用"
))
endpoint.security_controls = controls
return controls
def calculate_risk_score(self, endpoint: APIEndpoint) -> float:
"""计算API端点的综合风险分数(0-100)。"""
score = 0.0
max_score = 0.0
# 安全控制评分
for control in endpoint.security_controls:
weight = control.severity.value * 5
max_score += weight
if not control.present and control.required:
score += weight
# 分类风险乘数
classification_weights = {
APIClassification.EXTERNAL: 1.5,
APIClassification.PARTNER: 1.3,
APIClassification.SHADOW: 2.0,
APIClassification.DEPRECATED: 1.8,
APIClassification.INTERNAL: 1.0,
}
multiplier = classification_weights.get(endpoint.classification, 1.0)
# 未文档化惩罚
if not endpoint.documented:
score += 10
# 敏感数据惩罚
score += len(endpoint.sensitive_data_types) * 5
# 归一化到0-100
if max_score > 0:
normalized = min(100, (score / max_score) * 100 * multiplier)
else:
normalized = 0
endpoint.risk_score = round(normalized, 1)
return endpoint.risk_score
def generate_posture_report(self) -> dict:
"""生成整个组织的API安全态势报告。"""
total = len(self.inventory)
if total == 0:
return {"error": "清单中无API"}
risk_distribution = {level.name: 0 for level in RiskLevel}
classification_counts = {c.value: 0 for c in APIClassification}
undocumented = 0
missing_auth = 0
missing_tls = 0
for endpoint in self.inventory.values():
self.calculate_risk_score(endpoint)
if endpoint.risk_score >= 75:
risk_distribution["CRITICAL"] += 1
elif endpoint.risk_score >= 50:
risk_distribution["HIGH"] += 1
elif endpoint.risk_score >= 25:
risk_distribution["MEDIUM"] += 1
else:
risk_distribution["LOW"] += 1
classification_counts[endpoint.classification.value] += 1
if not endpoint.documented:
undocumented += 1
for control in endpoint.security_controls:
if control.name == "authentication" and not control.present:
missing_auth += 1
if control.name == "transport_encryption" and not control.present:
missing_tls += 1
avg_risk = sum(e.risk_score for e in self.inventory.values()) / total
return {
"report_date": datetime.now().isoformat(),
"total_apis": total,
"average_risk_score": round(avg_risk, 1),
"risk_distribution": risk_distribution,
"classification": classification_counts,
"undocumented_apis": undocumented,
"missing_authentication": missing_auth,
"missing_tls": missing_tls,
"top_risks": sorted(
[{"api_id": e.api_id, "method": e.method, "path": e.path,
"service": e.service_name, "risk_score": e.risk_score,
"classification": e.classification.value}
for e in self.inventory.values()],
key=lambda x: x["risk_score"],
reverse=True
)[:20]
}
在所有API中定义并强制执行安全策略:
# api-security-policies.yaml
policies:
- name: require-authentication
description: 所有外部API必须要求认证
scope:
classification: [external, partner]
rule:
control: authentication
required: true
severity: critical
remediation: "添加OAuth2、API密钥或JWT认证"
- name: enforce-tls
description: 所有API必须使用HTTPS
scope:
classification: [external, internal, partner]
rule:
control: transport_encryption
required: true
severity: critical
remediation: "配置TLS证书并将HTTP重定向到HTTPS"
- name: require-rate-limiting
description: 外部API必须实施速率限制
scope:
classification: [external]
rule:
control: rate_limiting
required: true
severity: high
remediation: "在API网关层面配置速率限制"
- name: no-wildcard-cors
description: API不得使用通配符CORS来源
scope:
classification: [external]
rule:
control: cors_policy
condition: "origin != '*'"
severity: high
remediation: "在CORS配置中指定明确允许的来源"
- name: documentation-required
description: 所有API必须有OpenAPI文档
scope:
classification: [external, partner]
rule:
documented: true
severity: medium
remediation: "创建并发布OpenAPI规范"
- name: deprecation-sunset
description: 已弃用API必须包含Sunset响应头
scope:
classification: [deprecated]
rule:
header_present: "Sunset"
severity: medium
remediation: "添加包含计划下线日期的Sunset响应头"
| 指标 | 描述 | 目标 |
|---|---|---|
| API发现覆盖率 | 有文档的API占比 | > 95% |
| 平均风险分数 | 所有API的平均风险分数 | < 25 |
| 严重发现数 | 严重风险API数量 | 0 |
| 影子API数量 | 未记录/未管理的API | 0 |
| 认证覆盖率 | 有认证控制的API占比 | 100% |
| TLS覆盖率 | 使用HTTPS的API占比 | 100% |
| 策略合规率 | 符合所有策略的API占比 | > 90% |
| 平均修复时间 | 修复发现问题的平均天数 | < 7天 |