Detects anomalous authentication patterns using UEBA, statistical baselines, and ML models to identify impossible travel, brute force, credential stuffing, password spraying, and account takeovers from auth logs like Azure AD and Okta.
Install with `npx claudepluginhub killvxk/cybersecurity-skills-zh`. This skill uses the workspace's default tool permissions.
- 安全运营需要通过认证日志分析识别被攻陷的账户
Detects anomalous authentication patterns in logs using UEBA analytics, statistical baselines, and ML to flag impossible travel, brute force, credential stuffing, and compromised accounts.
Detects anomalous authentication patterns in logs using UEBA analytics, statistical baselines, and ML models to identify impossible travel, brute force, credential stuffing, password spraying, and compromised accounts. For security analysis of login behaviors.
Performs UEBA on SIEM logs with Splunk queries to detect anomalous user activities like impossible travel, abnormal login times, access patterns, and privilege abuse. For SOC teams identifying compromised accounts or insider threats.
Share bugs, ideas, or general feedback.
不适用于针对单次失败登录的静态规则告警;异常检测需要跨时间和实体维度的统计基线,以减少误报。
从所有身份来源聚合认证事件:
import json
from collections import defaultdict
from datetime import datetime, timedelta, timezone

import pandas as pd
# 从多个来源解析认证日志
def normalize_auth_logs(log_source, raw_logs):
    """Normalize authentication events from heterogeneous sources into one schema.

    Parameters
    ----------
    log_source : str
        One of "azure_ad", "okta", or "windows_ad". Events from any other
        source string are silently skipped (matches original behavior).
    raw_logs : iterable of dict
        Raw events as emitted by the identity provider.

    Returns
    -------
    pandas.DataFrame
        One row per recognized event with unified columns (timestamp, user,
        source_ip, location, result, ...). Columns a source does not provide
        (e.g. mfa_result for Okta) come out as NaN.
    """
    parsers = {
        "azure_ad": _normalize_azure_event,
        "okta": _normalize_okta_event,
        "windows_ad": _normalize_windows_event,
    }
    parser = parsers.get(log_source)
    normalized = [parser(event) for event in raw_logs] if parser else []
    return pd.DataFrame(normalized)


def _normalize_azure_event(event):
    """Map one Azure AD sign-in record to the unified schema."""
    # "location" may be absent OR present-but-null in the payload; `or {}`
    # covers both so nested lookups never raise AttributeError (bug fix).
    location = event.get("location") or {}
    geo = location.get("geoCoordinates") or {}
    return {
        "timestamp": event["createdDateTime"],
        "user": event["userPrincipalName"],
        "source_ip": event["ipAddress"],
        "location": {
            "city": location.get("city"),
            "state": location.get("state"),
            "country": location.get("countryOrRegion"),
            "lat": geo.get("latitude"),
            "lon": geo.get("longitude"),
        },
        # errorCode 0 is Azure AD's success marker
        "result": "success" if event["status"]["errorCode"] == 0 else "failure",
        "failure_reason": event["status"].get("failureReason", ""),
        "app": event.get("appDisplayName", "Unknown"),
        "device": event.get("deviceDetail", {}).get("operatingSystem", "Unknown"),
        "browser": event.get("deviceDetail", {}).get("browser", "Unknown"),
        # `or [{}]` also guards an explicitly empty list, which previously
        # raised IndexError (bug fix).
        "mfa_result": (event.get("authenticationDetails") or [{}])[0].get("succeeded", None),
        "risk_level": event.get("riskLevelDuringSignIn", "none"),
        "client_app": event.get("clientAppUsed", "Unknown"),
        "source": "azure_ad",
    }


def _normalize_okta_event(event):
    """Map one Okta System Log record to the unified schema."""
    client = event["client"]
    geo_ctx = client.get("geographicalContext") or {}
    geoloc = geo_ctx.get("geolocation") or {}
    return {
        "timestamp": event["published"],
        "user": event["actor"]["alternateId"],
        "source_ip": client["ipAddress"],
        "location": {
            "city": geo_ctx.get("city"),
            "state": geo_ctx.get("state"),
            "country": geo_ctx.get("country"),
            "lat": geoloc.get("lat"),
            "lon": geoloc.get("lon"),
        },
        "result": "success" if event["outcome"]["result"] == "SUCCESS" else "failure",
        "failure_reason": event["outcome"].get("reason", ""),
        "app": (event.get("target") or [{}])[0].get("displayName", "Unknown"),
        "device": client.get("device", "Unknown"),
        "browser": (client.get("userAgent") or {}).get("browser", "Unknown"),
        "source": "okta",
    }


def _normalize_windows_event(event):
    """Map one Windows Security event record to the unified schema."""
    return {
        "timestamp": event["TimeCreated"],
        "user": event["TargetUserName"],
        "source_ip": event.get("IpAddress", ""),
        "location": None,  # filled in later by GeoIP enrichment
        # 4624 = successful logon, 4648 = logon with explicit credentials
        "result": "success" if event["EventId"] in (4624, 4648) else "failure",
        "failure_reason": event.get("FailureReason", ""),
        "logon_type": event.get("LogonType", ""),
        "source": "windows_ad",
    }
# 为缺少位置信息的 Windows AD 日志添加 GeoIP 数据
import geoip2.database
def enrich_geoip(df, geoip_db_path="/opt/geoip/GeoLite2-City.mmdb"):
    """Backfill ``df["location"]`` via GeoIP lookup for rows that lack one.

    Mutates *df* in place and returns it. Lookups are best-effort: private
    or unresolvable IPs are silently left unlocated.

    Parameters
    ----------
    df : pandas.DataFrame
        Normalized auth events; only rows whose location is None are touched.
    geoip_db_path : str
        Path to a MaxMind GeoLite2-City database file.
    """
    reader = geoip2.database.Reader(geoip_db_path)
    try:
        for idx, row in df.iterrows():
            if row["location"] is None and row["source_ip"]:
                try:
                    response = reader.city(row["source_ip"])
                    df.at[idx, "location"] = {
                        # NOTE(review): stores the ISO country code, while the
                        # Azure/Okta normalizers store country names — confirm
                        # downstream consumers treat both as opaque labels.
                        "city": response.city.name,
                        "country": response.country.iso_code,
                        "lat": response.location.latitude,
                        "lon": response.location.longitude
                    }
                except Exception:
                    # Best-effort enrichment: unroutable/unknown IPs stay as-is.
                    pass
    finally:
        # Bug fix: the reader previously leaked if iteration raised before
        # reaching close().
        reader.close()
    return df
识别地理位置上不可能的登录:
from math import radians, sin, cos, sqrt, atan2
from datetime import datetime
def haversine_distance(lat1, lon1, lat2, lon2):
    """Great-circle distance between two (lat, lon) points, in kilometres."""
    EARTH_RADIUS_KM = 6371
    phi1, lam1, phi2, lam2 = (radians(v) for v in (lat1, lon1, lat2, lon2))
    half_dphi = (phi2 - phi1) / 2
    half_dlam = (lam2 - lam1) / 2
    # Haversine formula: a is the squared half-chord length between the points.
    a = sin(half_dphi) ** 2 + cos(phi1) * cos(phi2) * sin(half_dlam) ** 2
    central_angle = 2 * atan2(sqrt(a), sqrt(1 - a))
    return EARTH_RADIUS_KM * central_angle
def detect_impossible_travel(df, max_speed_kmh=900):
    """Flag logins whose implied travel speed between consecutive successful
    authentications for the same user is physically impossible.

    Parameters
    ----------
    df : pandas.DataFrame
        Normalized auth events (user, timestamp, result, location, source_ip).
    max_speed_kmh : float
        Fastest plausible travel speed (900 km/h is roughly a commercial flight).

    Returns
    -------
    list of alert dicts with alert_type "IMPOSSIBLE_TRAVEL".
    """
    alerts = []
    df_sorted = df.sort_values(["user", "timestamp"])
    for user, user_events in df_sorted.groupby("user"):
        successful_events = user_events[user_events["result"] == "success"]
        for i in range(1, len(successful_events)):
            prev = successful_events.iloc[i - 1]
            curr = successful_events.iloc[i]
            prev_loc = prev["location"]
            curr_loc = curr["location"]
            # Skip pairs without usable geo data. isinstance guards against
            # both None and NaN placeholders (NaN slipped past the old
            # truthiness test and then crashed on .get — bug fix).
            if not isinstance(prev_loc, dict) or not isinstance(curr_loc, dict):
                continue
            # "is None" instead of truthiness: a latitude/longitude of 0.0
            # (equator / prime meridian) is valid and was wrongly skipped
            # before (bug fix).
            if (prev_loc.get("lat") is None or prev_loc.get("lon") is None or
                    curr_loc.get("lat") is None or curr_loc.get("lon") is None):
                continue
            distance_km = haversine_distance(
                prev_loc["lat"], prev_loc["lon"],
                curr_loc["lat"], curr_loc["lon"]
            )
            time_diff = (pd.Timestamp(curr["timestamp"]) -
                         pd.Timestamp(prev["timestamp"])).total_seconds() / 3600
            # Identical or out-of-order timestamps would divide by zero below.
            if time_diff <= 0:
                continue
            required_speed = distance_km / time_diff
            # The 100 km floor suppresses false positives from GeoIP jitter
            # between nearby datacenters.
            if required_speed > max_speed_kmh and distance_km > 100:
                alerts.append({
                    "alert_type": "IMPOSSIBLE_TRAVEL",
                    "severity": "HIGH",
                    "user": user,
                    "timestamp": curr["timestamp"],
                    "details": {
                        # .get avoids a KeyError when an enrichment dict lacks
                        # a city/country key (bug fix).
                        "location_1": f"{prev_loc.get('city')}, {prev_loc.get('country')}",
                        "location_2": f"{curr_loc.get('city')}, {curr_loc.get('country')}",
                        "time_1": prev["timestamp"],
                        "time_2": curr["timestamp"],
                        "distance_km": round(distance_km, 1),
                        "time_hours": round(time_diff, 2),
                        "required_speed_kmh": round(required_speed, 1),
                        "source_ip_1": prev["source_ip"],
                        "source_ip_2": curr["source_ip"]
                    }
                })
    return alerts
# Run the impossible-travel detector and summarize the findings.
travel_alerts = detect_impossible_travel(auth_df)
print(f"不可能旅行告警:{len(travel_alerts)}")
for alert in travel_alerts:
    info = alert["details"]
    summary = (
        f" [{alert['severity']}] {alert['user']}: "
        f"{info['location_1']} -> {info['location_2']} "
        f"({info['distance_km']} 千米,{info['time_hours']} 小时)"
    )
    print(summary)
识别认证日志中的凭据攻击模式:
from collections import Counter
def detect_brute_force(df, threshold_failures=10, window_minutes=10):
    """Detect brute-force activity: many failed logins against a single
    account inside a short sliding time window.

    Parameters
    ----------
    df : pandas.DataFrame
        Normalized auth events (user, timestamp, result, source_ip,
        failure_reason columns).
    threshold_failures : int
        Minimum failures inside one window to raise an alert.
    window_minutes : int
        Sliding-window width in minutes.

    Returns
    -------
    list of alert dicts — at most one alert per user per run.
    """
    alerts = []
    window = timedelta(minutes=window_minutes)
    failures = df[df["result"] == "failure"].copy()
    failures["timestamp"] = pd.to_datetime(failures["timestamp"])
    for account, account_failures in failures.groupby("user"):
        ordered = account_failures.sort_values("timestamp")
        # Slide a forward-looking window across each failure.
        for _, event in ordered.iterrows():
            start = event["timestamp"]
            in_window = ordered[
                (ordered["timestamp"] >= start)
                & (ordered["timestamp"] <= start + window)
            ]
            if len(in_window) < threshold_failures:
                continue
            ips = in_window["source_ip"].unique()
            alerts.append({
                "alert_type": "BRUTE_FORCE",
                "severity": "HIGH",
                "user": account,
                "timestamp": str(start),
                "details": {
                    "failed_attempts": len(in_window),
                    "window_minutes": window_minutes,
                    "source_ips": list(ips),
                    # multiple source IPs suggests a distributed attack
                    "distributed": len(ips) > 1,
                    "failure_reasons": dict(Counter(in_window["failure_reason"]))
                }
            })
            break  # one alert per user per detection run
    return alerts
def detect_password_spray(df, threshold_users=10, window_minutes=30):
    """Detect password spraying: one source IP producing failed logins against
    many different accounts (a few tries each) inside a short window.

    Severity is raised to CRITICAL when the same source also achieves a
    successful login during, or within an hour after, the spray window.

    Parameters
    ----------
    df : pandas.DataFrame
        Normalized auth events (user, timestamp, result, source_ip columns).
    threshold_users : int
        Minimum distinct targeted accounts inside one window.
    window_minutes : int
        Sliding-window width in minutes.

    Returns
    -------
    list of alert dicts — at most one alert per source IP per run.
    """
    alerts = []
    window = timedelta(minutes=window_minutes)
    failed = df[df["result"] == "failure"].copy()
    failed["timestamp"] = pd.to_datetime(failed["timestamp"])
    # Hoisted out of the loop: the original re-parsed every timestamp in the
    # whole frame (twice) for each candidate window — O(N) per window.
    successes = df[df["result"] == "success"].copy()
    successes["timestamp"] = pd.to_datetime(successes["timestamp"])
    for source_ip, ip_events in failed.groupby("source_ip"):
        ordered = ip_events.sort_values("timestamp")
        for _, row in ordered.iterrows():
            window_start = row["timestamp"]
            window_end = window_start + window
            in_window = ordered[
                (ordered["timestamp"] >= window_start)
                & (ordered["timestamp"] <= window_end)
            ]
            unique_users = in_window["user"].nunique()
            attempts_per_user = len(in_window) / unique_users if unique_users > 0 else 0
            # Spray signature: many targets, few attempts per target (to stay
            # under account-lockout thresholds).
            if unique_users >= threshold_users and attempts_per_user <= 3:
                # A success from the same IP shortly after means an account
                # was likely compromised.
                success_after = successes[
                    (successes["source_ip"] == source_ip)
                    & (successes["timestamp"] > window_start)
                    & (successes["timestamp"] < window_end + timedelta(hours=1))
                ]
                alerts.append({
                    "alert_type": "PASSWORD_SPRAY",
                    "severity": "CRITICAL" if len(success_after) > 0 else "HIGH",
                    "timestamp": str(window_start),
                    "details": {
                        "source_ip": source_ip,
                        "targeted_users": unique_users,
                        "total_attempts": len(in_window),
                        "avg_attempts_per_user": round(attempts_per_user, 1),
                        "window_minutes": window_minutes,
                        "successful_logins_after": len(success_after),
                        "compromised_accounts": list(success_after["user"].unique()) if len(success_after) > 0 else []
                    }
                })
                break  # one alert per source IP per detection run
    return alerts
# Run the credential-attack detectors over the normalized auth events.
brute_force_alerts = detect_brute_force(auth_df)
spray_alerts = detect_password_spray(auth_df)
for label, found in (("暴力破解告警", brute_force_alerts), ("密码喷洒告警", spray_alerts)):
    print(f"{label}:{len(found)}")
创建用户行为档案并标记统计异常:
import numpy as np
from scipy import stats
from sklearn.ensemble import IsolationForest
def build_user_baseline(df, user, lookback_days=90):
    """Build a statistical behavior baseline profile for a single user.

    Parameters
    ----------
    df : pandas.DataFrame
        Normalized auth events (user, timestamp, source_ip, location, app,
        device, result columns).
    user : str
        The user identifier to profile.
    lookback_days : int
        Size of the profiling window, measured back from the user's most
        recent event. (Bug fix: this parameter was previously accepted but
        never applied — the whole history was profiled.)

    Returns
    -------
    dict baseline with typical hours/days/IPs/locations/apps/devices,
    daily-login statistics, and failure rate.

    Raises
    ------
    ValueError
        If the user has no events inside the lookback window.
    """
    user_events = df[df["user"] == user].copy()
    user_events["timestamp"] = pd.to_datetime(user_events["timestamp"])
    # Honor the lookback window, anchored at the user's latest event.
    cutoff = user_events["timestamp"].max() - timedelta(days=lookback_days)
    user_events = user_events[user_events["timestamp"] >= cutoff]
    if user_events.empty:
        # Explicit message instead of the old opaque int(NaN) ValueError.
        raise ValueError(f"no events for user {user!r} in the lookback window")
    user_events["hour"] = user_events["timestamp"].dt.hour
    user_events["day_of_week"] = user_events["timestamp"].dt.dayofweek
    # Sample std is NaN for a single event; normalize that to 0.0 so the
    # downstream `std > 0` z-score gate skips cleanly instead of carrying NaN.
    hour_std = user_events["hour"].std()
    daily_counts = user_events.groupby(user_events["timestamp"].dt.date).size()
    daily_std = daily_counts.std()
    baseline = {
        "user": user,
        "typical_hours": {
            # 5th/95th percentile bracket the user's normal working hours
            "start": int(user_events["hour"].quantile(0.05)),
            "end": int(user_events["hour"].quantile(0.95)),
            "mean": float(user_events["hour"].mean()),
            "std": float(hour_std) if pd.notna(hour_std) else 0.0
        },
        "typical_days": list(user_events["day_of_week"].mode().values),
        "typical_ips": list(user_events["source_ip"].value_counts().head(10).index),
        "typical_locations": list(
            user_events["location"].apply(
                lambda x: x.get("country") if isinstance(x, dict) else None
            ).dropna().value_counts().head(5).index
        ),
        "typical_apps": list(user_events["app"].value_counts().head(10).index),
        "typical_devices": list(user_events["device"].value_counts().head(5).index),
        "avg_daily_logins": float(daily_counts.mean()),
        "std_daily_logins": float(daily_std) if pd.notna(daily_std) else 0.0,
        "failure_rate": float((user_events["result"] == "failure").mean())
    }
    return baseline
def detect_behavioral_anomalies(event, baseline):
    """Compare a new authentication event against a user's baseline profile.

    Parameters
    ----------
    event : dict
        Normalized auth event (timestamp, source_ip, location, app, device).
    baseline : dict
        Profile produced by build_user_baseline().

    Returns
    -------
    list of anomaly dicts (type, severity, detail, ...); empty when the event
    matches established behavior.
    """
    anomalies = []
    event_time = pd.Timestamp(event["timestamp"])
    # --- Off-hours login ---
    hour = event_time.hour
    if baseline["typical_hours"]["std"] > 0:
        # Hours wrap around midnight, so use circular distance: a 01:00 login
        # is 3 hours from a 22:00 mean, not 21. (Bug fix: the old plain abs()
        # difference over-penalized logins near midnight.)
        raw_diff = abs(hour - baseline["typical_hours"]["mean"])
        circular_diff = min(raw_diff, 24 - raw_diff)
        z_score = circular_diff / baseline["typical_hours"]["std"]
        if z_score > 2.5:
            anomalies.append({
                "type": "OFF_HOURS_LOGIN",
                "severity": "MEDIUM",
                "detail": f"在 {hour}:00 登录(基线:{baseline['typical_hours']['start']}:00-{baseline['typical_hours']['end']}:00)",
                "z_score": round(z_score, 2)
            })
    # --- New source IP ---
    if event["source_ip"] not in baseline["typical_ips"]:
        anomalies.append({
            "type": "NEW_SOURCE_IP",
            "severity": "MEDIUM",
            "detail": f"从未知 IP 登录:{event['source_ip']}"
        })
    # --- New country ---
    if event.get("location") and isinstance(event["location"], dict):
        country = event["location"].get("country")
        if country and country not in baseline["typical_locations"]:
            anomalies.append({
                "type": "NEW_COUNTRY",
                "severity": "HIGH",
                "detail": f"从新国家/地区登录:{country}"
            })
    # --- New application ---
    if event.get("app") and event["app"] not in baseline["typical_apps"]:
        anomalies.append({
            "type": "NEW_APPLICATION",
            "severity": "LOW",
            "detail": f"访问新应用程序:{event['app']}"
        })
    # --- New device ---
    if event.get("device") and event["device"] not in baseline["typical_devices"]:
        anomalies.append({
            "type": "NEW_DEVICE",
            "severity": "MEDIUM",
            "detail": f"从新设备登录:{event['device']}"
        })
    # --- Weekend login for weekday-only users (dayofweek 5/6 = Sat/Sun) ---
    if event_time.dayofweek >= 5 and 5 not in baseline["typical_days"] and 6 not in baseline["typical_days"]:
        anomalies.append({
            "type": "WEEKEND_LOGIN",
            "severity": "LOW",
            "detail": f"检测到周末登录(典型工作日:{baseline['typical_days']})"
        })
    return anomalies
def isolation_forest_anomaly_detection(df):
    """Multivariate anomaly detection over auth events with an Isolation Forest.

    Returns the subset of rows flagged as outliers, ordered most-anomalous
    first (lowest score_samples value first).
    """
    enriched = df.copy()
    enriched["timestamp"] = pd.to_datetime(enriched["timestamp"])
    # Temporal and outcome features
    enriched["hour"] = enriched["timestamp"].dt.hour
    enriched["day_of_week"] = enriched["timestamp"].dt.dayofweek
    enriched["is_failure"] = (enriched["result"] == "failure").astype(int)
    # Frequency encodings stand in for the high-cardinality ip/user columns
    for source_col, feature_col in (("source_ip", "ip_frequency"), ("user", "user_frequency")):
        enriched[feature_col] = enriched.groupby(source_col)[source_col].transform("count")
    feature_matrix = enriched[
        ["hour", "day_of_week", "is_failure", "ip_frequency", "user_frequency"]
    ].fillna(0)
    forest = IsolationForest(
        n_estimators=200,
        contamination=0.01,  # expect roughly 1% of events to be anomalous
        random_state=42,
        n_jobs=-1
    )
    enriched["anomaly_score"] = forest.fit_predict(feature_matrix)
    enriched["anomaly_probability"] = forest.score_samples(feature_matrix)
    # fit_predict labels outliers as -1; lower score_samples = more anomalous
    flagged = enriched[enriched["anomaly_score"] == -1]
    return flagged.sort_values("anomaly_probability")
为常见认证攻击模式部署检测规则:
# Splunk SPL queries for authentication anomaly detection
# 1. Brute-force detection
# Name: Authentication brute force - multiple failed logins
# Severity: High
brute_force_spl: |
index=auth sourcetype IN ("azure:aad:signin", "okta:im:log", "WinEventLog:Security")
(result="failure" OR EventCode=4625)
| bin _time span=10m
| stats count as failed_attempts dc(src_ip) as unique_ips
values(src_ip) as source_ips
latest(_time) as last_attempt
by user _time
| where failed_attempts >= 10
| eval alert_type=if(unique_ips > 3, "分布式暴力破解", "标准暴力破解")
# 2. Password-spray detection
# Name: Password spray attack - one source targeting many users
# Severity: Critical
password_spray_spl: |
index=auth sourcetype IN ("azure:aad:signin", "okta:im:log")
result="failure"
| bin _time span=30m
| stats dc(user) as targeted_users count as total_attempts
values(user) as users_targeted
by src_ip _time
| where targeted_users >= 10
| eval attempts_per_user = round(total_attempts / targeted_users, 1)
| where attempts_per_user <= 3
| eval severity=if(targeted_users > 50, "CRITICAL", "HIGH")
# 3. Impossible-travel detection
# Name: Impossible travel - geographically inconsistent logins
# Severity: High
impossible_travel_spl: |
index=auth result="success"
| iplocation src_ip
| sort user _time
| streamstats current=f last(lat) as prev_lat last(lon) as prev_lon
last(_time) as prev_time last(City) as prev_city last(Country) as prev_country
by user
| where isnotnull(prev_lat) AND isnotnull(lat)
| eval distance_km = 6371 * 2 * asin(sqrt(
pow(sin((lat - prev_lat) * pi() / 360), 2) +
cos(prev_lat * pi() / 180) * cos(lat * pi() / 180) *
pow(sin((lon - prev_lon) * pi() / 360), 2)))
| eval time_hours = (_time - prev_time) / 3600
| eval required_speed = distance_km / time_hours
| where required_speed > 900 AND distance_km > 100
# 4. Credential-stuffing detection
# Name: Credential stuffing - mass failed logins followed by some successes
# Severity: Critical
credential_stuffing_spl: |
index=auth
| bin _time span=1h
| stats count(eval(result="failure")) as failures
count(eval(result="success")) as successes
dc(user) as unique_users
dc(src_ip) as unique_ips
by src_ip _time
| where failures > 100 AND successes > 0 AND unique_users > 20
| eval success_rate = round(successes / (failures + successes) * 100, 2)
| where success_rate < 5
将多个检测信号合并为风险评分:
def calculate_auth_risk_score(user, alerts, baseline):
    """Combine multiple detection signals into one 0-100 risk score.

    Parameters
    ----------
    user : str
        The user the alerts pertain to.
    alerts : list of dict
        Alert dicts from any detector; the signal name may be under either
        "type" (behavioral anomalies) or "alert_type" (attack detectors).
    baseline : dict
        User baseline profile. Currently unused; kept for API stability and
        future baseline-aware scoring.

    Returns
    -------
    dict with risk_score (0-100), risk_level, recommended_action,
    risk_factors (sorted by weight, descending), and an ISO-8601 UTC
    timestamp (now timezone-aware, e.g. "...+00:00").
    """
    score = 0
    risk_factors = []
    # Base weight per signal type; unknown types fall back to 10.
    weights = {
        "IMPOSSIBLE_TRAVEL": 40,
        "PASSWORD_SPRAY": 35,
        "BRUTE_FORCE": 30,
        "CREDENTIAL_STUFFING": 35,
        "NEW_COUNTRY": 25,
        "OFF_HOURS_LOGIN": 15,
        "NEW_SOURCE_IP": 10,
        "NEW_DEVICE": 10,
        "NEW_APPLICATION": 5,
        "WEEKEND_LOGIN": 5,
        "MFA_BYPASS": 45,
        "LEGACY_PROTOCOL": 20
    }
    # Hoisted out of the loop (was rebuilt on every alert iteration).
    severity_multiplier = {
        "CRITICAL": 2.0,
        "HIGH": 1.5,
        "MEDIUM": 1.0,
        "LOW": 0.5
    }
    for alert in alerts:
        alert_type = alert.get("type") or alert.get("alert_type")
        weight = weights.get(alert_type, 10)
        severity = alert.get("severity", "MEDIUM")
        adjusted_weight = weight * severity_multiplier.get(severity, 1.0)
        score += adjusted_weight
        risk_factors.append({
            "factor": alert_type,
            "weight": adjusted_weight,
            "detail": alert.get("detail", alert.get("details", ""))
        })
    # Cap the aggregate at 100.
    normalized_score = min(100, score)
    # Map the score onto a response playbook tier.
    if normalized_score >= 80:
        risk_level = "CRITICAL"
        recommended_action = "立即暂停账户并展开调查"
    elif normalized_score >= 60:
        risk_level = "HIGH"
        recommended_action = "强制 MFA 重新注册并通知 SOC"
    elif normalized_score >= 40:
        risk_level = "MEDIUM"
        recommended_action = "要求升级认证"
    elif normalized_score >= 20:
        risk_level = "LOW"
        recommended_action = "监控并记录用于趋势分析"
    else:
        risk_level = "INFORMATIONAL"
        recommended_action = "无需采取行动"
    return {
        "user": user,
        "risk_score": normalized_score,
        "risk_level": risk_level,
        "recommended_action": recommended_action,
        "risk_factors": sorted(risk_factors, key=lambda x: x["weight"], reverse=True),
        # Timezone-aware replacement for deprecated datetime.utcnow()
        # (deprecated since Python 3.12).
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
| 术语 | 定义 |
|---|---|
| 不可能旅行(Impossible Travel) | 认证异常,用户在物理上不可能旅行的时间内从两个地理位置相距甚远的地方登录 |
| 密码喷洒(Password Spraying) | 凭据攻击,对大量账户尝试少量常用密码,以规避账户锁定阈值 |
| 撞库(Credential Stuffing) | 自动化攻击,使用从数据泄露中获取的用户名/密码组合,尝试非法访问账户 |
| UEBA(用户和实体行为分析) | 使用机器学习和统计分析建立行为基线并检测偏差的技术 |
| 行为基线(Behavioral Baseline) | 用户正常认证模式的统计档案,包括典型时间、位置、设备和应用程序 |
| 孤立森林(Isolation Forest) | 无监督机器学习算法,通过孤立与大多数数据点不同的观测值来检测异常 |
| 风险评分(Risk Score) | 聚合多个异常信号并进行加权评分的综合数值,用于优先处理认证威胁 |
场景背景:SOC 发现来自云 VPS IP 地址的失败认证尝试激增,针对 200 多个账户。两小时后,一名高管账户从同一 IP 范围显示成功认证,随后创建了邮箱规则并发生数据外泄。
方法:
常见陷阱:
认证异常检测报告
=========================================
分析期间: 2026-02-01 至 2026-02-24
总认证事件数: 2,847,392
监控用户数: 3,847
告警来源: Azure AD、Okta、Windows AD
威胁检测概要
密码喷洒攻击: 3 次
暴力破解攻击: 12 次
不可能旅行: 8 次
撞库攻击: 1 次
行为异常: 47 次
高风险账户
[CRITICAL] j.smith@corp.com 评分:92
- 不可能旅行:芝加哥 -> 莫斯科(7,876 千米,0.5 小时)
- 密码喷洒目标,随后成功登录
- 新设备和浏览器指纹
- 非工作时间访问 SharePoint 和电子邮件
行动:账户已暂停,SOC 已启动调查
[HIGH] m.johnson@corp.com 评分:67
- 从新国家/地区登录(巴西)
- 新来源 IP 不匹配 VPN 范围
- 访问 HR 应用程序超出正常模式
行动:要求 MFA 重新注册,已通知经理
[MEDIUM] a.williams@corp.com 评分:38
- 周末 UTC 03:00 登录
- 新设备(Linux,通常使用 Windows)
行动:已应用升级认证
攻击活动详情
密码喷洒活动 #1:
来源: 185.220.101.x/24(Tor 出口节点)
目标用户: 247
成功率: 0.8%(2 个账户被攻陷)
被攻陷账户: j.smith@corp.com、r.davis@corp.com
持续时间: 45 分钟
模式: 每个用户 2 次尝试,3 秒间隔