Mines optimal RBAC roles from user-permission assignments using bottom-up clustering, formal concept analysis, and graph methods to reduce role explosion and enforce least privilege. For IAM optimization.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
角色挖掘(Role Mining)是分析现有用户-权限分配以发现基于角色的访问控制(Role-Based Access Control,RBAC)系统最优角色的过程。组织通过职位变更、项目分配和临时授权随时间积累了过多权限,导致存在大量重叠的细粒度角色,即"角色爆炸"。角色挖掘使用数据分析技术——包括聚类算法、形式概念分析和图方法——将权限整合为最小角色集,准确反映业务职能的同时强制执行最小权限原则。
Discovers optimal RBAC roles from user-permission data using bottom-up/top-down mining, clustering, FCA, and graph algorithms to reduce role explosion.
Discovers optimal RBAC roles using bottom-up and top-down mining techniques from user-permission assignments, reducing role explosion and enforcing least privilege. Useful for IAM optimization and audits.
Share bugs, ideas, or general feedback.
角色挖掘(Role Mining)是分析现有用户-权限分配以发现基于角色的访问控制(Role-Based Access Control,RBAC)系统最优角色的过程。组织通过职位变更、项目分配和临时授权随时间积累了过多权限,导致存在大量重叠的细粒度角色,即"角色爆炸"。角色挖掘使用数据分析技术——包括聚类算法、形式概念分析和图方法——将权限整合为最小角色集,准确反映业务职能的同时强制执行最小权限原则。
| 方法 | 描述 | 最适用场景 |
|---|---|---|
| 自底向上 | 分析现有权限以发现常见模式 | 权限有机增长的大型数据集 |
| 自顶向下 | 基于业务需求和职位描述设计角色 | 绿地 RBAC 建设或组织重构 |
| 混合方法 | 将自底向上分析与自顶向下业务验证结合 | 大多数生产环境 |
1. 权限聚类(Permission Clustering):使用 k-means 或层次聚类对具有相似权限集的用户进行分组。同一聚类中的用户共享一个公共角色。
2. 形式概念分析(Formal Concept Analysis,FCA):数学框架,从二进制用户-权限矩阵中识别完整的概念集(共享精确权限集的用户组)。
3. 图方法挖掘(Graph-Based Mining):将用户和权限建模为二部图,然后找出代表候选角色的密集子图。
4. 布尔矩阵分解(Boolean Matrix Decomposition):将用户-权限矩阵 U 分解为 U ≈ R × P,其中 R 将用户映射到角色,P 将角色映射到权限。
| 指标 | 公式 | 目标 |
|---|---|---|
| 角色数量 | 挖掘后的不同角色总数 | 最小化 |
| 覆盖率 | 被挖掘角色解释的权限 / 总权限 | > 95% |
| 加权结构复杂度(WSC) | 角色-用户 + 角色-权限分配之和 | 最小化 |
| 偏差 | 不被分配角色覆盖的额外权限 | < 5% |
从所有身份来源收集当前访问状态:
import pandas as pd
import numpy as np
# 加载用户-权限分配
# 格式:user_id, permission_id(每行一个分配)
assignments = pd.read_csv("user_permissions.csv")
# 创建二进制用户-权限矩阵(UPA 矩阵)
upa_matrix = assignments.pivot_table(
index="user_id",
columns="permission_id",
aggfunc="size",
fill_value=0
)
upa_matrix = (upa_matrix > 0).astype(int)
print(f"用户数:{upa_matrix.shape[0]}")
print(f"权限数:{upa_matrix.shape[1]}")
print(f"分配数:{assignments.shape[0]}")
print(f"密度:{upa_matrix.values.sum() / upa_matrix.size:.2%}")
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
def find_optimal_clusters(matrix, max_k=50):
"""使用轮廓分析找到最优角色数量。"""
scores = []
for k in range(2, min(max_k, matrix.shape[0])):
clustering = AgglomerativeClustering(
n_clusters=k, metric="jaccard", linkage="average"
)
labels = clustering.fit_predict(matrix)
score = silhouette_score(matrix, labels, metric="jaccard")
scores.append((k, score))
optimal_k = max(scores, key=lambda x: x[1])[0]
return optimal_k, scores
def mine_roles_clustering(upa_matrix, n_clusters):
"""使用 Jaccard 距离上的层次聚类挖掘角色。"""
clustering = AgglomerativeClustering(
n_clusters=n_clusters, metric="jaccard", linkage="average"
)
user_matrix = upa_matrix.values
labels = clustering.fit_predict(user_matrix)
roles = {}
for cluster_id in range(n_clusters):
cluster_users = upa_matrix.index[labels == cluster_id]
cluster_permissions = upa_matrix.loc[cluster_users]
# 核心角色 = 超过 80% 聚类成员拥有的权限
permission_frequency = cluster_permissions.mean()
core_permissions = permission_frequency[permission_frequency >= 0.8].index.tolist()
roles[f"Role_{cluster_id}"] = {
"permissions": core_permissions,
"user_count": len(cluster_users),
"users": cluster_users.tolist(),
"coverage": permission_frequency[permission_frequency >= 0.8].mean()
}
return roles, labels
def mine_roles_fca(upa_matrix, min_support=3):
"""使用形式概念分析(频繁闭合项集)挖掘角色。"""
from itertools import combinations
users = upa_matrix.index.tolist()
permissions = upa_matrix.columns.tolist()
concepts = []
# 找出所有至少被 min_support 个用户共享的最大权限集
for size in range(len(permissions), 0, -1):
for perm_combo in combinations(permissions, size):
perm_set = set(perm_combo)
# 找出拥有该集合中所有权限的用户
matching_users = []
for user in users:
user_perms = set(upa_matrix.columns[upa_matrix.loc[user] == 1])
if perm_set.issubset(user_perms):
matching_users.append(user)
if len(matching_users) >= min_support:
# 检查是否为闭合概念(不存在具有相同范围的超集)
is_closed = True
for concept in concepts:
if set(matching_users) == set(concept["users"]) and \
perm_set.issubset(set(concept["permissions"])):
is_closed = False
break
if is_closed:
concepts.append({
"permissions": list(perm_set),
"users": matching_users,
"support": len(matching_users)
})
if len(concepts) > 100: # 限制数量以提高性能
break
return concepts
def evaluate_role_set(roles, upa_matrix):
"""评估挖掘角色集的质量。"""
total_assignments = upa_matrix.values.sum()
covered_assignments = 0
extra_assignments = 0
for role_name, role_data in roles.items():
role_perms = set(role_data["permissions"])
for user in role_data["users"]:
user_perms = set(upa_matrix.columns[upa_matrix.loc[user] == 1])
covered = role_perms.intersection(user_perms)
extra = role_perms - user_perms
covered_assignments += len(covered)
extra_assignments += len(extra)
metrics = {
"total_roles": len(roles),
"total_assignments": total_assignments,
"covered_assignments": covered_assignments,
"coverage_rate": covered_assignments / total_assignments if total_assignments else 0,
"extra_permissions": extra_assignments,
"deviation_rate": extra_assignments / (covered_assignments + extra_assignments) if (covered_assignments + extra_assignments) else 0,
"avg_role_size": np.mean([len(r["permissions"]) for r in roles.values()]),
"avg_users_per_role": np.mean([r["user_count"] for r in roles.values()]),
}
return metrics
挖掘候选角色后: