Builds automated IOC enrichment pipelines in OpenCTI using connectors for VirusTotal, Shodan, AbuseIPDB, GreyNoise to enrich STIX indicators with threat context.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
OpenCTI 是一个以 STIX 2.1 为原生数据模型的开源网络威胁情报知识管理平台。本技能涵盖使用 OpenCTI 连接器生态系统构建自动化 IOC 富化流水线,通过 VirusTotal、Shodan、AbuseIPDB、GreyNoise 等来源对指标进行富化。该流水线自动对新摄入的指标进行富化,将其与已知威胁行为者和攻击活动关联,并为分析师优先排序进行评分。
Builds automated IOC enrichment pipeline with OpenCTI connectors for VirusTotal, Shodan, AbuseIPDB, GreyNoise to enrich STIX indicators, correlate threats, and score confidence for prioritization.
Builds automated IOC enrichment pipeline using OpenCTI connectors for VirusTotal, Shodan, AbuseIPDB, GreyNoise to enrich indicators with context, correlations, and confidence scores.
Automates IOC enrichment from threat intel sources like VirusTotal, Shodan, and MISP using Python pipelines or SOAR playbooks. For SIEM alert triage, phishing IOC batching, reducing analyst time.
Share bugs, ideas, or general feedback.
OpenCTI 是一个以 STIX 2.1 为原生数据模型的开源网络威胁情报知识管理平台。本技能涵盖使用 OpenCTI 连接器生态系统构建自动化 IOC 富化流水线,通过 VirusTotal、Shodan、AbuseIPDB、GreyNoise 等来源对指标进行富化。该流水线自动对新摄入的指标进行富化,将其与已知威胁行为者和攻击活动关联,并为分析师优先排序进行评分。
pycti 库OpenCTI 使用 GraphQL API 前端,以 ElasticSearch 作为存储后端,以 Redis/RabbitMQ 用于连接器通信。数据以 STIX 2.1 对象和关系的形式原生存储。连接器分为以下类别:外部导入(推送摄取)、内部导入(文件解析)、内部富化(上下文添加)和流式处理(实时导出)。
内部富化连接器在创建新可观测对象时自动触发,或由分析师手动触发。每个连接器接收 STIX 对象、查询外部服务,并返回 STIX 2.1 bundle,以附加的上下文、标签和关系扩充原始可观测对象。
OpenCTI 对指标使用 0-100 置信度等级。富化连接器可根据外部验证更新置信度分数:VirusTotal 检测率、Shodan 暴露数据、AbuseIPDB 报告数量和 GreyNoise 分类结果。
# docker-compose.yml(核心服务)
version: '3'
services:
opencti:
image: opencti/platform:6.4.4
environment:
- APP__PORT=8080
- APP__ADMIN__EMAIL=admin@opencti.io
- APP__ADMIN__PASSWORD=ChangeMeNow
- APP__ADMIN__TOKEN=your-admin-token-uuid
- ELASTICSEARCH__URL=http://elasticsearch:9200
- MINIO__ENDPOINT=minio
- RABBITMQ__HOSTNAME=rabbitmq
ports:
- "8080:8080"
depends_on:
- elasticsearch
- minio
- rabbitmq
- redis
connector-virustotal:
image: opencti/connector-virustotal:6.4.4
environment:
- OPENCTI_URL=http://opencti:8080
- OPENCTI_TOKEN=your-admin-token-uuid
- CONNECTOR_ID=connector-virustotal-id
- CONNECTOR_NAME=VirusTotal
- CONNECTOR_SCOPE=StixFile,Artifact,IPv4-Addr,Domain-Name,Url
- CONNECTOR_AUTO=true
- VIRUSTOTAL_TOKEN=your-vt-api-key
- VIRUSTOTAL_MAX_TLP=TLP:AMBER
connector-shodan:
image: opencti/connector-shodan:6.4.4
environment:
- OPENCTI_URL=http://opencti:8080
- OPENCTI_TOKEN=your-admin-token-uuid
- CONNECTOR_ID=connector-shodan-id
- CONNECTOR_NAME=Shodan
- CONNECTOR_SCOPE=IPv4-Addr
- CONNECTOR_AUTO=true
- SHODAN_TOKEN=your-shodan-api-key
- SHODAN_MAX_TLP=TLP:AMBER
connector-abuseipdb:
image: opencti/connector-abuseipdb:6.4.4
environment:
- OPENCTI_URL=http://opencti:8080
- OPENCTI_TOKEN=your-admin-token-uuid
- CONNECTOR_ID=connector-abuseipdb-id
- CONNECTOR_NAME=AbuseIPDB
- CONNECTOR_SCOPE=IPv4-Addr
- CONNECTOR_AUTO=true
- ABUSEIPDB_API_KEY=your-abuseipdb-key
import os
from pycti import OpenCTIConnectorHelper, get_config_variable
from stix2 import (
Bundle, Indicator, Note, Relationship,
IPv4Address, DomainName
)
import requests
class CustomEnrichmentConnector:
def __init__(self):
config = {
"opencti": {
"url": os.environ.get("OPENCTI_URL"),
"token": os.environ.get("OPENCTI_TOKEN"),
},
"connector": {
"id": os.environ.get("CONNECTOR_ID"),
"name": "CustomEnrichment",
"scope": "IPv4-Addr,Domain-Name,Url",
"auto": True,
"type": "INTERNAL_ENRICHMENT",
},
}
self.helper = OpenCTIConnectorHelper(config)
self.helper.listen(self._process_message)
def _process_message(self, data):
entity_id = data["entity_id"]
stix_object = self.helper.api.stix_cyber_observable.read(id=entity_id)
if not stix_object:
return "未找到可观测对象"
observable_type = stix_object["entity_type"]
observable_value = stix_object.get("value", "")
enrichment_results = []
if observable_type == "IPv4-Addr":
enrichment_results = self._enrich_ip(observable_value, entity_id)
elif observable_type == "Domain-Name":
enrichment_results = self._enrich_domain(observable_value, entity_id)
if enrichment_results:
bundle = Bundle(objects=enrichment_results, allow_custom=True)
self.helper.send_stix2_bundle(bundle.serialize())
return "富化完成"
def _enrich_ip(self, ip_address, entity_id):
"""使用 GreyNoise、AbuseIPDB 上下文富化 IP 地址。"""
objects = []
# GreyNoise Community API
try:
gn_response = requests.get(
f"https://api.greynoise.io/v3/community/{ip_address}",
headers={"key": os.environ.get("GREYNOISE_API_KEY")},
timeout=30,
)
if gn_response.status_code == 200:
gn_data = gn_response.json()
classification = gn_data.get("classification", "unknown")
noise = gn_data.get("noise", False)
riot = gn_data.get("riot", False)
note_content = (
f"## GreyNoise 富化\n"
f"- 分类: {classification}\n"
f"- 互联网噪声: {noise}\n"
f"- RIOT(良性服务): {riot}\n"
f"- 名称: {gn_data.get('name', 'N/A')}\n"
f"- 最后发现: {gn_data.get('last_seen', 'N/A')}"
)
note = Note(
content=note_content,
object_refs=[entity_id],
abstract=f"GreyNoise: {classification}",
allow_custom=True,
)
objects.append(note)
# 根据分类添加标签
if classification == "malicious":
self.helper.api.stix_cyber_observable.add_label(
id=entity_id, label_name="greynoise:malicious"
)
elif riot:
self.helper.api.stix_cyber_observable.add_label(
id=entity_id, label_name="greynoise:benign-service"
)
except Exception as e:
self.helper.log_error(f"GreyNoise 富化失败: {e}")
return objects
def _enrich_domain(self, domain, entity_id):
"""使用 WHOIS 和 DNS 上下文富化域名。"""
objects = []
try:
# 使用 SecurityTrails API 进行域名富化
st_response = requests.get(
f"https://api.securitytrails.com/v1/domain/{domain}",
headers={"APIKEY": os.environ.get("SECURITYTRAILS_API_KEY")},
timeout=30,
)
if st_response.status_code == 200:
st_data = st_response.json()
current_dns = st_data.get("current_dns", {})
a_records = [
r.get("ip") for r in current_dns.get("a", {}).get("values", [])
]
note_content = (
f"## SecurityTrails 富化\n"
f"- A 记录: {', '.join(a_records)}\n"
f"- Alexa 排名: {st_data.get('alexa_rank', 'N/A')}\n"
f"- 主机名: {st_data.get('hostname', 'N/A')}"
)
note = Note(
content=note_content,
object_refs=[entity_id],
abstract=f"SecurityTrails: {domain}",
allow_custom=True,
)
objects.append(note)
except Exception as e:
self.helper.log_error(f"SecurityTrails 富化失败: {e}")
return objects
if __name__ == "__main__":
connector = CustomEnrichmentConnector()