Uses Shodan API to analyze IP reputation, identifying open ports, running services, known vulnerabilities, and hosting context for threat intelligence enrichment and incident classification.
npx claudepluginhub killvxk/cybersecurity-skills-zhThis skill uses the workspace's default tool permissions.
Shodan 是全球首个互联网设备搜索引擎,持续扫描 IPv4 和 IPv6 地址空间,记录开放端口、运行服务、SSL 证书和已知漏洞。本技能涵盖使用 Shodan API 和 InternetDB 免费 API 富化来自安全告警的 IP 地址、根据暴露服务和漏洞评估威胁级别、识别托管基础设施模式,以及将 IP 声誉数据集成到 SOC 分类和威胁情报工作流中。
Analyzes IP reputation using Shodan API to identify open ports, services, vulnerabilities, and hosting context for threat intelligence and incident triage.
Analyzes IP address reputation using Shodan API and InternetDB to identify open ports, services, vulnerabilities, and hosting context for threat intelligence and incident triage.
Guides Shodan CLI, web, and API usage for pentesting reconnaissance: host discovery, filtered searches, vulnerability banners, and asset inventory.
Share bugs, ideas, or general feedback.
Shodan 是全球首个互联网设备搜索引擎,持续扫描 IPv4 和 IPv6 地址空间,记录开放端口、运行服务、SSL 证书和已知漏洞。本技能涵盖使用 Shodan API 和 InternetDB 免费 API 富化来自安全告警的 IP 地址、根据暴露服务和漏洞评估威胁级别、识别托管基础设施模式,以及将 IP 声誉数据集成到 SOC 分类和威胁情报工作流中。
shodan 库(pip install shodan)Shodan 中的每条 IP 记录包含:开放端口和协议、Banner 数据(服务响应)、SSL/TLS 证书详情、已知 CVE 漏洞、主机名和反向 DNS、ASN 和 ISP 信息、地理位置、操作系统指纹,以及显示变化历史的历史扫描数据。
Shodan 的免费 InternetDB API(internetdb.shodan.io)无需身份验证即可快速查询 IP,返回开放端口、主机名、标签、CPE 和已知漏洞。适用于全量 Shodan API 可能触及速率限制的高容量富化场景。
通过以下维度综合评估 IP 声誉:开放端口数量和类型(异常端口表明可能遭到攻陷)、易受攻击服务(含已知 CVE 的未修补软件)、托管类型(住宅、云端、VPN/代理、防弹托管)、历史活动(过去与恶意软件、扫描、垃圾邮件的关联),以及地理背景(以特定威胁活动著称的国家/地区)。
import shodan
import json
from datetime import datetime
class ShodanEnricher:
def __init__(self, api_key):
self.api = shodan.Shodan(api_key)
self.info = self.api.info()
print(f"[+] Shodan API 已初始化。剩余扫描积分:{self.info.get('scan_credits', 0)}")
def enrich_ip(self, ip_address):
"""通过 Shodan 对 IP 地址进行完整富化。"""
try:
host = self.api.host(ip_address)
enrichment = {
"ip": ip_address,
"organization": host.get("org", ""),
"asn": host.get("asn", ""),
"isp": host.get("isp", ""),
"country": host.get("country_name", ""),
"country_code": host.get("country_code", ""),
"city": host.get("city", ""),
"latitude": host.get("latitude"),
"longitude": host.get("longitude"),
"os": host.get("os", ""),
"ports": host.get("ports", []),
"hostnames": host.get("hostnames", []),
"domains": host.get("domains", []),
"vulns": host.get("vulns", []),
"tags": host.get("tags", []),
"last_update": host.get("last_update", ""),
"services": [],
}
for service in host.get("data", []):
svc = {
"port": service.get("port", 0),
"transport": service.get("transport", "tcp"),
"product": service.get("product", ""),
"version": service.get("version", ""),
"module": service.get("_shodan", {}).get("module", ""),
"banner": service.get("data", "")[:200],
}
if "ssl" in service:
svc["ssl_subject"] = service["ssl"].get("cert", {}).get("subject", {})
svc["ssl_issuer"] = service["ssl"].get("cert", {}).get("issuer", {})
svc["ssl_expires"] = service["ssl"].get("cert", {}).get("expires", "")
enrichment["services"].append(svc)
# 计算声誉评分
enrichment["reputation"] = self._calculate_reputation(enrichment)
print(f"[+] {ip_address}:{len(enrichment['ports'])} 个端口,"
f"{len(enrichment['vulns'])} 个漏洞,"
f"声誉等级:{enrichment['reputation']['level']}")
return enrichment
except shodan.APIError as e:
print(f"[-] {ip_address} 的 Shodan 查询出错:{e}")
return None
def _calculate_reputation(self, data):
"""根据 Shodan 数据计算 IP 声誉评分。"""
score = 0
factors = []
# 漏洞评估
vuln_count = len(data.get("vulns", []))
if vuln_count > 10:
score += 40
factors.append(f"{vuln_count} 个已知漏洞")
elif vuln_count > 5:
score += 25
factors.append(f"{vuln_count} 个已知漏洞")
elif vuln_count > 0:
score += 10
factors.append(f"{vuln_count} 个已知漏洞")
# 可疑端口分析
suspicious_ports = {4444, 5555, 6666, 8888, 9090, 1234, 31337,
6667, 6697, 8080, 8443, 3128, 1080}
open_ports = set(data.get("ports", []))
sus_found = open_ports.intersection(suspicious_ports)
if sus_found:
score += 15
factors.append(f"可疑端口:{sus_found}")
# 基于标签的评估
malicious_tags = {"self-signed", "cloud", "vpn", "proxy", "tor"}
tags = set(data.get("tags", []))
mal_tags = tags.intersection(malicious_tags)
if mal_tags:
score += 10
factors.append(f"标签:{mal_tags}")
# 开放端口过多
port_count = len(data.get("ports", []))
if port_count > 20:
score += 15
factors.append(f"开放端口过多({port_count} 个)")
level = (
"critical" if score >= 50
else "high" if score >= 35
else "medium" if score >= 15
else "low"
)
return {"score": score, "level": level, "factors": factors}
def enrich_ip_free(self, ip_address):
"""使用免费 InternetDB API 快速富化 IP。"""
import requests
resp = requests.get(f"https://internetdb.shodan.io/{ip_address}", timeout=10)
if resp.status_code == 200:
data = resp.json()
print(f"[+] InternetDB:{ip_address} -> "
f"{len(data.get('ports', []))} 个端口,"
f"{len(data.get('vulns', []))} 个漏洞")
return data
return None
enricher = ShodanEnricher("YOUR_SHODAN_API_KEY")
result = enricher.enrich_ip("8.8.8.8")
print(json.dumps(result, indent=2, default=str))
import time
def batch_ip_reputation(enricher, ip_list, output_file="ip_reputation.json"):
"""检查 IP 地址列表的声誉。"""
results = []
for i, ip in enumerate(ip_list):
result = enricher.enrich_ip(ip)
if result:
results.append(result)
if (i + 1) % 10 == 0:
print(f" [{i+1}/{len(ip_list)}] 已处理")
time.sleep(1) # 速率限制
# 按声誉评分降序排列(最高风险在前)
results.sort(key=lambda x: x.get("reputation", {}).get("score", 0), reverse=True)
with open(output_file, "w") as f:
json.dump(results, f, indent=2, default=str)
# 汇总
levels = {"critical": 0, "high": 0, "medium": 0, "low": 0}
for r in results:
level = r.get("reputation", {}).get("level", "low")
levels[level] += 1
print(f"\n=== 批量声誉汇总 ===")
print(f"IP 总数:{len(results)}")
for level, count in levels.items():
print(f" {level.upper()}:{count}")
return results
suspicious_ips = ["203.0.113.1", "198.51.100.5", "192.0.2.100"]
results = batch_ip_reputation(enricher, suspicious_ips)
def correlate_infrastructure(enricher, ip_address):
"""根据共享属性发现相关基础设施。"""
host_data = enricher.enrich_ip(ip_address)
if not host_data:
return {}
correlations = {
"same_org": [],
"same_asn": [],
"shared_ssl": [],
}
# 搜索同一组织
org = host_data.get("organization", "")
if org:
try:
results = enricher.api.search(f'org:"{org}"', limit=20)
for match in results.get("matches", []):
correlations["same_org"].append({
"ip": match.get("ip_str", ""),
"port": match.get("port", 0),
"product": match.get("product", ""),
})
except shodan.APIError:
pass
# 搜索相同 SSL 证书
for service in host_data.get("services", []):
ssl_subject = service.get("ssl_subject", {})
if ssl_subject:
cn = ssl_subject.get("CN", "")
if cn:
try:
results = enricher.api.search(f'ssl.cert.subject.CN:"{cn}"', limit=20)
for match in results.get("matches", []):
correlations["shared_ssl"].append({
"ip": match.get("ip_str", ""),
"cn": cn,
})
except shodan.APIError:
pass
print(f"[+] {ip_address} 的基础设施关联:")
print(f" 同一组织:{len(correlations['same_org'])} 台主机")
print(f" 共享 SSL:{len(correlations['shared_ssl'])} 台主机")
return correlations