Detects API enumeration attacks by monitoring sequential ID access patterns and authorization failures, including BOLA and IDOR exploits. Provides Splunk and Elastic SIEM detection rules.
npx claudepluginhub killvxk/cybersecurity-skills-zh
This skill uses the workspace's default tool permissions.
API枚举攻击(API Enumeration Attack)是指攻击者通过系统地探测具有顺序或可预测标识符的API端点,来发现和访问未授权资源。越权对象访问(Broken Object Level Authorization,BOLA)在OWASP API安全Top 10中被列为API1:2023,是最严重的API漏洞。攻击者操纵API请求中的对象标识符(用户ID、订单号、账户引用)来绕过授权并访问其他用户的数据。检测需要监控快速顺序访问尝试、授权失败和异常API使用行为的模式。
Detects API enumeration attacks including BOLA and IDOR by monitoring sequential ID access patterns, auth failures, and abnormal usage. For SOC incident response and detection rule building.
Detects API enumeration attacks including BOLA and IDOR by monitoring sequential ID probes, auth failures, and abnormal patterns in API logs and SIEMs.
Parses AWS API Gateway, Kong, and Nginx access logs using pandas to detect BOLA/IDOR attacks, rate limit bypasses, credential scanning, and injection attempts. Useful for investigating API abuse.
Share bugs, ideas, or general feedback.
API枚举攻击(API Enumeration Attack)是指攻击者通过系统地探测具有顺序或可预测标识符的API端点,来发现和访问未授权资源。越权对象访问(Broken Object Level Authorization,BOLA)在OWASP API安全Top 10中被列为API1:2023,是最严重的API漏洞。攻击者操纵API请求中的对象标识符(用户ID、订单号、账户引用)来绕过授权并访问其他用户的数据。检测需要监控快速顺序访问尝试、授权失败和异常API使用行为的模式。
攻击者遍历数字或可预测标识符:
GET /api/v1/users/1001 -> 200 OK
GET /api/v1/users/1002 -> 200 OK
GET /api/v1/users/1003 -> 403 Forbidden
GET /api/v1/users/1004 -> 200 OK
GET /api/v1/users/1005 -> 200 OK
...
检测指标:同一来源在短时间内访问大量连续递增的对象ID、请求速率异常偏高,以及200与401/403响应交替出现。
即使是非顺序标识符,如果通过其他端点泄露也可被枚举:
# 攻击者首先从列表端点收集UUID
GET /api/v1/posts?page=1 -> 返回包含作者UUID的帖子对象
# 然后使用这些UUID访问受限用户数据
GET /api/v1/users/a3f2c1e4-... -> 私人用户档案
GET /api/v1/users/b7d9e8f1-... -> 私人用户档案
# 以user_id=100认证,尝试访问其他用户的订单
GET /api/v1/orders?user_id=101
GET /api/v1/orders?user_id=102
GET /api/v1/orders?user_id=103
# Detect sequential ID enumeration on API endpoints
index=api_logs sourcetype=api_access
| rex field=uri_path "(?<endpoint>/api/v\d+/\w+/)(?<object_id>\d+)"
# stats drops events whose BY field is null; keep unauthenticated traffic
# (no user_session) in the results by giving it a placeholder value
| fillnull value="-" user_session
| stats count as request_count,
    dc(object_id) as unique_ids,
    values(status_code) as status_codes,
    min(_time) as first_seen,
    max(_time) as last_seen
    by src_ip, endpoint, user_session
| eval time_span = last_seen - first_seen
# clamp the span to 1 second so a single-second burst does not divide by zero
| eval requests_per_second = request_count / max(time_span, 1)
| where unique_ids > 20 AND requests_per_second > 2
| eval severity = case(
    unique_ids > 100, "critical",
    unique_ids > 50, "high",
    unique_ids > 20, "medium",
    1==1, "low"
  )
| sort - unique_ids
| table src_ip, endpoint, unique_ids, request_count, requests_per_second,
    status_codes, severity
# Detect BOLA through authorization-failure patterns
index=api_logs sourcetype=api_access status_code IN (401, 403)
| bin _time span=5m
# stats drops events whose BY field is null; 401 responses usually carry no
# user_id, so give them a placeholder instead of losing them
| fillnull value="-" user_id
| stats count as failure_count,
    dc(uri_path) as unique_paths,
    values(uri_path) as attempted_paths
    by _time, src_ip, user_id
| where failure_count > 10
# many distinct paths => object enumeration; few paths => repeated guessing
| eval attack_type = if(unique_paths > 5, "enumeration", "brute_force")
{
  "rule": {
    "name": "API Object Enumeration Detection",
    "description": "Detects rapid sequential access to API objects with mixed authorization results",
    "type": "threshold",
    "index": ["api-access-*"],
    "query": {
      "bool": {
        "must": [
          { "regexp": { "url.path": "/api/v[0-9]+/[a-z]+/[0-9]+" } }
        ],
        "should": [
          { "term": { "http.response.status_code": 200 } },
          { "term": { "http.response.status_code": 403 } },
          { "term": { "http.response.status_code": 401 } }
        ],
        "minimum_should_match": 1
      }
    },
    "threshold": {
      "field": ["source.ip"],
      "value": 50,
      "cardinality": [
        { "field": "url.path", "value": 20 }
      ]
    },
    "schedule": { "interval": "5m" },
    "severity": "high",
    "risk_score": 73,
    "tags": ["OWASP-API1", "BOLA", "Enumeration"]
  }
}
#!/usr/bin/env python3
"""API枚举攻击检测器
分析API访问日志以检测枚举模式,
包括BOLA、IDOR和顺序ID探测。
"""
import re
import sys
import json
from collections import defaultdict
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import List, Dict, Optional
@dataclass
class AccessRecord:
    """A single parsed API access-log entry."""

    timestamp: datetime
    source_ip: str
    user_id: Optional[str]  # None when the log's user field is "-"
    method: str
    path: str  # request target exactly as logged (may include a query string)
    status_code: int
    object_id: Optional[str] = None  # ID extracted from the path, if any


@dataclass
class EnumerationAlert:
    """Summary of one suspected enumeration burst for a single group."""

    source_ip: str
    user_id: Optional[str]
    endpoint_pattern: str  # path with object IDs replaced by "{id}"
    unique_object_ids: int
    total_requests: int
    time_window_seconds: float  # span between first and last request in the window
    requests_per_second: float
    auth_failure_ratio: float  # share of 401/403 responses in the window
    severity: str
    attack_type: str
    sample_ids: List[str] = field(default_factory=list)


class EnumerationDetector:
    """Detect object-ID enumeration bursts in API access logs.

    Records are grouped by (source IP, user, normalized endpoint). Each group
    yields at most one alert: the earliest time window that touches at least
    ``min_unique_ids`` distinct object IDs.
    """

    # Object-ID extractors, tried in order; group(2) is the ID.  The UUID
    # pattern goes first and the UUID/numeric patterns must end at a segment
    # boundary, otherwise an ID such as "12345678-abcd-..." would be
    # truncated to its leading digits by the numeric pattern.
    ID_PATTERNS = [
        re.compile(r'/api/v\d+/(\w+)/([a-f0-9\-]{36})(?![\w\-])'),  # UUID
        re.compile(r'/api/v\d+/(\w+)/(\d+)(?![\w\-])'),  # numeric ID
        re.compile(r'/api/v\d+/(\w+)/([a-zA-Z0-9]{20,})'),  # long alphanumeric ID
    ]

    # Path segments rewritten to "/{id}" when building the grouping key.
    # Mirrors ID_PATTERNS so every extractable ID kind is also normalized.
    _ID_SEGMENTS = (
        re.compile(r'/[a-f0-9\-]{36}(?![\w\-])'),
        re.compile(r'/\d+(?![\w\-])'),
        re.compile(r'/[a-zA-Z0-9]{20,}'),
    )

    # Common Log Format prefix: host, ident, user, [time], "METHOD path proto", status.
    # Compiled once here instead of on every parsed line.  The host field
    # accepts IPv4 and IPv6 literals.
    _LOG_PATTERN = re.compile(
        r'(?P<ip>[\da-fA-F:.]+)\s+\S+\s+(?P<user>\S+)\s+'
        r'\[(?P<time>[^\]]+)\]\s+'
        r'"(?P<method>\w+)\s+(?P<path>\S+)\s+\S+"\s+'
        r'(?P<status>\d+)'
    )

    _TIME_FORMAT = '%d/%b/%Y:%H:%M:%S %z'

    def __init__(self, time_window_minutes: int = 5,
                 min_unique_ids: int = 15,
                 max_requests_per_second: float = 5.0):
        """Configure the detector.

        Args:
            time_window_minutes: Length of the sliding analysis window.
            min_unique_ids: Distinct object IDs a window must contain to alert.
            max_requests_per_second: Stored as ``max_rps``.
                NOTE(review): ``analyze`` does not filter on this value.
        """
        self.time_window = timedelta(minutes=time_window_minutes)
        self.min_unique_ids = min_unique_ids
        self.max_rps = max_requests_per_second
        self.access_log: List[AccessRecord] = []

    @staticmethod
    def _strip_query(path: str) -> str:
        """Return ``path`` without its query string or fragment."""
        return re.split(r'[?#]', path, maxsplit=1)[0]

    @classmethod
    def _normalize_endpoint(cls, path: str) -> str:
        """Replace object-ID segments with ``{id}`` to get a grouping key.

        The query string is dropped first so that a changing, irrelevant
        parameter cannot scatter one enumeration run across many groups.
        """
        endpoint = cls._strip_query(path)
        for segment in cls._ID_SEGMENTS:
            endpoint = segment.sub('/{id}', endpoint)
        return endpoint

    def parse_log_line(self, line: str) -> Optional[AccessRecord]:
        """Parse a Common Log Format line into an AccessRecord.

        Returns:
            The parsed record, or None when the line does not match the
            expected layout or its timestamp cannot be parsed.
        """
        match = self._LOG_PATTERN.match(line)
        if not match:
            return None

        try:
            timestamp = datetime.strptime(match.group('time'), self._TIME_FORMAT)
        except ValueError:
            # One malformed timestamp should skip the line, not abort the run.
            return None

        path = match.group('path')
        resource_path = self._strip_query(path)
        object_id = None
        for pattern in self.ID_PATTERNS:
            id_match = pattern.search(resource_path)
            if id_match:
                object_id = id_match.group(2)
                break

        user = match.group('user')
        return AccessRecord(
            timestamp=timestamp,
            source_ip=match.group('ip'),
            user_id=user if user != '-' else None,
            method=match.group('method'),
            path=path,
            status_code=int(match.group('status')),
            object_id=object_id,
        )

    def analyze(self, records: List[AccessRecord]) -> List[EnumerationAlert]:
        """Scan access records for enumeration patterns.

        Args:
            records: Parsed access records; those without an ``object_id``
                are ignored.

        Returns:
            At most one alert per (source IP, user, endpoint) group.
        """
        grouped: Dict[tuple, List[AccessRecord]] = defaultdict(list)
        for record in records:
            if record.object_id:
                key = (record.source_ip, record.user_id,
                       self._normalize_endpoint(record.path))
                grouped[key].append(record)

        alerts = []
        for (src_ip, user_id, endpoint), group in grouped.items():
            # Fewer requests than the ID threshold can never reach it.
            if len(group) < self.min_unique_ids:
                continue
            ordered = sorted(group, key=lambda r: r.timestamp)
            alert = self._first_alert(src_ip, user_id, endpoint, ordered)
            if alert is not None:
                alerts.append(alert)
        return alerts

    def _first_alert(self, src_ip: str, user_id: Optional[str], endpoint: str,
                     ordered: List[AccessRecord]) -> Optional[EnumerationAlert]:
        """Return an alert for the earliest qualifying window, or None.

        ``ordered`` must be sorted by timestamp.
        """
        total = len(ordered)
        end = 0
        for start in range(total):
            # Not enough records left to reach the unique-ID threshold.
            if total - start < self.min_unique_ids:
                break

            # Timestamps are sorted, so the window's right edge only ever
            # moves forward; advance it instead of rescanning the tail.
            window_limit = ordered[start].timestamp + self.time_window
            end = max(end, start)
            while end < total and ordered[end].timestamp <= window_limit:
                end += 1
            window = ordered[start:end]

            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # sample_ids is deterministic (a plain set is not).
            unique_ids = list(dict.fromkeys(r.object_id for r in window))
            if len(unique_ids) < self.min_unique_ids:
                continue

            time_span = (window[-1].timestamp - window[0].timestamp).total_seconds()
            # Clamp to one second so a same-second burst does not divide by zero.
            rps = len(window) / max(time_span, 1)
            auth_failures = sum(1 for r in window if r.status_code in (401, 403))
            failure_ratio = auth_failures / len(window)

            numeric_ids = [oid for oid in unique_ids if oid.isdigit()]
            if self._check_sequential(numeric_ids):
                attack_type = "sequential_enumeration"
            else:
                attack_type = "random_enumeration"

            return EnumerationAlert(
                source_ip=src_ip,
                user_id=user_id,
                endpoint_pattern=endpoint,
                unique_object_ids=len(unique_ids),
                total_requests=len(window),
                time_window_seconds=time_span,
                requests_per_second=round(rps, 2),
                auth_failure_ratio=round(failure_ratio, 2),
                severity=self._severity(len(unique_ids), failure_ratio),
                attack_type=attack_type,
                sample_ids=unique_ids[:10],
            )
        return None

    @staticmethod
    def _severity(unique_count: int, failure_ratio: float) -> str:
        """Map the unique-ID count and auth-failure ratio to a severity label."""
        if unique_count > 100:
            return "critical"
        if unique_count > 50 or failure_ratio > 0.5:
            return "high"
        if unique_count > 20:
            return "medium"
        return "low"

    def _check_sequential(self, ids: List[str]) -> bool:
        """Return True when the numeric IDs mostly follow a sequential pattern.

        IDs are de-duplicated first: repeated requests for the same object
        have a gap of zero and would otherwise count as sequential steps.
        A pair counts as sequential when the gap is at most 2; the result is
        True when such pairs exceed 70% of the number of distinct IDs.
        Returns False for fewer than five distinct IDs or non-numeric input.
        """
        try:
            numeric_ids = sorted({int(i) for i in ids})
        except ValueError:
            return False
        if len(numeric_ids) < 5:
            return False

        close_pairs = sum(
            1 for prev, cur in zip(numeric_ids, numeric_ids[1:])
            if cur - prev <= 2
        )
        return close_pairs / len(numeric_ids) > 0.7
def main():
    """Run the detector over an access log and print any alerts.

    The log path is taken from the first command-line argument and falls
    back to ``/var/log/api/access.log``.
    """
    detector = EnumerationDetector(
        time_window_minutes=5,
        min_unique_ids=15
    )
    log_file = sys.argv[1] if len(sys.argv) > 1 else "/var/log/api/access.log"

    # Request lines are attacker-controlled and may contain bytes that are
    # not valid in the locale encoding; decode explicitly as UTF-8 and
    # replace undecodable bytes so one bad line cannot abort the analysis.
    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
        parsed = (detector.parse_log_line(line.strip()) for line in f)
        records = [record for record in parsed if record]

    alerts = detector.analyze(records)
    if not alerts:
        print("[+] 未检测到枚举攻击。")
        return

    print(f"\n[!] 检测到 {len(alerts)} 个枚举攻击:\n")
    for alert in alerts:
        print(f" 来源IP: {alert.source_ip}")
        print(f" 用户ID: {alert.user_id}")
        print(f" 端点: {alert.endpoint_pattern}")
        print(f" 访问的唯一ID数: {alert.unique_object_ids}")
        print(f" 请求数/秒: {alert.requests_per_second}")
        print(f" 认证失败率: {alert.auth_failure_ratio}")
        print(f" 攻击类型: {alert.attack_type}")
        print(f" 严重程度: {alert.severity.upper()}")
        print(f" 样本ID: {alert.sample_ids}")
        print()


if __name__ == "__main__":
    main()
# Always verify object ownership at the data layer
def get_user_order(request, order_id):
    """Return the order with ``order_id`` if it belongs to the requester.

    Raises PermissionDenied when the order's ``user_id`` differs from
    ``request.user.id``.
    """
    order = Order.objects.get(id=order_id)
    if order.user_id == request.user.id:
        return order
    raise PermissionDenied("无权访问此订单")
import uuid
# Use UUIDs instead of sequential integers
class Order(Model):
    # Primary key defaults to uuid.uuid4, so new IDs are not guessable
    # by incrementing a neighbouring one.
    id = UUIDField(primary_key=True, default=uuid.uuid4)
# Kong rate limiting per API route
plugins:
  - name: rate-limiting
    # config must be nested under the plugin entry; at column 0 it would be
    # parsed as an unrelated top-level key
    config:
      minute: 30
      policy: redis
      limit_by: credential