Detects and tests OWASP API3:2023 BOPLA vulnerabilities in APIs, including excessive data exposure and mass assignment attacks via Python scripts. Useful for API penetration testing.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
对象属性级授权失效(Broken Object Property Level Authorization,BOPLA)被OWASP API安全Top 10归类为API3:2023,结合了两类相关漏洞:过度数据暴露(API返回超出所需的数据)和批量赋值(Mass Assignment,API接受超出预期的数据)。即使API正确地执行了对象级授权,也可能无法控制用户对对象特定属性的读写权限。攻击者利用此漏洞从API响应中读取敏感属性,或向请求体中注入额外属性来修改其无权访问的字段。
Detects and tests OWASP API3:2023 Broken Object Property Level Authorization (BOPLA) vulnerabilities including excessive data exposure and mass assignment attacks in APIs.
Detects and tests Broken Object Property Level Authorization (BOPLA) vulnerabilities in APIs, including excessive data exposure and mass assignment. Guides pen-testing with Burp, Postman, Python requests.
Tests API endpoints for mass assignment vulnerabilities by injecting unauthorized fields like role, isAdmin, price, balance into requests. Useful for OWASP API3:2023 BOLA audits in Rails, Django, Express, Spring apps.
Share bugs, ideas, or general feedback.
对象属性级授权失效(Broken Object Property Level Authorization,BOPLA)被OWASP API安全Top 10归类为API3:2023,结合了两类相关漏洞:过度数据暴露(API返回超出所需的数据)和批量赋值(Mass Assignment,API接受超出预期的数据)。即使API正确地执行了对象级授权,也可能无法控制用户对对象特定属性的读写权限。攻击者利用此漏洞从API响应中读取敏感属性,或向请求体中注入额外属性来修改其无权访问的字段。
API返回的对象属性超出客户端所需:
// GET /api/v1/users/123
// 响应包含UI未显示的敏感字段:
{
"id": 123,
"username": "john_doe",
"email": "john@example.com",
"name": "John Doe",
"ssn": "123-45-6789", // 敏感 - UI不需要
"salary": 95000, // 敏感 - UI不需要
"internal_notes": "VIP client", // 内部 - 不应暴露
"password_hash": "$2b$12...", // 严重 - 永远不应暴露
"role": "admin", // 可能暴露权限信息
"created_by": "system_admin", // 内部元数据
"credit_card_last4": "4242" // PCI合规违规
}
API未过滤地将客户端提供的数据绑定到内部对象属性:
// 普通用户更新请求
PUT /api/v1/users/123
Content-Type: application/json
{
"name": "John Updated",
"email": "new@example.com",
"role": "admin", // 攻击者注入:权限提升
"is_verified": true, // 攻击者注入:绕过验证
"discount_rate": 100, // 攻击者注入:业务逻辑滥用
"account_balance": 999999 // 攻击者注入:金融欺诈
}
#!/usr/bin/env python3
"""BOPLA漏洞扫描器
测试API是否存在对象属性级授权失效(BOPLA)漏洞,
包括过度数据暴露和批量赋值。
"""
import requests
import json
import sys
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
from copy import deepcopy
@dataclass
class BOPLAFinding:
endpoint: str
method: str
vulnerability_type: str # "excessive_exposure" 或 "mass_assignment"
severity: str
property_name: str
details: str
class BOPLAScanner:
SENSITIVE_PROPERTY_PATTERNS = {
"critical": [
"password", "password_hash", "secret", "token", "api_key",
"private_key", "secret_key", "access_token", "refresh_token",
],
"high": [
"ssn", "social_security", "tax_id", "credit_card", "card_number",
"cvv", "bank_account", "routing_number",
],
"medium": [
"salary", "income", "internal_notes", "admin_notes",
"created_by", "modified_by", "ip_address", "session_id",
"role", "permissions", "is_admin", "is_superuser", "privilege",
],
"low": [
"phone", "address", "date_of_birth", "dob", "age",
"gender", "ethnicity", "religion",
]
}
MASS_ASSIGNMENT_FIELDS = [
("role", "admin"),
("is_admin", True),
("is_verified", True),
("is_active", True),
("email_verified", True),
("account_type", "premium"),
("discount_rate", 100),
("credit_limit", 999999),
("permissions", ["admin", "write", "delete"]),
("account_balance", 999999),
("subscription_tier", "enterprise"),
("rate_limit", 999999),
]
def __init__(self, base_url: str, auth_headers: Dict[str, str]):
self.base_url = base_url.rstrip('/')
self.auth_headers = auth_headers
self.findings: List[BOPLAFinding] = []
def test_excessive_data_exposure(self, endpoint: str,
expected_fields: Set[str]) -> List[BOPLAFinding]:
"""测试API响应是否包含超出预期字段的内容。"""
findings = []
url = f"{self.base_url}{endpoint}"
try:
response = requests.get(url, headers=self.auth_headers, timeout=10)
if response.status_code != 200:
return findings
data = response.json()
# 处理单对象和列表响应
objects = data if isinstance(data, list) else [data]
if isinstance(data, dict) and "data" in data:
objects = data["data"] if isinstance(data["data"], list) else [data["data"]]
for obj in objects[:5]: # 检查前5个对象
if not isinstance(obj, dict):
continue
response_fields = set(self._flatten_keys(obj))
unexpected_fields = response_fields - expected_fields
for field_name in unexpected_fields:
severity = self._classify_sensitivity(field_name)
if severity:
finding = BOPLAFinding(
endpoint=endpoint,
method="GET",
vulnerability_type="excessive_exposure",
severity=severity,
property_name=field_name,
details=f"响应中出现意外的敏感字段 '{field_name}'"
)
findings.append(finding)
self.findings.append(finding)
except (requests.exceptions.RequestException, json.JSONDecodeError):
pass
return findings
def test_mass_assignment(self, endpoint: str, method: str = "PUT",
original_data: Optional[dict] = None) -> List[BOPLAFinding]:
"""测试API是否接受并处理额外注入的属性。"""
findings = []
url = f"{self.base_url}{endpoint}"
# 首先获取当前对象状态
if original_data is None:
try:
response = requests.get(url, headers=self.auth_headers, timeout=10)
if response.status_code == 200:
original_data = response.json()
else:
original_data = {}
except (requests.exceptions.RequestException, json.JSONDecodeError):
original_data = {}
# 测试每个批量赋值字段
for field_name, injected_value in self.MASS_ASSIGNMENT_FIELDS:
if field_name in original_data:
# 字段存在 - 测试是否可以修改
original_value = original_data[field_name]
if original_value == injected_value:
continue # 已有该值
test_data = deepcopy(original_data)
test_data[field_name] = injected_value
headers = {**self.auth_headers, "Content-Type": "application/json"}
try:
if method == "PUT":
response = requests.put(url, json=test_data,
headers=headers, timeout=10)
elif method == "PATCH":
response = requests.patch(url, json={field_name: injected_value},
headers=headers, timeout=10)
elif method == "POST":
response = requests.post(url, json=test_data,
headers=headers, timeout=10)
if response.status_code in (200, 201, 204):
# 验证字段是否实际被修改
verify_response = requests.get(url, headers=self.auth_headers, timeout=10)
if verify_response.status_code == 200:
updated_data = verify_response.json()
if updated_data.get(field_name) == injected_value:
finding = BOPLAFinding(
endpoint=endpoint,
method=method,
vulnerability_type="mass_assignment",
severity="CRITICAL" if field_name in ["role", "is_admin", "permissions"]
else "HIGH",
property_name=field_name,
details=f"成功注入 '{field_name}={injected_value}'"
)
findings.append(finding)
self.findings.append(finding)
# 如可能则恢复原始值
if field_name in original_data:
restore_data = {field_name: original_data[field_name]}
requests.patch(url, json=restore_data,
headers=headers, timeout=10)
except requests.exceptions.RequestException:
continue
return findings
def test_graphql_property_exposure(self, graphql_endpoint: str,
query: str) -> List[BOPLAFinding]:
"""测试GraphQL API是否存在属性级授权问题。"""
findings = []
url = f"{self.base_url}{graphql_endpoint}"
# 用于发现可用字段的自省查询
introspection = """
{
__schema {
types {
name
fields {
name
type { name kind }
}
}
}
}
"""
try:
response = requests.post(
url,
json={"query": introspection},
headers=self.auth_headers,
timeout=10
)
if response.status_code == 200:
data = response.json()
if "errors" not in data:
finding = BOPLAFinding(
endpoint=graphql_endpoint,
method="POST",
vulnerability_type="excessive_exposure",
severity="MEDIUM",
property_name="__schema",
details="GraphQL自省已启用 - 完整模式已暴露"
)
findings.append(finding)
self.findings.append(finding)
except requests.exceptions.RequestException:
pass
return findings
def _flatten_keys(self, obj: dict, prefix: str = "") -> List[str]:
"""递归展开嵌套字典的键。"""
keys = []
for key, value in obj.items():
full_key = f"{prefix}.{key}" if prefix else key
keys.append(full_key)
if isinstance(value, dict):
keys.extend(self._flatten_keys(value, full_key))
return keys
def _classify_sensitivity(self, field_name: str) -> Optional[str]:
"""对字段名进行敏感度分类。"""
lower_name = field_name.lower().split('.')[-1]
for severity, patterns in self.SENSITIVE_PROPERTY_PATTERNS.items():
for pattern in patterns:
if pattern in lower_name:
return severity.upper()
return None
def generate_report(self) -> dict:
return {
"total_findings": len(self.findings),
"by_type": {
"excessive_exposure": len([f for f in self.findings
if f.vulnerability_type == "excessive_exposure"]),
"mass_assignment": len([f for f in self.findings
if f.vulnerability_type == "mass_assignment"]),
},
"by_severity": {
"CRITICAL": len([f for f in self.findings if f.severity == "CRITICAL"]),
"HIGH": len([f for f in self.findings if f.severity == "HIGH"]),
"MEDIUM": len([f for f in self.findings if f.severity == "MEDIUM"]),
"LOW": len([f for f in self.findings if f.severity == "LOW"]),
},
"findings": [
{
"endpoint": f.endpoint,
"method": f.method,
"type": f.vulnerability_type,
"severity": f.severity,
"property": f.property_name,
"details": f.details,
}
for f in self.findings
]
}
# 服务端:显式属性白名单
class UserSerializer:
# 只暴露这些字段 - 永远不要使用to_json()或to_dict()
PUBLIC_FIELDS = ['id', 'username', 'name', 'avatar_url']
OWNER_FIELDS = PUBLIC_FIELDS + ['email', 'phone', 'preferences']
ADMIN_FIELDS = OWNER_FIELDS + ['role', 'created_at', 'last_login']
def serialize(self, user, requesting_user):
if requesting_user.is_admin:
fields = self.ADMIN_FIELDS
elif requesting_user.id == user.id:
fields = self.OWNER_FIELDS
else:
fields = self.PUBLIC_FIELDS
return {field: getattr(user, field) for field in fields}
# 批量赋值保护 - 可写字段的显式白名单
WRITABLE_FIELDS = {'name', 'email', 'phone', 'avatar_url', 'preferences'}
def update_user(user_id, request_data, requesting_user):
# 过滤掉不在白名单中的字段
safe_data = {k: v for k, v in request_data.items() if k in WRITABLE_FIELDS}
# 仅使用安全数据应用更新
User.objects.filter(id=user_id).update(**safe_data)