Builds identity governance lifecycle processes including JML automation, role mining, access workflows, recertification, and orphaned account remediation using IGA platforms. For compliance and access management needs.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
- 组织缺乏自动化的入职-调岗-离职(Joiner-Mover-Leaver,JML)身份管理流程
Builds identity governance lifecycle processes with JML automation, role mining, access request workflows, recertification, and orphaned account remediation using IGA platforms. For enterprise identity management and compliance.
Designs identity governance lifecycle processes including JML automation, role mining, access request workflows, recertification, and orphaned account remediation with IGA platforms.
Executes SailPoint IdentityIQ entitlement reviews including manager certifications, targeted access reviews, role verification, SoD remediation, and automated revocation workflows. For IGA compliance and access governance.
Share bugs, ideas, or general feedback.
不适用于单一应用程序的用户管理;身份治理面向跨系统的生命周期管理,需要将权威 HR 来源与下游应用程序配置进行关联。
绘制从入职到离职的身份生命周期图:
"""
身份生命周期状态机
定义所有身份状态及有效转换,以及自动化动作。
"""
IDENTITY_LIFECYCLE = {
"states": {
"PRE_HIRE": {
"description": "在入职日期前从 HR 数据源创建的身份",
"automated_actions": [
"在 IGA 平台创建身份记录",
"生成唯一员工 ID",
"创建邮箱预留",
"基于职位代码分配天赋角色",
"启动背景调查工作流"
],
"valid_transitions": ["ACTIVE", "CANCELLED"]
},
"ACTIVE": {
"description": "员工已入职,完整访问权限已配置",
"automated_actions": [
"创建 Active Directory 账户",
"创建电子邮件邮箱",
"配置天赋应用程序访问权限",
"分配部门特定角色",
"添加到分发组",
"颁发 MFA 令牌/安全密钥",
"为远程工作者创建 VPN 账户"
],
"valid_transitions": ["ROLE_CHANGE", "LEAVE_OF_ABSENCE", "TERMINATED"]
},
"ROLE_CHANGE": {
"description": "员工调岗、晋升或更换部门",
"automated_actions": [
"根据新职位代码重新计算角色分配",
"移除对前部门应用程序的访问权限",
"配置对新部门应用程序的访问权限",
"更新组成员身份",
"在目录中转移经理关系",
"对保留的权限触发访问审查",
"通知新经理关于继承的访问权限"
],
"valid_transitions": ["ACTIVE", "LEAVE_OF_ABSENCE", "TERMINATED"]
},
"LEAVE_OF_ABSENCE": {
"description": "员工处于长期休假(医疗、育儿、学术休假)",
"automated_actions": [
"禁用交互式登录(保留账户)",
"暂停 VPN 访问",
"设置外出自动回复",
"将邮箱委托给经理",
"保留所有角色分配以便返回",
"从 HR 数据源设置重新激活日期"
],
"valid_transitions": ["ACTIVE", "TERMINATED"]
},
"TERMINATED": {
"description": "员工已离开组织",
"automated_actions": [
"立即禁用 AD 账户",
"撤销所有应用程序访问权限",
"撤销 VPN 和远程访问权限",
"将邮箱转为共享邮箱(经理可访问 90 天)",
"将 OneDrive 文件转移给经理",
"从所有安全和分发组中移除",
"撤销 OAuth 令牌和 API 密钥",
"从移动设备清除企业数据",
"归档身份记录",
"在保留期后安排账户删除"
],
"valid_transitions": ["REHIRE", "DELETED"]
},
"REHIRE": {
"description": "之前离职的员工重新入职",
"automated_actions": [
"重新激活现有身份记录",
"重置凭据并要求重新注册 MFA",
"根据新职位代码配置权限(不使用之前的访问权限)",
"在前 30 天内标记为增强访问审查"
],
"valid_transitions": ["ACTIVE"]
},
"DELETED": {
"description": "在保留期后永久删除账户",
"automated_actions": [
"删除 AD 账户",
"删除电子邮件邮箱存档",
"从 IGA 中移除身份记录",
"生成删除审计日志"
],
"valid_transitions": []
}
},
"retention_periods": {
"terminated_to_deleted": "90 天(默认)",
"mailbox_retention": "90 天作为共享邮箱",
"onedrive_retention": "经理访问 30 天,然后归档",
"audit_log_retention": "合规要求保留 7 年"
}
}
将 HR 系统连接为身份数据的单一真相来源:
"""
HR 来源集成 - Workday 到 IGA 平台连接器
轮询 Workday 获取员工生命周期事件并触发配置。
"""
import requests
from datetime import datetime, timedelta
import logging
class WorkdayIdentityConnector:
def __init__(self, config):
self.base_url = config["workday_api_url"]
self.tenant = config["tenant"]
self.client_id = config["client_id"]
self.client_secret = config["client_secret"]
self.session = requests.Session()
self.logger = logging.getLogger("workday_connector")
def get_access_token(self):
"""向 Workday REST API 进行认证。"""
token_url = f"{self.base_url}/ccx/oauth2/{self.tenant}/token"
response = self.session.post(token_url, data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
})
response.raise_for_status()
return response.json()["access_token"]
def fetch_worker_changes(self, since_datetime):
"""获取自上次同步以来的所有员工生命周期事件。"""
headers = {"Authorization": f"Bearer {self.get_access_token()}"}
params = {
"Updated_From": since_datetime.isoformat(),
"Updated_Through": datetime.utcnow().isoformat(),
"Count": 100
}
workers = []
url = f"{self.base_url}/ccx/api/v1/{self.tenant}/workers"
while url:
response = self.session.get(url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
workers.extend(data.get("data", []))
url = data.get("next", None)
params = {}
return workers
def map_lifecycle_event(self, worker):
"""将 Workday 员工数据映射到身份生命周期事件。"""
worker_data = worker.get("workerData", {})
employment = worker_data.get("employmentData", {})
personal = worker_data.get("personalData", {})
event = {
"employee_id": worker.get("id"),
"first_name": personal.get("legalName", {}).get("firstName"),
"last_name": personal.get("legalName", {}).get("lastName"),
"email": worker_data.get("emailAddress"),
"job_code": employment.get("jobProfile", {}).get("id"),
"job_title": employment.get("jobProfile", {}).get("name"),
"department": employment.get("organization", {}).get("name"),
"department_code": employment.get("organization", {}).get("id"),
"manager_id": employment.get("managerId"),
"location": employment.get("location", {}).get("name"),
"cost_center": employment.get("costCenter", {}).get("id"),
"hire_date": employment.get("hireDate"),
"termination_date": employment.get("terminationDate"),
"status": employment.get("status"),
"worker_type": employment.get("workerType"),
}
# 确定生命周期转换
if event["status"] == "Active" and event["hire_date"]:
hire_date = datetime.fromisoformat(event["hire_date"])
if hire_date > datetime.utcnow():
event["lifecycle_event"] = "PRE_HIRE"
else:
event["lifecycle_event"] = "JOINER"
elif event["status"] == "Active":
event["lifecycle_event"] = "MOVER" # 部门或角色变更
elif event["status"] == "Terminated":
event["lifecycle_event"] = "LEAVER"
elif event["status"] == "On Leave":
event["lifecycle_event"] = "LEAVE_OF_ABSENCE"
return event
def process_lifecycle_events(self, since_datetime):
"""身份生命周期事件的主处理循环。"""
workers = self.fetch_worker_changes(since_datetime)
events = []
for worker in workers:
event = self.map_lifecycle_event(worker)
events.append(event)
self.logger.info(
f"生命周期事件:{event['lifecycle_event']} - "
f"{event['first_name']} {event['last_name']} "
f"(员工ID:{event['employee_id']})"
)
return events
根据职能定义角色,用于自动化配置:
"""
角色挖掘引擎
分析现有访问模式,推导角色定义
用于天赋(自动)配置。
"""
import pandas as pd
from collections import Counter
from itertools import combinations
class RoleMiningEngine:
def __init__(self, access_data):
"""
access_data:DataFrame,包含列
[employee_id, job_code, department, application, entitlement]
"""
self.access_data = access_data
def mine_birthright_roles(self, min_assignment_pct=0.8):
"""
识别应根据职位代码自动分配的权限。
若同一职位代码中 80% 以上的用户拥有某权限,
则该权限成为天赋访问权限。
"""
birthright_roles = {}
for job_code, group in self.access_data.groupby("job_code"):
total_users = group["employee_id"].nunique()
entitlement_counts = group.groupby(
["application", "entitlement"]
)["employee_id"].nunique()
birthright_entitlements = []
for (app, ent), count in entitlement_counts.items():
pct = count / total_users
if pct >= min_assignment_pct:
birthright_entitlements.append({
"application": app,
"entitlement": ent,
"assignment_percentage": round(pct * 100, 1),
"user_count": count
})
if birthright_entitlements:
birthright_roles[job_code] = {
"job_code": job_code,
"total_users": total_users,
"birthright_entitlements": birthright_entitlements
}
return birthright_roles
def detect_role_explosion(self):
"""识别重叠度过高的角色,说明需要整合。"""
roles = self.access_data.groupby("job_code").apply(
lambda x: set(zip(x["application"], x["entitlement"]))
)
overlap_report = []
for (role1, ents1), (role2, ents2) in combinations(roles.items(), 2):
if len(ents1) == 0 or len(ents2) == 0:
continue
overlap = len(ents1 & ents2)
max_size = max(len(ents1), len(ents2))
overlap_pct = overlap / max_size * 100
if overlap_pct > 70:
overlap_report.append({
"role_1": role1,
"role_2": role2,
"role_1_entitlements": len(ents1),
"role_2_entitlements": len(ents2),
"overlapping_entitlements": overlap,
"overlap_percentage": round(overlap_pct, 1),
"recommendation": "CONSOLIDATE" if overlap_pct > 90 else "REVIEW"
})
return sorted(overlap_report, key=lambda x: x["overlap_percentage"], reverse=True)
def find_orphaned_access(self):
"""
找出不再与任何角色定义对齐的权限。
这些是随着时间积累的例外情况。
"""
# 获取天赋权限定义
birthright = self.mine_birthright_roles(min_assignment_pct=0.5)
orphaned = []
for _, row in self.access_data.iterrows():
job_birthright = birthright.get(row["job_code"], {})
expected_ents = set()
for ent in job_birthright.get("birthright_entitlements", []):
expected_ents.add((ent["application"], ent["entitlement"]))
current_ent = (row["application"], row["entitlement"])
if current_ent not in expected_ents:
orphaned.append({
"employee_id": row["employee_id"],
"job_code": row["job_code"],
"application": row["application"],
"entitlement": row["entitlement"],
"recommendation": "审查是否撤销"
})
return pd.DataFrame(orphaned)
实施基于风险的审批自助访问申请:
"""
访问申请工作流引擎
处理自助访问申请,根据所申请权限的风险分类
进行多级审批。
"""
ACCESS_REQUEST_WORKFLOW = {
"risk_levels": {
"LOW": {
"description": "标准业务应用程序",
"examples": ["电子邮件分发组", "SharePoint 团队站点", "标准 SaaS 应用"],
"approval_chain": ["manager"],
"sla_hours": 4,
"auto_approve_if_birthright": True
},
"MEDIUM": {
"description": "敏感数据访问或提升权限",
"examples": ["CRM 管理员", "财务报告", "HR 系统"],
"approval_chain": ["manager", "application_owner"],
"sla_hours": 24,
"auto_approve_if_birthright": False
},
"HIGH": {
"description": "特权访问或受监管数据",
"examples": ["数据库管理员", "云管理员", "PAM 保险库访问"],
"approval_chain": ["manager", "application_owner", "security_team"],
"sla_hours": 48,
"auto_approve_if_birthright": False,
"require_justification": True,
"require_time_limit": True
},
"CRITICAL": {
"description": "域管理员、root 访问或生产数据修改",
"examples": ["Domain Admin", "AWS root", "生产数据库写入"],
"approval_chain": ["manager", "application_owner", "security_team", "ciso"],
"sla_hours": 72,
"auto_approve_if_birthright": False,
"require_justification": True,
"require_time_limit": True,
"require_sod_check": True,
"max_duration_days": 90
}
}
}
class AccessRequestEngine:
def __init__(self, iga_client, risk_catalog):
self.iga = iga_client
self.risk_catalog = risk_catalog
def submit_request(self, requester_id, entitlement_id, justification, duration_days=None):
"""提交访问申请,自动进行风险分类。"""
# 对所申请权限进行风险级别分类
risk_level = self.risk_catalog.get_risk_level(entitlement_id)
workflow = ACCESS_REQUEST_WORKFLOW["risk_levels"][risk_level]
# 检查权限是否为申请者角色的天赋权限
requester = self.iga.get_identity(requester_id)
is_birthright = self.iga.is_birthright_for_role(
entitlement_id, requester["job_code"]
)
if is_birthright and workflow.get("auto_approve_if_birthright"):
return self._auto_approve(requester_id, entitlement_id, "天赋访问权限")
# 如需要,执行职责分离(SOD)检查
if workflow.get("require_sod_check"):
sod_violations = self.iga.check_sod(requester_id, entitlement_id)
if sod_violations:
return {
"status": "SOD_VIOLATION",
"violations": sod_violations,
"action": "申请需要补偿性控制审批"
}
# 创建审批链
request = {
"requester": requester_id,
"entitlement": entitlement_id,
"risk_level": risk_level,
"justification": justification,
"duration_days": duration_days or workflow.get("max_duration_days"),
"approval_chain": self._build_approval_chain(
requester, workflow["approval_chain"]
),
"sla_deadline": workflow["sla_hours"],
"status": "PENDING_APPROVAL"
}
return self.iga.create_request(request)
def _build_approval_chain(self, requester, approver_types):
"""将审批链解析为实际的审批者身份。"""
chain = []
for approver_type in approver_types:
if approver_type == "manager":
chain.append({
"type": "manager",
"identity": requester["manager_id"],
"fallback": requester.get("skip_manager_id")
})
elif approver_type == "application_owner":
chain.append({
"type": "application_owner",
"identity": "resolved_at_runtime",
"fallback": "it-governance-team"
})
elif approver_type == "security_team":
chain.append({
"type": "group",
"identity": "security-governance-team",
"required_approvals": 1
})
elif approver_type == "ciso":
chain.append({
"type": "role",
"identity": "CISO",
"fallback": "deputy-ciso"
})
return chain
识别并修复没有活跃身份关联的账户:
"""
孤立账户检测
识别目标系统中在权威 HR 来源中没有对应活跃身份的账户。
"""
class OrphanedAccountDetector:
def __init__(self, hr_connector, app_connectors):
self.hr = hr_connector
self.apps = app_connectors
def detect_orphaned_accounts(self):
"""对照 HR 活跃员工比对应用程序账户。"""
active_employees = set(self.hr.get_active_employee_ids())
orphaned_accounts = []
for app_name, connector in self.apps.items():
app_accounts = connector.get_all_accounts()
for account in app_accounts:
correlated_id = account.get("employee_id") or account.get("correlation_id")
if correlated_id and correlated_id not in active_employees:
# 检查是否最近离职(在宽限期内)
termination_info = self.hr.get_termination_info(correlated_id)
orphaned_accounts.append({
"application": app_name,
"account_name": account["username"],
"correlated_employee_id": correlated_id,
"account_status": account.get("status", "unknown"),
"last_login": account.get("last_login"),
"termination_date": termination_info.get("date") if termination_info else None,
"days_since_termination": (
(datetime.utcnow() - termination_info["date"]).days
if termination_info and termination_info.get("date") else None
),
"risk_level": self._assess_orphan_risk(account, termination_info)
})
elif not correlated_id:
# 未关联账户 - 没有与任何员工的关联
orphaned_accounts.append({
"application": app_name,
"account_name": account["username"],
"correlated_employee_id": None,
"account_status": account.get("status", "unknown"),
"last_login": account.get("last_login"),
"risk_level": "HIGH",
"reason": "未关联 - 无员工身份关联"
})
return orphaned_accounts
def _assess_orphan_risk(self, account, termination_info):
"""评估孤立账户的风险级别。"""
if account.get("is_privileged"):
return "CRITICAL"
if termination_info and termination_info.get("involuntary"):
return "HIGH"
if account.get("status") == "active":
return "HIGH"
return "MEDIUM"
def generate_remediation_plan(self, orphaned_accounts):
"""为孤立账户创建修复行动计划。"""
plan = []
for account in orphaned_accounts:
if account["risk_level"] == "CRITICAL":
action = "DISABLE_IMMEDIATELY"
sla = "4 小时"
elif account["risk_level"] == "HIGH":
action = "DISABLE_WITHIN_24H"
sla = "24 小时"
else:
action = "REVIEW_AND_DISABLE"
sla = "7 天"
plan.append({
**account,
"remediation_action": action,
"sla": sla,
"assigned_to": "identity-governance-team"
})
return sorted(plan, key=lambda x: ["CRITICAL", "HIGH", "MEDIUM", "LOW"].index(x["risk_level"]))
| 术语 | 定义 |
|---|---|
| 入职-调岗-离职(Joiner-Mover-Leaver,JML) | 核心身份生命周期转换,包括员工入职(Joiner)、角色/部门变更(Mover)和离职(Leaver) |
| 天赋访问权限(Birthright Access) | 根据职位代码、部门或地点自动配置的基线权限,无需提交访问申请 |
| 角色挖掘(Role Mining) | 通过识别相似职能中的常见权限分组,分析现有访问模式以推导角色定义 |
| 孤立账户(Orphaned Account) | 在权威 HR 来源中不再有对应活跃身份的应用账户,代表安全风险 |
| 权威来源(Authoritative Source) | 记录系统(通常是 HR),作为身份属性和就业状态的单一真相来源 |
| 访问申请工作流(Access Request Workflow) | 使用户能够申请额外权限的自助服务流程,带有基于风险的审批路由 |
场景背景:快速成长的公司没有自动化身份生命周期。IT 手动创建账户,新员工需要 3-5 天才能就绪。离职员工保留访问权限长达数周。审计发现 45 个应用程序中存在 2,300 个孤立账户。
方法:
常见陷阱:
身份治理生命周期报告
=======================================
权威来源: Workday
IGA 平台: SailPoint IdentityIQ
身份总数: 10,247
在职员工: 9,834
承包商: 413
生命周期自动化
入职(预入职)SLA: 目标:0 天 | 实际:平均 0.2 天
调岗处理 SLA: 目标:1 天 | 实际:平均 0.8 天
离职禁用 SLA: 目标:1 小时 | 实际:平均 0.5 小时
配置指标(过去 30 天)
新入职员工: 187
自动配置: 174(93.0%)
人工干预: 13(7.0%)
角色变更处理: 89
离职处理: 43
1 小时 SLA 内: 41(95.3%)
角色治理
已定义角色: 127
天赋角色: 48
每角色平均权限数: 12.3
重叠度 > 70%: 8 对(建议整合)
孤立账户
检测到: 23
严重: 2(特权账户)
高: 8
中: 13
已修复(30 天): 19
未处理: 4
访问申请
提交: 342
自动审批(天赋): 87(25.4%)
已批准: 231(67.5%)
已拒绝: 24(7.0%)
平均审批时间: 6.2 小时
SOD 违规标记: 12