Automates responses to AWS GuardDuty findings using EventBridge and Lambda, including EC2 instance isolation and SNS notifications. Useful for real-time cloud security incident handling.
Amazon GuardDuty 是一种威胁检测服务,持续监控 AWS 账户中的恶意活动和未授权行为。通过将 GuardDuty 与 Amazon EventBridge 和 AWS Lambda 集成,安全团队可以实现自动化、近实时的威胁响应,将人工响应环节的耗时从数小时缩短到数秒。注意端到端延迟仍以分钟计:GuardDuty 在首次检测后约 5 分钟内才把新发现导出到 EventBridge,已有发现的后续更新则按检测器的 finding-publishing-frequency(下文配置为 15 分钟)导出。GuardDuty 分析 VPC 流日志、CloudTrail 管理和数据事件、DNS 日志、EKS 审计日志和 S3 数据事件。
Automates AWS GuardDuty findings processing with EventBridge rules and Lambda functions for real-time incident response, EC2 quarantine, and SNS notifications.
Automates AWS GuardDuty threat findings processing via EventBridge and Lambda for real-time incident response, resource quarantine, and security notifications.
Guides deployment and operation of Amazon GuardDuty for continuous AWS threat detection on S3, EKS, EC2 runtime monitoring, and Lambda. Covers finding severity interpretation and EventBridge/Lambda response automation.
Amazon GuardDuty 是一种威胁检测服务,持续监控 AWS 账户中的恶意活动和未授权行为。通过将 GuardDuty 与 Amazon EventBridge 和 AWS Lambda 集成,安全团队可以实现自动化、近实时的威胁响应,将人工响应环节的耗时从数小时缩短到数秒。注意端到端延迟仍以分钟计:GuardDuty 在首次检测后约 5 分钟内才把新发现导出到 EventBridge,已有发现的后续更新则按检测器的 finding-publishing-frequency(下文配置为 15 分钟)导出。GuardDuty 分析 VPC 流日志、CloudTrail 管理和数据事件、DNS 日志、EKS 审计日志和 S3 数据事件。
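# Enable GuardDuty in the current region (one detector per account per region).
# The publishing frequency only governs how often UPDATES to existing findings
# are exported to EventBridge; new findings are exported within ~5 minutes.
aws guardduty create-detector --enable --finding-publishing-frequency FIFTEEN_MINUTES

# Enable additional protection plans.
# `--data-sources` is the legacy parameter (S3Logs / Kubernetes / MalwareProtection
# only) and does not accept a RuntimeMonitoring key; the features API covers all
# plans, so use it. Runtime Monitoring additionally needs the GuardDuty agent on
# the workloads (automated agent management is an optional AdditionalConfiguration).
DETECTOR_ID="$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)"
aws guardduty update-detector \
  --detector-id "$DETECTOR_ID" \
  --features '[
    {"Name": "S3_DATA_EVENTS",         "Status": "ENABLED"},
    {"Name": "EKS_AUDIT_LOGS",         "Status": "ENABLED"},
    {"Name": "EBS_MALWARE_PROTECTION", "Status": "ENABLED"},
    {"Name": "RUNTIME_MONITORING",     "Status": "ENABLED"}
  ]'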
{
"source": ["aws.guardduty"],
"detail-type": ["GuardDuty Finding"],
"detail": {
"severity": [{"numeric": [">=", 7.0]}]
}
}
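# Route High/Critical findings (severity >= 7.0) to the response function.
# Medium findings (4.0-6.9), which include many IAMUser/* types, will NOT match
# this pattern; lower the threshold or add a second rule if the IAM handler below
# is expected to see them.
aws events put-rule \
  --name "guardduty-high-severity" \
  --event-pattern '{
    "source": ["aws.guardduty"],
    "detail-type": ["GuardDuty Finding"],
    "detail": {
      "severity": [{"numeric": [">=", 7.0]}]
    }
  }'

aws events put-targets \
  --rule "guardduty-high-severity" \
  --targets "Id"="lambda-handler","Arn"="arn:aws:lambda:us-east-1:123456789012:function:guardduty-response"

# Required: the target alone does not let EventBridge invoke the function. Without
# this resource-based policy statement the rule fails silently (only the rule's
# FailedInvocations metric shows it) and no finding is ever handled.
aws lambda add-permission \
  --function-name guardduty-response \
  --statement-id AllowEventBridgeGuardDutyRule \
  --action lambda:InvokeFunction \
  --principal events.amazonaws.com \
  --source-arn arn:aws:events:us-east-1:123456789012:rule/guardduty-high-severity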
import boto3
import json
import os
# Clients are created once per execution environment and reused by warm invocations.
ec2 = boto3.client('ec2')
sns = boto3.client('sns')

# Security group the instance is moved into; when unset, isolation is skipped.
QUARANTINE_SG = os.getenv('QUARANTINE_SECURITY_GROUP')
# Topic that receives the quarantine notification.
SNS_TOPIC = os.getenv('SNS_TOPIC_ARN')
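def lambda_handler(event, context):
    """Quarantine the EC2 instance referenced by a GuardDuty finding.

    Invoked by the EventBridge rule on ``GuardDuty Finding`` events. When the
    finding's resource is an ``Instance`` the handler:

    1. reads the live instance (tags, security groups, network interfaces);
    2. replaces the security groups on every ENI with ``QUARANTINE_SECURITY_GROUP``;
    3. snapshots each attached EBS volume for forensics;
    4. tags the instance with the finding and its original security groups;
    5. publishes a summary to ``SNS_TOPIC_ARN``.

    Tagging happens AFTER isolation so that a failed isolation is retried on
    the next finding instead of being masked by the ``SecurityStatus=Quarantined``
    short-circuit. Findings about other resource types are acknowledged and
    ignored.

    Args:
        event: EventBridge event; ``event['detail']`` holds the GuardDuty finding.
        context: Lambda context object (unused).

    Returns:
        ``{'statusCode': 200, 'body': <summary>}``.

    Raises:
        KeyError: the finding lacks ``type``/``severity``/``accountId``/``region``
            or ``instanceDetails.instanceId``.
        RuntimeError: ``QUARANTINE_SECURITY_GROUP`` is not configured; raised
            before any side effect so a misconfiguration cannot produce an
            instance that is tagged/snapshotted but still connected.
        EC2/SNS client errors are propagated so the invocation is recorded as
            failed and retried by EventBridge.
    """
    finding = event['detail']
    finding_type = finding['type']
    severity = finding['severity']
    account_id = finding['accountId']
    region = finding['region']

    resource = finding.get('resource', {})
    if resource.get('resourceType', '') != 'Instance':
        return {'statusCode': 200, 'body': '非 EC2 发现已处理'}

    instance_id = resource['instanceDetails']['instanceId']

    # Sample findings (create-sample-findings) reference a placeholder instance.
    # NOTE(review): relies on service.additionalInfo.sample being set on sample
    # findings — confirm against a generated sample event.
    if finding.get('service', {}).get('additionalInfo', {}).get('sample'):
        return {'statusCode': 200, 'body': 'Sample finding ignored'}

    if not QUARANTINE_SG:
        raise RuntimeError(
            'QUARANTINE_SECURITY_GROUP is not set; refusing to act on ' + instance_id
        )

    # Use the live instance rather than the copy embedded in the finding: the
    # finding's tags may predate an earlier quarantine, and the embedded copy
    # does not list every network interface.
    reservations = ec2.describe_instances(InstanceIds=[instance_id])['Reservations']
    live = reservations[0]['Instances'][0]
    live_tags = {t['Key']: t['Value'] for t in live.get('Tags', [])}
    if live_tags.get('SecurityStatus') == 'Quarantined':
        return {'statusCode': 200, 'body': 'Already quarantined'}

    # Primary-interface groups go into the tag (kept for compatibility); the
    # per-ENI map goes into the notification so every interface can be restored.
    current_sgs = [sg['GroupId'] for sg in live['SecurityGroups']]
    eni_groups = {
        ni['NetworkInterfaceId']: [g['GroupId'] for g in ni.get('Groups', [])]
        for ni in live.get('NetworkInterfaces', [])
    }

    _isolate_instance(instance_id, live)
    snapshot_ids = _snapshot_volumes(instance_id, finding_type)

    ec2.create_tags(
        Resources=[instance_id],
        Tags=[
            {'Key': 'SecurityStatus', 'Value': 'Quarantined'},
            {'Key': 'GuardDutyFinding', 'Value': finding_type},
            {'Key': 'OriginalSecurityGroups', 'Value': ','.join(current_sgs)},
            {'Key': 'QuarantineTime', 'Value': finding['updatedAt']},
        ],
    )

    # SNS Publish requires an ASCII subject shorter than 100 characters; a
    # rejected subject would only surface after the instance is already
    # isolated, so keep it plain and truncate. The JSON body carries the detail.
    subject = f'[GuardDuty] {finding_type} - instance {instance_id} quarantined'
    sns.publish(
        TopicArn=SNS_TOPIC,
        Subject=subject[:99],
        Message=json.dumps({
            'action': 'instance_quarantined',
            'instance_id': instance_id,
            'finding_type': finding_type,
            'severity': severity,
            'account': account_id,
            'region': region,
            'original_security_groups': current_sgs,
            'original_eni_security_groups': eni_groups,
            'forensic_snapshots': snapshot_ids,
            'description': finding.get('description', ''),
        }, indent=2),
    )

    return {
        'statusCode': 200,
        'body': f'实例 {instance_id} 已隔离并创建快照',
    }


def _isolate_instance(instance_id, live_instance):
    """Replace the security groups on every network interface with QUARANTINE_SG.

    The instance-level ``ModifyInstanceAttribute(Groups=...)`` call is documented
    as changing only the primary interface, so an instance with additional ENIs
    would keep network access through them; iterate the interfaces instead and
    fall back to the instance-level call only if none are listed.
    """
    eni_ids = [ni['NetworkInterfaceId'] for ni in live_instance.get('NetworkInterfaces', [])]
    if not eni_ids:
        ec2.modify_instance_attribute(InstanceId=instance_id, Groups=[QUARANTINE_SG])
        return
    for eni_id in eni_ids:
        ec2.modify_network_interface_attribute(
            NetworkInterfaceId=eni_id, Groups=[QUARANTINE_SG]
        )


def _snapshot_volumes(instance_id, finding_type):
    """Snapshot every EBS volume attached to the instance; return the snapshot IDs."""
    volumes = ec2.describe_volumes(
        Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]
    )
    snapshot_ids = []
    for vol in volumes['Volumes']:
        snap = ec2.create_snapshot(
            VolumeId=vol['VolumeId'],
            Description=f'GuardDuty forensic snapshot - {finding_type}',
            TagSpecifications=[{
                'ResourceType': 'snapshot',
                'Tags': [
                    {'Key': 'Purpose', 'Value': 'ForensicCapture'},
                    {'Key': 'SourceInstance', 'Value': instance_id},
                    {'Key': 'FindingType', 'Value': finding_type},
                ],
            }],
        )
        snapshot_ids.append(snap['SnapshotId'])
    return snapshot_ids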
import boto3
import json
import os
# One client per execution environment; reused across warm invocations.
iam = boto3.client('iam')
sns = boto3.client('sns')

# Topic that receives the containment report.
SNS_TOPIC = os.getenv('SNS_TOPIC_ARN')
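def lambda_handler(event, context):
    """Contain the IAM user credential referenced by a GuardDuty finding.

    For ``IAMUser``/``UnauthorizedAccess`` findings whose credential belongs to
    an IAM user, deactivate the access key seen in the finding and attach an
    inline deny-all policy to the user, then publish what was (and was not)
    done to ``SNS_TOPIC_ARN``. Both containment steps are best-effort and
    independent: a failure is recorded in ``actions_taken`` rather than raised,
    so a key that is already gone does not stop the deny policy from landing.

    Credentials that are not an IAM user (assumed-role sessions, root,
    federated users) are reported but not touched: ``UpdateAccessKey`` and
    ``PutUserPolicy`` cannot contain them, and for a role session ``userName``
    is only the session name. Sample findings are ignored.

    Args:
        event: EventBridge event; ``event['detail']`` holds the GuardDuty finding.
        context: Lambda context object (unused).

    Returns:
        ``{'statusCode': 200, 'body': <JSON list of actions, or a reason>}``.

    Raises:
        KeyError: the finding lacks ``type`` or ``severity``.
        SNS client errors are propagated so the invocation is recorded as failed.
    """
    finding = event['detail']
    finding_type = finding['type']
    if 'IAMUser' not in finding_type and 'UnauthorizedAccess' not in finding_type:
        return {'statusCode': 200, 'body': 'Not an IAM finding'}

    key_details = finding.get('resource', {}).get('accessKeyDetails', {})
    user_name = key_details.get('userName', '')
    access_key_id = key_details.get('accessKeyId', '')
    user_type = key_details.get('userType', '')
    if not user_name:
        return {'statusCode': 200, 'body': 'No user identified'}

    # Sample findings carry placeholder identities; never issue IAM calls for them.
    if user_name == 'GeneratedFindingUserName' or access_key_id == 'GeneratedFindingAccessKeyId':
        return {'statusCode': 200, 'body': 'Sample finding ignored'}

    actions_taken = []
    if user_type and user_type != 'IAMUser':
        # Role sessions need the role's sessions revoked, root needs rotation:
        # neither is done by the user-level calls below, so escalate instead.
        actions_taken.append(f'未自动遏制: 凭证类型 {user_type} 需要人工处理')
    else:
        actions_taken.extend(_contain_iam_user(user_name, access_key_id))

    # SNS Publish requires an ASCII subject shorter than 100 characters; keep it
    # plain and truncate so the notification is not rejected after containment.
    subject = f'[GuardDuty] IAM compromise - {user_name}'
    sns.publish(
        TopicArn=SNS_TOPIC,
        Subject=subject[:99],
        Message=json.dumps({
            'finding_type': finding_type,
            'user': user_name,
            'user_type': user_type,
            'access_key': access_key_id,
            'actions_taken': actions_taken,
            'severity': finding['severity'],
        }, indent=2),
    )
    return {'statusCode': 200, 'body': json.dumps(actions_taken)}


def _contain_iam_user(user_name, access_key_id):
    """Deactivate ``access_key_id`` and attach a deny-all inline policy to ``user_name``.

    Returns a list of human-readable outcomes, one per step. Each step catches
    IAM client errors (kept broad because botocore is not imported in this
    file) so that one failure does not skip the other.
    """
    outcomes = []
    if access_key_id:
        try:
            iam.update_access_key(
                UserName=user_name,
                AccessKeyId=access_key_id,
                Status='Inactive',
            )
            outcomes.append(f'已停用访问密钥 {access_key_id}')
        except Exception as exc:  # noqa: BLE001 - best-effort containment step
            outcomes.append(f'停用密钥失败: {str(exc)}')

    # An explicit Deny on every action overrides any Allow the user holds, so
    # the user's other keys and console password remain but can no longer act.
    deny_all = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Deny",
            "Action": "*",
            "Resource": "*",
        }],
    }
    try:
        iam.put_user_policy(
            UserName=user_name,
            PolicyName='GuardDuty-DenyAll-Quarantine',
            PolicyDocument=json.dumps(deny_all),
        )
        outcomes.append(f'已为 {user_name} 应用拒绝所有策略')
    except Exception as exc:  # noqa: BLE001 - best-effort containment step
        outcomes.append(f'应用拒绝策略失败: {str(exc)}')
    return outcomes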
# Regional detector (one per account per region).
# `datasources` is the provider's legacy way of enabling protection plans and the
# provider documents `aws_guardduty_detector_feature` as its replacement. Newer
# plans - including Runtime Monitoring, which the CLI section of this page
# enables - cannot be expressed here and need that feature resource.
resource "aws_guardduty_detector" "main" {
  enable                       = true
  # Governs how often updates to existing findings are exported; new findings
  # are exported independently of this setting.
  finding_publishing_frequency = "FIFTEEN_MINUTES"

  datasources {
    s3_logs { enable = true }
    kubernetes { audit_logs { enable = true } }
    # Scan the EBS volumes of instances that already have a finding.
    malware_protection {
      scan_ec2_instance_with_findings {
        ebs_volumes { enable = true }
      }
    }
  }
}
# Routes High/Critical findings (severity >= 7.0) to the response function.
# Medium findings (4.0-6.9) - which include many IAMUser/* types - do not
# match; lower the threshold or add a second rule if the IAM handler should
# receive them.
resource "aws_cloudwatch_event_rule" "guardduty_high" {
  name        = "guardduty-high-severity"
  description = "GuardDuty 高严重性发现"

  event_pattern = <<-PATTERN
    {
      "source": ["aws.guardduty"],
      "detail-type": ["GuardDuty Finding"],
      "detail": {
        "severity": [{ "numeric": [">=", 7.0] }]
      }
    }
  PATTERN
}
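resource "aws_cloudwatch_event_target" "lambda" {
  rule = aws_cloudwatch_event_rule.guardduty_high.name
  arn  = aws_lambda_function.guardduty_response.arn
}

# Required: the target alone does not let EventBridge invoke the function.
# Without this resource-based policy statement the rule fails silently (only
# its FailedInvocations metric shows it) and no finding is ever handled.
resource "aws_lambda_permission" "allow_eventbridge" {
  statement_id  = "AllowExecutionFromEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.guardduty_response.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.guardduty_high.arn
}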
| 类别 | 严重性范围 | 示例 |
|---|---|---|
| Backdoor | 5.0 - 8.0 | Backdoor:EC2/C&CActivity |
| CryptoCurrency | 5.0 - 8.0 | CryptoCurrency:EC2/BitcoinTool |
| Trojan | 5.0 - 8.0 | Trojan:EC2/BlackholeTraffic |
| UnauthorizedAccess | 5.0 - 8.0 | UnauthorizedAccess:IAMUser/ConsoleLogin |
| Recon | 2.0 - 5.0 | Recon:EC2/PortProbeUnprotected |
| Persistence | 5.0 - 8.0 | Persistence:IAMUser/AnomalousBehavior |
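# Run from the Organizations management account, in every region where
# GuardDuty is used: delegates GuardDuty administration to the security account.
aws guardduty enable-organization-admin-account \
  --admin-account-id 111111111111

# Run from the delegated administrator account against its detector in the same
# region. NEW enrolls accounts as they join the organization (what the deprecated
# --auto-enable flag did); ALL additionally enrolls existing members that are not
# yet enabled.
DETECTOR_ID="$(aws guardduty list-detectors --query 'DetectorIds[0]' --output text)"
aws guardduty update-organization-configuration \
  --detector-id "$DETECTOR_ID" \
  --auto-enable-organization-members NEW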