From asi
Builds automated malware submission pipeline: collects suspicious files from endpoints/email gateways, submits to sandboxes/multi-engine scanners like VirusTotal/Cuckoo, generates IOC verdicts for SIEM. Scales SOC high-volume triage.
npx claudepluginhub plurigrid/asi --plugin asiThis skill uses the workspace's default tool permissions.
Use this skill when:
Builds Python pipeline to collect suspicious files from EDR/email gateways, submit to VirusTotal/Cuckoo/Any.Run sandboxes, generate IOC verdicts for SIEM. For SOC malware triage at scale.
Builds automated malware submission pipeline: collects suspicious files from EDR/email gateways, submits to sandboxes/VirusTotal/Cuckoo, extracts IOCs for SIEM integration. Scales SOC high-volume alert triage beyond manual processes.
Executes malware samples in Cuckoo Sandbox to observe runtime behavior like process creation, file changes, registry mods, network activity, and API calls. Generates reports for classification and IOC extraction.
Share bugs, ideas, or general feedback.
Use this skill when:
Do not use for analyzing live malware samples in production environments — always use isolated sandbox infrastructure.
requests, vt-py, pefile librariesCollect suspicious files from multiple sources:
import requests
import hashlib
import os
from pathlib import Path
from datetime import datetime
class MalwareCollector:
def __init__(self, quarantine_dir="/opt/malware_quarantine"):
self.quarantine_dir = Path(quarantine_dir)
self.quarantine_dir.mkdir(exist_ok=True)
def collect_from_edr(self, edr_api_url, api_token):
"""Pull quarantined files from CrowdStrike Falcon"""
headers = {"Authorization": f"Bearer {api_token}"}
# Get recent quarantine events
response = requests.get(
f"{edr_api_url}/quarantine/queries/quarantined-files/v1",
headers=headers,
params={"filter": "state:'quarantined'", "limit": 50}
)
file_ids = response.json()["resources"]
for file_id in file_ids:
# Download quarantined file
dl_response = requests.get(
f"{edr_api_url}/quarantine/entities/quarantined-files/v1",
headers=headers,
params={"ids": file_id}
)
file_data = dl_response.content
sha256 = hashlib.sha256(file_data).hexdigest()
filepath = self.quarantine_dir / f"{sha256}.sample"
filepath.write_bytes(file_data)
yield {"sha256": sha256, "path": str(filepath), "source": "edr"}
def collect_from_email_gateway(self, smtp_quarantine_path):
"""Pull attachments from email gateway quarantine"""
import email
from email import policy
for eml_file in Path(smtp_quarantine_path).glob("*.eml"):
msg = email.message_from_binary_file(
eml_file.open("rb"), policy=policy.default
)
for attachment in msg.iter_attachments():
content = attachment.get_content()
if isinstance(content, str):
content = content.encode()
sha256 = hashlib.sha256(content).hexdigest()
filename = attachment.get_filename() or "unknown"
filepath = self.quarantine_dir / f"{sha256}.sample"
filepath.write_bytes(content)
yield {
"sha256": sha256,
"path": str(filepath),
"source": "email",
"original_filename": filename,
"sender": msg["From"],
"subject": msg["Subject"]
}
def compute_hashes(self, filepath):
"""Calculate MD5, SHA1, SHA256 for a file"""
with open(filepath, "rb") as f:
content = f.read()
return {
"md5": hashlib.md5(content).hexdigest(),
"sha1": hashlib.sha1(content).hexdigest(),
"sha256": hashlib.sha256(content).hexdigest(),
"size": len(content)
}
Check if the file is already known before sandbox submission:
import vt
class MalwarePreScreener:
def __init__(self, vt_api_key, mb_api_url="https://mb-api.abuse.ch/api/v1/"):
self.vt_client = vt.Client(vt_api_key)
self.mb_api_url = mb_api_url
def check_virustotal(self, sha256):
"""Lookup hash in VirusTotal"""
try:
file_obj = self.vt_client.get_object(f"/files/{sha256}")
stats = file_obj.last_analysis_stats
return {
"found": True,
"malicious": stats.get("malicious", 0),
"suspicious": stats.get("suspicious", 0),
"undetected": stats.get("undetected", 0),
"total": sum(stats.values()),
"threat_label": getattr(file_obj, "popular_threat_classification", {}).get(
"suggested_threat_label", "Unknown"
),
"type": getattr(file_obj, "type_description", "Unknown")
}
except vt.APIError:
return {"found": False}
def check_malwarebazaar(self, sha256):
"""Lookup hash in MalwareBazaar"""
response = requests.post(
self.mb_api_url,
data={"query": "get_info", "hash": sha256}
)
data = response.json()
if data["query_status"] == "ok":
entry = data["data"][0]
return {
"found": True,
"signature": entry.get("signature", "Unknown"),
"tags": entry.get("tags", []),
"file_type": entry.get("file_type", "Unknown"),
"first_seen": entry.get("first_seen", "Unknown")
}
return {"found": False}
def pre_screen(self, sha256):
"""Run all pre-screening checks"""
vt_result = self.check_virustotal(sha256)
mb_result = self.check_malwarebazaar(sha256)
verdict = "UNKNOWN"
if vt_result["found"] and vt_result.get("malicious", 0) > 10:
verdict = "KNOWN_MALICIOUS"
elif vt_result["found"] and vt_result.get("malicious", 0) == 0:
verdict = "LIKELY_CLEAN"
return {
"sha256": sha256,
"virustotal": vt_result,
"malwarebazaar": mb_result,
"pre_screen_verdict": verdict,
"needs_sandbox": verdict == "UNKNOWN"
}
def close(self):
self.vt_client.close()
Cuckoo Sandbox Submission:
class SandboxSubmitter:
def __init__(self, cuckoo_url="http://cuckoo.internal:8090"):
self.cuckoo_url = cuckoo_url
def submit_to_cuckoo(self, filepath, timeout=300):
"""Submit file to Cuckoo Sandbox"""
with open(filepath, "rb") as f:
response = requests.post(
f"{self.cuckoo_url}/tasks/create/file",
files={"file": f},
data={
"timeout": timeout,
"options": "procmemdump=yes,route=none",
"priority": 2,
"machine": "win10_x64"
}
)
task_id = response.json()["task_id"]
return task_id
def wait_for_analysis(self, task_id, poll_interval=30, max_wait=600):
"""Wait for sandbox analysis to complete"""
import time
elapsed = 0
while elapsed < max_wait:
response = requests.get(f"{self.cuckoo_url}/tasks/view/{task_id}")
status = response.json()["task"]["status"]
if status == "reported":
return self.get_report(task_id)
elif status == "failed_analysis":
return {"error": "Analysis failed"}
time.sleep(poll_interval)
elapsed += poll_interval
return {"error": "Analysis timed out"}
def get_report(self, task_id):
"""Retrieve analysis report"""
response = requests.get(f"{self.cuckoo_url}/tasks/report/{task_id}")
report = response.json()
# Extract key indicators
return {
"task_id": task_id,
"score": report.get("info", {}).get("score", 0),
"signatures": [
{"name": s["name"], "severity": s["severity"], "description": s["description"]}
for s in report.get("signatures", [])
],
"network": {
"dns": [d["request"] for d in report.get("network", {}).get("dns", [])],
"http": [
{"url": h["uri"], "method": h["method"]}
for h in report.get("network", {}).get("http", [])
],
"hosts": report.get("network", {}).get("hosts", [])
},
"dropped_files": [
{"name": f["name"], "sha256": f["sha256"], "size": f["size"]}
for f in report.get("dropped", [])
],
"processes": [
{"name": p["process_name"], "pid": p["pid"], "command_line": p.get("command_line", "")}
for p in report.get("behavior", {}).get("processes", [])
],
"registry_keys": [
k for k in report.get("behavior", {}).get("summary", {}).get("regkey_written", [])
]
}
def submit_to_joesandbox(self, filepath, joe_api_key, joe_url="https://jbxcloud.joesecurity.org/api"):
"""Submit to Joe Sandbox Cloud"""
with open(filepath, "rb") as f:
response = requests.post(
f"{joe_url}/v2/submission/new",
headers={"Authorization": f"Bearer {joe_api_key}"},
files={"sample": f},
data={
"systems": "w10_64",
"internet-access": False,
"report-cache": True
}
)
return response.json()["data"]["webid"]
class VerdictGenerator:
def __init__(self):
self.malicious_threshold = 7 # Cuckoo score threshold
def generate_verdict(self, pre_screen, sandbox_report):
"""Combine pre-screening and sandbox results for final verdict"""
iocs = {
"ips": [],
"domains": [],
"urls": [],
"hashes": [],
"registry_keys": [],
"files_dropped": []
}
# Extract IOCs from sandbox report
if sandbox_report:
iocs["domains"] = sandbox_report.get("network", {}).get("dns", [])
iocs["ips"] = sandbox_report.get("network", {}).get("hosts", [])
iocs["urls"] = [
h["url"] for h in sandbox_report.get("network", {}).get("http", [])
]
iocs["hashes"] = [
f["sha256"] for f in sandbox_report.get("dropped_files", [])
]
iocs["registry_keys"] = sandbox_report.get("registry_keys", [])[:10]
iocs["files_dropped"] = sandbox_report.get("dropped_files", [])
# Determine verdict
vt_malicious = pre_screen.get("virustotal", {}).get("malicious", 0)
sandbox_score = sandbox_report.get("score", 0) if sandbox_report else 0
sig_count = len(sandbox_report.get("signatures", [])) if sandbox_report else 0
combined_score = (vt_malicious * 2) + (sandbox_score * 10) + (sig_count * 5)
if combined_score >= 100:
verdict = "MALICIOUS"
confidence = "HIGH"
elif combined_score >= 50:
verdict = "SUSPICIOUS"
confidence = "MEDIUM"
elif combined_score >= 20:
verdict = "POTENTIALLY_UNWANTED"
confidence = "LOW"
else:
verdict = "CLEAN"
confidence = "HIGH"
return {
"verdict": verdict,
"confidence": confidence,
"combined_score": combined_score,
"iocs": iocs,
"vt_detections": vt_malicious,
"sandbox_score": sandbox_score,
"signatures": sandbox_report.get("signatures", []) if sandbox_report else []
}
def push_to_splunk(verdict_result, splunk_url, splunk_token):
"""Send malware analysis verdict to Splunk HEC"""
import json
event = {
"sourcetype": "malware_analysis",
"source": "malware_pipeline",
"event": {
"sha256": verdict_result["sha256"],
"verdict": verdict_result["verdict"],
"confidence": verdict_result["confidence"],
"score": verdict_result["combined_score"],
"vt_detections": verdict_result["vt_detections"],
"sandbox_score": verdict_result["sandbox_score"],
"malware_family": verdict_result.get("threat_label", "Unknown"),
"iocs": verdict_result["iocs"],
"signatures": [s["name"] for s in verdict_result["signatures"]]
}
}
response = requests.post(
f"{splunk_url}/services/collector/event",
headers={
"Authorization": f"Splunk {splunk_token}",
"Content-Type": "application/json"
},
json=event,
verify=not os.environ.get("SKIP_TLS_VERIFY", "").lower() == "true", # Set SKIP_TLS_VERIFY=true for self-signed certs in lab environments
)
return response.status_code == 200
def push_iocs_to_blocklist(iocs, firewall_api):
"""Push extracted IOCs to blocking infrastructure"""
for ip in iocs.get("ips", []):
requests.post(
f"{firewall_api}/block",
json={"type": "ip", "value": ip, "action": "block", "source": "malware_pipeline"}
)
for domain in iocs.get("domains", []):
requests.post(
f"{firewall_api}/block",
json={"type": "domain", "value": domain, "action": "sinkhole", "source": "malware_pipeline"}
)
def run_malware_pipeline(sample_path, config):
"""Execute full malware analysis pipeline"""
collector = MalwareCollector()
screener = MalwarePreScreener(config["vt_key"])
submitter = SandboxSubmitter(config["cuckoo_url"])
generator = VerdictGenerator()
# Step 1: Hash and pre-screen
hashes = collector.compute_hashes(sample_path)
pre_screen = screener.pre_screen(hashes["sha256"])
# Step 2: Submit to sandbox if unknown
sandbox_report = None
if pre_screen["needs_sandbox"]:
task_id = submitter.submit_to_cuckoo(sample_path)
sandbox_report = submitter.wait_for_analysis(task_id)
# Step 3: Generate verdict
verdict = generator.generate_verdict(pre_screen, sandbox_report)
verdict["sha256"] = hashes["sha256"]
verdict["threat_label"] = pre_screen.get("virustotal", {}).get("threat_label", "Unknown")
# Step 4: Push to SIEM
push_to_splunk(verdict, config["splunk_url"], config["splunk_token"])
# Step 5: Block if malicious
if verdict["verdict"] == "MALICIOUS":
push_iocs_to_blocklist(verdict["iocs"], config["firewall_api"])
screener.close()
return verdict
| Term | Definition |
|---|---|
| Dynamic Analysis | Executing malware in a sandbox to observe runtime behavior (process creation, network, file system changes) |
| Static Analysis | Examining malware without execution (hash lookup, string analysis, PE header inspection) |
| Sandbox Evasion | Techniques malware uses to detect sandbox environments and alter behavior to avoid analysis |
| IOC Extraction | Automated process of identifying network indicators, file artifacts, and registry changes from sandbox reports |
| Multi-AV Scanning | Submitting samples to multiple antivirus engines (VirusTotal) for consensus-based detection |
| Verdict | Final classification of a sample: Malicious, Suspicious, Potentially Unwanted, or Clean |
MALWARE ANALYSIS REPORT — Pipeline Submission
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Sample: invoice_march.docx
SHA256: a1b2c3d4e5f6a7b8...
File Type: Microsoft Word Document (macro-enabled)
Pre-Screening:
VirusTotal: 34/72 malicious (Emotet.Downloader)
MalwareBazaar: Tags: emotet, macro, downloader
Sandbox Analysis (Cuckoo):
Score: 9.2/10 (MALICIOUS)
Signatures:
- Macro executes PowerShell download cradle (severity: 8)
- Process injection into explorer.exe (severity: 9)
- Connects to known Emotet C2 server (severity: 9)
Extracted IOCs:
C2 IPs: 185.234.218[.]50:8080, 45.77.123[.]45:443
Domains: update-service[.]evil[.]com
Dropped Files: payload.dll (SHA256: b2c3d4e5...)
Registry: HKCU\Software\Microsoft\Windows\CurrentVersion\Run\Update
VERDICT: MALICIOUS (Emotet Downloader) — Confidence: HIGH
ACTIONS:
[DONE] IOCs pushed to Splunk threat intel
[DONE] C2 IPs blocked on firewall
[DONE] Domain sinkholed on DNS
[DONE] Hash blocked on endpoint