From cybersecurity-skills
Automates IOC enrichment with multi-source threat intel via SOAR playbooks or Python pipelines for SIEM alerts, email submissions, and bulk threat feed processing.
npx claudepluginhub mukul975/anthropic-cybersecurity-skills --plugin cybersecurity-skills
This skill uses the workspace's default tool permissions.
Use this skill when:
Do not use this skill for fully automated blocking decisions without human review — enrichment automation should inform decisions, not execute blocks autonomously for high-impact actions.
Define the enrichment flow for each IOC type:
SIEM Alert → Extract IOCs → Classify Type → Route to enrichment functions
    IP Address → AbuseIPDB + Shodan + VirusTotal IP + MISP
    Domain     → VirusTotal Domain + PassiveTotal + Shodan + MISP
    URL        → URLScan.io + VirusTotal URL + Google Safe Browsing
    File Hash  → VirusTotal Files + MalwareBazaar + MISP
→ Aggregate results → Calculate confidence score → Update alert → Notify analyst
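The "Classify Type" and "Route" steps above could be sketched roughly as follows. This is a minimal illustration, not the skill's own implementation: the names `classify_ioc`, `route_ioc`, and `ROUTES` are hypothetical, the patterns are deliberately simple, and only SHA-256 hashes are recognized.

```python
import ipaddress
import re

# Hypothetical, simplified patterns; production pipelines usually need stricter parsing
SHA256_RE = re.compile(r"^[A-Fa-f0-9]{64}$")
DOMAIN_RE = re.compile(
    r"^(?=.{1,253}$)(?:[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,63}$"
)

# Source names copied from the flow above; the mapping itself is an assumption
ROUTES = {
    "ip": ["AbuseIPDB", "Shodan", "VirusTotal IP", "MISP"],
    "domain": ["VirusTotal Domain", "PassiveTotal", "Shodan", "MISP"],
    "url": ["URLScan.io", "VirusTotal URL", "Google Safe Browsing"],
    "sha256": ["VirusTotal Files", "MalwareBazaar", "MISP"],
}


def classify_ioc(value: str) -> str:
    """Return 'ip', 'url', 'sha256', 'domain', or 'unknown' for a raw indicator."""
    value = value.strip()
    try:
        ipaddress.ip_address(value)  # accepts both IPv4 and IPv6
        return "ip"
    except ValueError:
        pass
    if value.lower().startswith(("http://", "https://")):
        return "url"
    if SHA256_RE.match(value):
        return "sha256"
    if DOMAIN_RE.match(value):
        return "domain"
    return "unknown"


def route_ioc(value: str) -> list:
    """Look up which enrichment sources apply to this indicator."""
    return ROUTES.get(classify_ioc(value), [])
```

Unrecognized values return an empty route, so they can be set aside for manual triage rather than sent to an arbitrary source.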
import time
from dataclasses import dataclass, field

import requests

RATE_LIMIT_DELAY = 15  # seconds between calls; the VT free tier allows 4 requests/minute
REQUEST_TIMEOUT = 10   # seconds; keeps a stalled API from hanging the pipeline


@dataclass
class EnrichmentResult:
    ioc_value: str
    ioc_type: str
    vt_malicious: int = 0
    vt_total: int = 0
    abuse_confidence: int = 0
    shodan_ports: list = field(default_factory=list)
    misp_events: list = field(default_factory=list)
    confidence_score: int = 0


def enrich_ip(ip: str, vt_key: str, abuse_key: str, shodan_key: str) -> EnrichmentResult:
    # shodan_key is accepted for a Shodan lookup that this excerpt does not implement
    result = EnrichmentResult(ip, "ip")

    # VirusTotal IP lookup
    vt_resp = requests.get(
        f"https://www.virustotal.com/api/v3/ip_addresses/{ip}",
        headers={"x-apikey": vt_key},
        timeout=REQUEST_TIMEOUT,
    )
    if vt_resp.status_code == 200:
        stats = vt_resp.json()["data"]["attributes"]["last_analysis_stats"]
        result.vt_malicious = stats.get("malicious", 0)
        result.vt_total = sum(stats.values())
    time.sleep(RATE_LIMIT_DELAY)

    # AbuseIPDB
    abuse_resp = requests.get(
        "https://api.abuseipdb.com/api/v2/check",
        headers={"Key": abuse_key, "Accept": "application/json"},
        params={"ipAddress": ip, "maxAgeInDays": 90},
        timeout=REQUEST_TIMEOUT,
    )
    if abuse_resp.status_code == 200:
        result.abuse_confidence = abuse_resp.json()["data"]["abuseConfidenceScore"]

    # Composite confidence score: 60% VT detection ratio, 40% AbuseIPDB score
    result.confidence_score = int(min(
        (result.vt_malicious / max(result.vt_total, 1)) * 60 +
        (result.abuse_confidence / 100) * 40, 100
    ))
    return result


def enrich_hash(sha256: str, vt_key: str) -> EnrichmentResult:
    result = EnrichmentResult(sha256, "sha256")
    vt_resp = requests.get(
        f"https://www.virustotal.com/api/v3/files/{sha256}",
        headers={"x-apikey": vt_key},
        timeout=REQUEST_TIMEOUT,
    )
    if vt_resp.status_code == 200:
        stats = vt_resp.json()["data"]["attributes"]["last_analysis_stats"]
        result.vt_malicious = stats.get("malicious", 0)
        result.vt_total = sum(stats.values())
        # Hashes have a single source here, so the score is the VT detection ratio
        result.confidence_score = int((result.vt_malicious / max(result.vt_total, 1)) * 100)
    return result
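As a worked example of the 60/40 weighting used in `enrich_ip`, the arithmetic can be isolated in a small pure function. `composite_score` is a hypothetical name, and rounding to the nearest integer is an assumption made here for illustration.

```python
def composite_score(vt_malicious: int, vt_total: int, abuse_confidence: int) -> int:
    """Blend the VirusTotal detection ratio (60%) with the AbuseIPDB score (40%)."""
    vt_ratio = vt_malicious / max(vt_total, 1)  # guard against division by zero
    score = vt_ratio * 60 + (abuse_confidence / 100) * 40
    return min(round(score), 100)


# 30 of 60 engines flag the IP and AbuseIPDB reports 80:
# 0.5 * 60 + 0.8 * 40 = 30 + 32 = 62
print(composite_score(30, 60, 80))  # 62
print(composite_score(0, 0, 0))     # 0 (no data from either source)
```

Keeping the formula separate from the HTTP calls makes the weights easy to unit test and to adjust.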
In Cortex XSOAR, create an enrichment playbook:
- !vt-file-scan or !vt-ip-scan commands
- !abuseipdb-check-ip command
- !misp-search for cross-referencing

import time
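from functools import wraps


def rate_limited(max_per_second):
    """Space out calls so the wrapped function runs at most max_per_second times per second."""
    min_interval = 1.0 / max_per_second

    def decorator(func):
        last_called = [0.0]  # one-element list so the closure can update it

        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            wait = min_interval - elapsed
            if wait > 0:
                time.sleep(wait)
            result = func(*args, **kwargs)
            last_called[0] = time.time()
            return result
        return wrapper
    return decorator


def retry_on_429(max_retries=3):
    """Retry a call that returns HTTP 429, waiting for the Retry-After interval."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            response = None
            for attempt in range(max_retries):
                response = func(*args, **kwargs)
                if response.status_code != 429:
                    return response
                if attempt < max_retries - 1:
                    # Assumes Retry-After is given in seconds; defaults to 60
                    retry_after = int(response.headers.get("Retry-After", 60))
                    time.sleep(retry_after)
            # Retries exhausted: return the last 429 response instead of None
            return response
        return wrapper
    return decorator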
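The `rate_limited` decorator enforces a minimum gap between calls. A quota stated per window, such as 4 requests/minute, could alternatively be modeled with a sliding-window limiter. The sketch below is a different technique from the decorator above, offered as an assumption-laden illustration: `SlidingWindowLimiter` is a hypothetical class, it is not thread-safe, and the injectable `clock` and `sleep` parameters exist only to make it testable.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most max_calls within any rolling window of `period` seconds."""

    def __init__(self, max_calls, period, clock=time.monotonic, sleep=time.sleep):
        self.max_calls = max_calls
        self.period = period
        self._clock = clock
        self._sleep = sleep
        self._calls = deque()  # timestamps of recent calls, oldest first

    def acquire(self):
        """Block until a call is allowed, then record it."""
        now = self._clock()
        # Drop timestamps that have aged out of the window
        while self._calls and now - self._calls[0] >= self.period:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            # Wait until the oldest recorded call leaves the window
            self._sleep(self.period - (now - self._calls[0]))
            now = self._clock()
            self._calls.popleft()
        self._calls.append(now)


# Usage: call vt_limiter.acquire() immediately before each VirusTotal request
vt_limiter = SlidingWindowLimiter(max_calls=4, period=60)
```

Unlike a fixed delay, this lets a burst of up to four lookups go out immediately and only blocks once the window is full.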
Track pipeline performance weekly:
| Term | Definition |
|---|---|
| SOAR | Security Orchestration, Automation, and Response — platform for automating security workflows and integrating disparate tools |
| Enrichment Playbook | Automated workflow sequence that adds contextual intelligence to raw security events |
| Rate Limiting | API provider restrictions on request frequency (e.g., VT free: 4 requests/minute); pipelines must respect these limits |
| Composite Confidence Score | Single score that aggregates signals from multiple enrichment sources using a weighted formula |
| Fan-out Pattern | Running multiple enrichment queries in parallel to minimize total enrichment latency |
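
The fan-out pattern defined above might look like the following with Python's standard thread pool. This is a sketch under stated assumptions: `fan_out` and the `fake_*` lookups are hypothetical stand-ins for real API clients, and per-source rate limits are ignored.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fan_out(ioc, sources, max_workers=4):
    """Run every lookup in `sources` (name -> callable) concurrently for one IOC."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fn, ioc): name for name, fn in sources.items()}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                # One failing source should not sink the whole enrichment
                results[name] = {"error": str(exc)}
    return results


# Stub lookups standing in for real API clients (hypothetical return values)
def fake_vt(ioc):
    return {"malicious": 3, "total": 70}


def fake_abuse(ioc):
    return {"abuseConfidenceScore": 55}


def fake_broken(ioc):
    raise TimeoutError("source unavailable")


results = fan_out(
    "203.0.113.7",
    {"virustotal": fake_vt, "abuseipdb": fake_abuse, "broken": fake_broken},
)
```

Total latency then approaches that of the slowest source rather than the sum of all sources, and a failed lookup is recorded as an error entry instead of aborting the run.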