From cybersecurity-skills
Builds a Python pipeline that collects suspicious files from EDR and email gateways, submits them to VirusTotal/Cuckoo/Any.Run sandboxes, and generates IOC verdicts for the SIEM. Intended for SOC malware triage at scale.
npx claudepluginhub mukul975/anthropic-cybersecurity-skills --plugin cybersecurity-skills
This skill uses the workspace's default tool permissions.
Use this skill when:
Applies Acme Corporation brand guidelines including colors, fonts, layouts, and messaging to generated PowerPoint, Excel, and PDF documents.
Builds DCF models with sensitivity analysis, Monte Carlo simulations, and scenario planning for investment valuation and risk assessment.
Calculates profitability (ROE, margins), liquidity (current ratio), leverage, efficiency, and valuation (P/E, EV/EBITDA) ratios from financial statements in CSV, JSON, text, or Excel for investment analysis.
Use this skill when:
Do not use for analyzing live malware samples in production environments — always use isolated sandbox infrastructure.
Required Python libraries: requests, vt-py, pefile.
Collect suspicious files from multiple sources:
import requests
import hashlib
import os
from pathlib import Path
from datetime import datetime
class MalwareCollector:
    """Gather suspicious files from security tooling into a quarantine folder.

    Every collected sample is written to ``<quarantine_dir>/<sha256>.sample``,
    so identical content collected twice lands on the same path.
    """

    # Seconds to wait on each EDR HTTP call; without a timeout a stalled
    # connection would block the collector forever.
    REQUEST_TIMEOUT = 60
    # Read size used when hashing files, so large samples are streamed
    # instead of being loaded into memory in one piece.
    HASH_CHUNK_SIZE = 1024 * 1024

    def __init__(self, quarantine_dir="/opt/malware_quarantine"):
        """Create the quarantine directory, including missing parents."""
        self.quarantine_dir = Path(quarantine_dir)
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)

    def _store_sample(self, data):
        """Write ``data`` into the quarantine and return ``(sha256, path)``."""
        sha256 = hashlib.sha256(data).hexdigest()
        filepath = self.quarantine_dir / f"{sha256}.sample"
        filepath.write_bytes(data)
        return sha256, filepath

    @staticmethod
    def _attachment_bytes(attachment):
        """Return the bytes of one e-mail attachment part."""
        try:
            content = attachment.get_content()
        except KeyError:
            # No content handler for this MIME type (e.g. a multipart
            # attachment): keep the serialized part instead of crashing.
            return attachment.as_bytes()
        if isinstance(content, bytes):
            return content
        if isinstance(content, str):
            # Hash the bytes as transmitted rather than a UTF-8 re-encoding,
            # so the digest matches the real file for non-UTF-8 charsets.
            raw = attachment.get_payload(decode=True)
            return raw if raw is not None else content.encode()
        # message/rfc822 attachments come back as a message object.
        return content.as_bytes()

    def collect_from_edr(self, edr_api_url, api_token):
        """Pull quarantined files from CrowdStrike Falcon.

        Yields one dict per downloaded file with the keys ``sha256``,
        ``path`` and ``source``.

        Raises:
            requests.HTTPError: if the EDR API answers with an error status.
                Without this check the error body would be stored (and later
                analysed) as if it were a malware sample.
        """
        headers = {"Authorization": f"Bearer {api_token}"}
        # Get recent quarantine events
        response = requests.get(
            f"{edr_api_url}/quarantine/queries/quarantined-files/v1",
            headers=headers,
            params={"filter": "state:'quarantined'", "limit": 50},
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        file_ids = response.json().get("resources") or []
        for file_id in file_ids:
            # Download quarantined file
            dl_response = requests.get(
                f"{edr_api_url}/quarantine/entities/quarantined-files/v1",
                headers=headers,
                params={"ids": file_id},
                timeout=self.REQUEST_TIMEOUT,
            )
            dl_response.raise_for_status()
            sha256, filepath = self._store_sample(dl_response.content)
            yield {"sha256": sha256, "path": str(filepath), "source": "edr"}

    def collect_from_email_gateway(self, smtp_quarantine_path):
        """Pull attachments from email gateway quarantine.

        Parses every ``*.eml`` file directly inside ``smtp_quarantine_path``
        and yields one dict per attachment with the keys ``sha256``, ``path``,
        ``source``, ``original_filename``, ``sender`` and ``subject``.
        """
        import email
        from email import policy

        for eml_file in Path(smtp_quarantine_path).glob("*.eml"):
            # Context manager so the handle is closed even if parsing fails.
            with eml_file.open("rb") as handle:
                msg = email.message_from_binary_file(handle, policy=policy.default)
            for attachment in msg.iter_attachments():
                content = self._attachment_bytes(attachment)
                filename = attachment.get_filename() or "unknown"
                sha256, filepath = self._store_sample(content)
                yield {
                    "sha256": sha256,
                    "path": str(filepath),
                    "source": "email",
                    "original_filename": filename,
                    "sender": msg["From"],
                    "subject": msg["Subject"],
                }

    def compute_hashes(self, filepath):
        """Calculate MD5, SHA1, SHA256 and size for a file.

        The file is read in chunks so memory use stays bounded.
        Returns a dict with the keys ``md5``, ``sha1``, ``sha256`` and
        ``size`` (bytes).
        """
        md5 = hashlib.md5()
        sha1 = hashlib.sha1()
        sha256 = hashlib.sha256()
        size = 0
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(self.HASH_CHUNK_SIZE), b""):
                md5.update(chunk)
                sha1.update(chunk)
                sha256.update(chunk)
                size += len(chunk)
        return {
            "md5": md5.hexdigest(),
            "sha1": sha1.hexdigest(),
            "sha256": sha256.hexdigest(),
            "size": size,
        }
Check if the file is already known before sandbox submission:
import vt
class MalwarePreScreener:
    """Look a hash up in VirusTotal and MalwareBazaar before sandboxing it."""

    def __init__(self, vt_api_key, mb_api_url="https://mb-api.abuse.ch/api/v1/",
                 mb_auth_key=None, request_timeout=30):
        """Open the VirusTotal client and remember the MalwareBazaar settings.

        Args:
            vt_api_key: VirusTotal API key.
            mb_api_url: MalwareBazaar API endpoint.
            mb_auth_key: optional abuse.ch key, sent as an ``Auth-Key``
                header when given.
                NOTE(review): confirm the header name against the current
                abuse.ch API documentation.
            request_timeout: seconds to wait for MalwareBazaar to answer.
        """
        self.vt_client = vt.Client(vt_api_key)
        self.mb_api_url = mb_api_url
        self.mb_auth_key = mb_auth_key
        self.request_timeout = request_timeout

    def check_virustotal(self, sha256):
        """Lookup hash in VirusTotal.

        Returns ``{"found": True, ...}`` with detection counts, threat label
        and file type when the hash is known, otherwise ``{"found": False}``.
        If the lookup failed for any reason other than "not found" (quota,
        authentication, ...), the result also carries an ``"error"`` key so a
        failed lookup is not mistaken for an unknown file.
        """
        try:
            file_obj = self.vt_client.get_object(f"/files/{sha256}")
        except vt.APIError as err:
            result = {"found": False}
            code = getattr(err, "code", None)
            if code != "NotFoundError":
                result["error"] = code or str(err)
            return result

        stats = file_obj.last_analysis_stats
        classification = getattr(file_obj, "popular_threat_classification", None) or {}
        return {
            "found": True,
            "malicious": stats.get("malicious", 0),
            "suspicious": stats.get("suspicious", 0),
            "undetected": stats.get("undetected", 0),
            "total": sum(stats.values()),
            "threat_label": classification.get("suggested_threat_label", "Unknown"),
            "type": getattr(file_obj, "type_description", "Unknown"),
        }

    def check_malwarebazaar(self, sha256):
        """Lookup hash in MalwareBazaar.

        Returns ``{"found": True, ...}`` with signature, tags, file type and
        first-seen date when the hash is listed, otherwise
        ``{"found": False}``. Transport or decoding failures are reported as
        ``{"found": False, "error": <message>}`` instead of raising.
        """
        headers = {"Auth-Key": self.mb_auth_key} if self.mb_auth_key else None
        try:
            response = requests.post(
                self.mb_api_url,
                data={"query": "get_info", "hash": sha256},
                headers=headers,
                timeout=self.request_timeout,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as err:
            return {"found": False, "error": str(err)}

        entries = data.get("data") or []
        if data.get("query_status") == "ok" and entries:
            entry = entries[0]
            return {
                "found": True,
                "signature": entry.get("signature", "Unknown"),
                "tags": entry.get("tags") or [],
                "file_type": entry.get("file_type", "Unknown"),
                "first_seen": entry.get("first_seen", "Unknown"),
            }
        return {"found": False}

    def pre_screen(self, sha256):
        """Run all pre-screening checks.

        Verdicts:
            KNOWN_MALICIOUS: VirusTotal knows the file and more than 10
                engines flag it as malicious.
            LIKELY_CLEAN: VirusTotal knows the file, no engine flags it as
                malicious or suspicious, and MalwareBazaar does not list it.
            UNKNOWN: everything else; only these samples need the sandbox.
        """
        vt_result = self.check_virustotal(sha256)
        mb_result = self.check_malwarebazaar(sha256)

        vt_found = vt_result.get("found", False)
        vt_malicious = vt_result.get("malicious", 0)

        verdict = "UNKNOWN"
        if vt_found and vt_malicious > 10:
            verdict = "KNOWN_MALICIOUS"
        elif (
            vt_found
            and vt_malicious == 0
            and vt_result.get("suspicious", 0) == 0
            # A MalwareBazaar hit contradicts "clean": let the sandbox decide.
            and not mb_result.get("found", False)
        ):
            verdict = "LIKELY_CLEAN"

        return {
            "sha256": sha256,
            "virustotal": vt_result,
            "malwarebazaar": mb_result,
            "pre_screen_verdict": verdict,
            "needs_sandbox": verdict == "UNKNOWN",
        }

    def close(self):
        """Release the VirusTotal client."""
        self.vt_client.close()
Cuckoo Sandbox Submission:
class SandboxSubmitter:
    """Submit samples to Cuckoo / Joe Sandbox and collect the results."""

    def __init__(self, cuckoo_url="http://cuckoo.internal:8090", request_timeout=60):
        """Remember the Cuckoo base URL and the per-request HTTP timeout.

        Args:
            cuckoo_url: base URL of the Cuckoo REST API. A trailing slash is
                dropped so the endpoint paths do not end up with ``//``.
            request_timeout: seconds to wait on every HTTP call.
        """
        self.cuckoo_url = cuckoo_url.rstrip("/")
        self.request_timeout = request_timeout

    def submit_to_cuckoo(self, filepath, timeout=300):
        """Submit file to Cuckoo Sandbox.

        Args:
            filepath: path of the sample to upload.
            timeout: analysis timeout handed to Cuckoo (this is NOT the HTTP
                timeout of the upload itself).

        Returns:
            The ``task_id`` from Cuckoo's response.

        Raises:
            requests.HTTPError: if Cuckoo rejects the submission.
        """
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{self.cuckoo_url}/tasks/create/file",
                files={"file": f},
                data={
                    "timeout": timeout,
                    "options": "procmemdump=yes,route=none",
                    "priority": 2,
                    "machine": "win10_x64",
                },
                timeout=self.request_timeout,
            )
        response.raise_for_status()
        return response.json()["task_id"]

    def wait_for_analysis(self, task_id, poll_interval=30, max_wait=600):
        """Wait for sandbox analysis to complete.

        Polls the task status every ``poll_interval`` seconds for up to
        ``max_wait`` seconds of accumulated sleep time.

        Returns:
            The extracted report once the task is ``reported``;
            ``{"error": "Analysis failed", "status": <status>}`` as soon as
            the task enters any ``failed...`` state; or
            ``{"error": "Analysis timed out"}``.
        """
        import time

        elapsed = 0
        while elapsed < max_wait:
            response = requests.get(
                f"{self.cuckoo_url}/tasks/view/{task_id}",
                timeout=self.request_timeout,
            )
            response.raise_for_status()
            status = response.json()["task"]["status"]
            if status == "reported":
                return self.get_report(task_id)
            # Any failed_* state is terminal; checking only "failed_analysis"
            # would keep polling a dead task until the timeout.
            if str(status).startswith("failed"):
                return {"error": "Analysis failed", "status": status}
            time.sleep(poll_interval)
            elapsed += poll_interval
        return {"error": "Analysis timed out"}

    @staticmethod
    def _extract_indicators(task_id, report):
        """Reduce a raw Cuckoo report dict to the indicators the pipeline uses.

        Missing sections or fields yield empty values instead of raising.
        """
        network = report.get("network") or {}
        behavior = report.get("behavior") or {}
        return {
            "task_id": task_id,
            "score": (report.get("info") or {}).get("score", 0),
            "signatures": [
                {
                    "name": s.get("name", ""),
                    "severity": s.get("severity", 0),
                    "description": s.get("description", ""),
                }
                for s in report.get("signatures") or []
            ],
            "network": {
                "dns": [d["request"] for d in network.get("dns") or [] if "request" in d],
                "http": [
                    {"url": h.get("uri", ""), "method": h.get("method", "")}
                    for h in network.get("http") or []
                ],
                "hosts": network.get("hosts") or [],
            },
            "dropped_files": [
                {"name": f.get("name", ""), "sha256": f.get("sha256", ""), "size": f.get("size", 0)}
                for f in report.get("dropped") or []
            ],
            "processes": [
                {
                    "name": p.get("process_name", ""),
                    "pid": p.get("pid"),
                    "command_line": p.get("command_line", ""),
                }
                for p in behavior.get("processes") or []
            ],
            "registry_keys": list((behavior.get("summary") or {}).get("regkey_written") or []),
        }

    def get_report(self, task_id):
        """Retrieve analysis report and extract key indicators.

        Returns a dict with the keys ``task_id``, ``score``, ``signatures``,
        ``network`` (``dns``/``http``/``hosts``), ``dropped_files``,
        ``processes`` and ``registry_keys``.
        """
        response = requests.get(
            f"{self.cuckoo_url}/tasks/report/{task_id}",
            timeout=self.request_timeout,
        )
        response.raise_for_status()
        return self._extract_indicators(task_id, response.json())

    def submit_to_joesandbox(self, filepath, joe_api_key, joe_url="https://jbxcloud.joesecurity.org/api"):
        """Submit to Joe Sandbox Cloud and return the ``webid`` of the response.

        NOTE(review): requests form-encodes the Python booleans below as the
        strings "False"/"True"; confirm the Joe Sandbox API accepts that form
        (and the Bearer-header authentication) against its documentation.
        """
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{joe_url}/v2/submission/new",
                headers={"Authorization": f"Bearer {joe_api_key}"},
                files={"sample": f},
                data={
                    "systems": "w10_64",
                    "internet-access": False,
                    "report-cache": True,
                },
                timeout=self.request_timeout,
            )
        response.raise_for_status()
        return response.json()["data"]["webid"]
class VerdictGenerator:
    """Turn pre-screening and sandbox results into one final verdict."""

    def __init__(self):
        self.malicious_threshold = 7  # Cuckoo score threshold

    def generate_verdict(self, pre_screen, sandbox_report):
        """Combine pre-screening and sandbox results for final verdict.

        Args:
            pre_screen: pre-screening result; its ``virustotal.malicious``
                count feeds the score.
            sandbox_report: extracted sandbox report, ``None`` when no
                sandbox run happened, or ``{"error": ...}`` when the run
                failed or timed out.

        Returns:
            A dict with ``verdict``, ``confidence``, ``combined_score``,
            ``iocs``, ``vt_detections``, ``sandbox_score``, ``signatures``
            and ``sandbox_error`` (``None`` unless the sandbox run failed).

        Scoring: ``vt_malicious * 2 + sandbox_score * 10 + signatures * 5``;
        >= 100 MALICIOUS, >= 50 SUSPICIOUS, >= 20 POTENTIALLY_UNWANTED,
        otherwise CLEAN.
        """
        # A failed/timed-out run carries no evidence. Scoring it like a real
        # report would label an unanalysed sample CLEAN with HIGH confidence.
        sandbox_error = None
        if sandbox_report and "error" in sandbox_report:
            sandbox_error = sandbox_report["error"]
            sandbox_report = None

        iocs = {
            "ips": [],
            "domains": [],
            "urls": [],
            "hashes": [],
            "registry_keys": [],
            "files_dropped": [],
        }

        # Extract IOCs from sandbox report
        signatures = []
        sandbox_score = 0
        if sandbox_report:
            network = sandbox_report.get("network", {})
            dropped = sandbox_report.get("dropped_files", [])
            iocs["domains"] = network.get("dns", [])
            iocs["ips"] = network.get("hosts", [])
            iocs["urls"] = [h["url"] for h in network.get("http", [])]
            iocs["hashes"] = [f["sha256"] for f in dropped]
            # Only the first 10 registry keys are kept.
            iocs["registry_keys"] = sandbox_report.get("registry_keys", [])[:10]
            iocs["files_dropped"] = dropped
            signatures = sandbox_report.get("signatures", [])
            sandbox_score = sandbox_report.get("score", 0)

        # Determine verdict
        vt_malicious = (pre_screen or {}).get("virustotal", {}).get("malicious", 0)
        combined_score = (vt_malicious * 2) + (sandbox_score * 10) + (len(signatures) * 5)

        if combined_score >= 100:
            verdict, confidence = "MALICIOUS", "HIGH"
        elif combined_score >= 50:
            verdict, confidence = "SUSPICIOUS", "MEDIUM"
        elif combined_score >= 20:
            verdict, confidence = "POTENTIALLY_UNWANTED", "LOW"
        else:
            verdict = "CLEAN"
            # Absence of evidence from a broken sandbox run is weak support.
            confidence = "LOW" if sandbox_error else "HIGH"

        return {
            "verdict": verdict,
            "confidence": confidence,
            "combined_score": combined_score,
            "iocs": iocs,
            "vt_detections": vt_malicious,
            "sandbox_score": sandbox_score,
            "signatures": signatures,
            "sandbox_error": sandbox_error,
        }
def push_to_splunk(verdict_result, splunk_url, splunk_token):
    """Send malware analysis verdict to Splunk HEC.

    Args:
        verdict_result: verdict dict; must contain ``sha256``, ``verdict``,
            ``confidence``, ``combined_score``, ``vt_detections``,
            ``sandbox_score``, ``iocs`` and ``signatures`` (a list of dicts
            with a ``name`` key). ``threat_label`` is optional.
        splunk_url: base URL of the Splunk instance (a trailing slash is
            tolerated).
        splunk_token: HEC token.

    Returns:
        True when Splunk answered with HTTP 200, otherwise False.
    """
    event = {
        "sourcetype": "malware_analysis",
        "source": "malware_pipeline",
        "event": {
            "sha256": verdict_result["sha256"],
            "verdict": verdict_result["verdict"],
            "confidence": verdict_result["confidence"],
            "score": verdict_result["combined_score"],
            "vt_detections": verdict_result["vt_detections"],
            "sandbox_score": verdict_result["sandbox_score"],
            "malware_family": verdict_result.get("threat_label", "Unknown"),
            "iocs": verdict_result["iocs"],
            "signatures": [s["name"] for s in verdict_result["signatures"]],
        },
    }
    # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
    skip_tls_verify = os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true"
    response = requests.post(
        f"{splunk_url.rstrip('/')}/services/collector/event",
        headers={
            "Authorization": f"Splunk {splunk_token}",
            "Content-Type": "application/json",
        },
        json=event,
        verify=not skip_tls_verify,
        # Without a timeout a stalled collector would block the pipeline.
        timeout=30,
    )
    return response.status_code == 200
def push_iocs_to_blocklist(iocs, firewall_api):
    """Push extracted IOCs to blocking infrastructure.

    IPs are submitted with action ``block`` and domains with action
    ``sinkhole``, IPs first.

    Args:
        iocs: dict that may contain ``ips`` and ``domains`` lists.
        firewall_api: base URL of the firewall API.

    Returns:
        A list of ``{"type": ..., "value": ...}`` dicts for every IOC whose
        request failed (transport error or HTTP error status); empty when
        everything was accepted. A failure no longer aborts the remaining
        IOCs and no longer goes unnoticed.
    """
    plan = (
        ("ips", "ip", "block"),
        ("domains", "domain", "sinkhole"),
    )
    failed = []
    for key, ioc_type, action in plan:
        for value in iocs.get(key) or []:
            try:
                response = requests.post(
                    f"{firewall_api}/block",
                    json={
                        "type": ioc_type,
                        "value": value,
                        "action": action,
                        "source": "malware_pipeline",
                    },
                    timeout=30,
                )
                response.raise_for_status()
            except requests.RequestException:
                failed.append({"type": ioc_type, "value": value})
    return failed
def run_malware_pipeline(sample_path, config):
    """Execute full malware analysis pipeline.

    Steps: hash and pre-screen the sample, sandbox it if pre-screening could
    not classify it, build the verdict, send it to Splunk, and push the IOCs
    to the blocklist when the verdict is MALICIOUS.

    Args:
        sample_path: path of the sample to analyse.
        config: dict with ``vt_key``, ``cuckoo_url``, ``splunk_url``,
            ``splunk_token`` and ``firewall_api``.

    Returns:
        The verdict dict, extended with ``sha256`` and ``threat_label``.
    """
    collector = MalwareCollector()
    screener = MalwarePreScreener(config["vt_key"])
    # try/finally so the VirusTotal client is released even when a later
    # step raises (missing config key, HTTP failure, ...).
    try:
        submitter = SandboxSubmitter(config["cuckoo_url"])
        generator = VerdictGenerator()

        # Step 1: Hash and pre-screen
        hashes = collector.compute_hashes(sample_path)
        pre_screen = screener.pre_screen(hashes["sha256"])

        # Step 2: Submit to sandbox if unknown
        sandbox_report = None
        if pre_screen["needs_sandbox"]:
            task_id = submitter.submit_to_cuckoo(sample_path)
            sandbox_report = submitter.wait_for_analysis(task_id)

        # Step 3: Generate verdict
        verdict = generator.generate_verdict(pre_screen, sandbox_report)
        verdict["sha256"] = hashes["sha256"]
        verdict["threat_label"] = pre_screen.get("virustotal", {}).get("threat_label", "Unknown")

        # Step 4: Push to SIEM
        push_to_splunk(verdict, config["splunk_url"], config["splunk_token"])

        # Step 5: Block if malicious
        if verdict["verdict"] == "MALICIOUS":
            push_iocs_to_blocklist(verdict["iocs"], config["firewall_api"])
    finally:
        screener.close()
    return verdict
| Term | Definition |
|---|---|
| Dynamic Analysis | Executing malware in a sandbox to observe runtime behavior (process creation, network, file system changes) |
| Static Analysis | Examining malware without execution (hash lookup, string analysis, PE header inspection) |
| Sandbox Evasion | Techniques malware uses to detect sandbox environments and alter behavior to avoid analysis |
| IOC Extraction | Automated process of identifying network indicators, file artifacts, and registry changes from sandbox reports |
| Multi-AV Scanning | Submitting samples to multiple antivirus engines (VirusTotal) for consensus-based detection |
| Verdict | Final classification of a sample: Malicious, Suspicious, Potentially Unwanted, or Clean |
MALWARE ANALYSIS REPORT — Pipeline Submission
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Sample: invoice_march.docx
SHA256: a1b2c3d4e5f6a7b8...
File Type: Microsoft Word Document (macro-enabled)
Pre-Screening:
VirusTotal: 34/72 malicious (Emotet.Downloader)
MalwareBazaar: Tags: emotet, macro, downloader
Sandbox Analysis (Cuckoo):
Score: 9.2/10 (MALICIOUS)
Signatures:
- Macro executes PowerShell download cradle (severity: 8)
- Process injection into explorer.exe (severity: 9)
- Connects to known Emotet C2 server (severity: 9)
Extracted IOCs:
C2 IPs: 185.234.218[.]50:8080, 45.77.123[.]45:443
Domains: update-service[.]evil[.]com
Dropped Files: payload.dll (SHA256: b2c3d4e5...)
Registry: HKCU\Software\Microsoft\Windows\CurrentVersion\Run\Update
VERDICT: MALICIOUS (Emotet Downloader) — Confidence: HIGH
ACTIONS:
[DONE] IOCs pushed to Splunk threat intel
[DONE] C2 IPs blocked on firewall
[DONE] Domain sinkholed on DNS
[DONE] Hash blocked on endpoint