Performs forensic acquisition and analysis of Google Drive, OneDrive, Dropbox, and Box, covering both remote data retrieved via service APIs and sync-client artifacts left on endpoints. Intended for cybersecurity digital forensics.
npx claudepluginhub killvxk/cybersecurity-skills-zh
Cloud storage forensic acquisition collects digital evidence from services such as Google Drive, OneDrive, Dropbox, and Box through two complementary routes: remote acquisition via service APIs and analysis of local endpoint artifacts. Modern investigations must contend with the fact that cloud-synced files can exist in several states: synced locally, cloud-only (files on demand), cached, and deleted. Endpoint devices that have synced with cloud storage retain extensive metadata covering locally synced files, cloud-only files, and even deleted items that may be recoverable from cache folders. API-based acquisition uses each service's own API to access remote data directly, given valid credentials and appropriate legal authorization.
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
import io
import os
import json
from datetime import datetime
class GoogleDriveForensicAcquisition:
"""通过 API 对 Google Drive 文件和元数据进行取证获取。"""
def __init__(self, credentials_path: str, output_dir: str):
self.creds = Credentials.from_authorized_user_file(credentials_path)
self.service = build("drive", "v3", credentials=self.creds)
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
self.acquisition_log = []
def list_all_files(self, include_trashed: bool = True) -> list:
"""列出所有文件,包括回收站中的项目。"""
files = []
page_token = None
query = "" if include_trashed else "trashed = false"
while True:
results = self.service.files().list(
q=query,
pageSize=1000,
fields="nextPageToken, files(id, name, mimeType, size, "
"createdTime, modifiedTime, trashed, trashedTime, "
"owners, sharingUser, permissions, md5Checksum, "
"parents, webViewLink, driveId)",
pageToken=page_token
).execute()
files.extend(results.get("files", []))
page_token = results.get("nextPageToken")
if not page_token:
break
return files
def download_file(self, file_id: str, file_name: str, mime_type: str) -> str:
"""下载 Google Drive 文件并保持取证完整性。"""
output_path = os.path.join(self.output_dir, file_name)
if mime_type.startswith("application/vnd.google-apps"):
export_formats = {
"application/vnd.google-apps.document": "application/pdf",
"application/vnd.google-apps.spreadsheet": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.google-apps.presentation": "application/pdf",
}
export_mime = export_formats.get(mime_type, "application/pdf")
request = self.service.files().export_media(fileId=file_id, mimeType=export_mime)
else:
request = self.service.files().get_media(fileId=file_id)
with io.FileIO(output_path, "wb") as fh:
downloader = MediaIoBaseDownload(fh, request)
done = False
while not done:
_, done = downloader.next_chunk()
self.acquisition_log.append({
"timestamp": datetime.utcnow().isoformat(),
"file_id": file_id,
"file_name": file_name,
"output_path": output_path,
"action": "downloaded"
})
return output_path
def get_activity_log(self, file_id: str) -> list:
"""获取特定文件的活动/修订历史。"""
revisions = self.service.revisions().list(
fileId=file_id,
fields="revisions(id, modifiedTime, lastModifyingUser, size, md5Checksum)"
).execute()
return revisions.get("revisions", [])
def export_acquisition_report(self) -> str:
"""导出获取日志用于证据链文档。"""
report_path = os.path.join(self.output_dir, "acquisition_log.json")
with open(report_path, "w") as f:
json.dump({
"acquisition_start": self.acquisition_log[0]["timestamp"] if self.acquisition_log else None,
"acquisition_end": datetime.utcnow().isoformat(),
"total_files": len(self.acquisition_log),
"entries": self.acquisition_log
}, f, indent=2)
return report_path
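A minimal usage sketch follows (the token file and output directory are placeholders). Comparing a locally computed MD5 against the md5Checksum reported by the Drive API documents integrity for binary files; exported Workspace-native files (Docs, Sheets, Slides) carry no md5Checksum, so the comparison is skipped for them.

```python
import hashlib

def md5_of_file(path: str) -> str:
    """Compute an MD5 in chunks so large evidence files are not read whole."""
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()

# Placeholder paths -- substitute real case material.
acq = GoogleDriveForensicAcquisition("token.json", "evidence/gdrive_case001")
for f in acq.list_all_files(include_trashed=True):
    if f["mimeType"] == "application/vnd.google-apps.folder":
        continue  # folders have no content to download
    local_path = acq.download_file(f["id"], f["name"], f["mimeType"])
    # Only binary files report an md5Checksum; exported files do not.
    if "md5Checksum" in f and md5_of_file(local_path) != f["md5Checksum"]:
        print(f"HASH MISMATCH: {f['name']} ({f['id']})")
acq.export_acquisition_report()
```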
import msal
import requests
import os
import json
from datetime import datetime
class OneDriveForensicAcquisition:
"""通过 Microsoft Graph API 对 OneDrive 文件和元数据进行取证获取。"""
def __init__(self, client_id: str, tenant_id: str, client_secret: str, output_dir: str):
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
authority = f"https://login.microsoftonline.com/{tenant_id}"
self.app = msal.ConfidentialClientApplication(
client_id, authority=authority, client_credential=client_secret
)
token_result = self.app.acquire_token_for_client(
scopes=["https://graph.microsoft.com/.default"]
)
        self.access_token = token_result.get("access_token")
        if not self.access_token:
            raise RuntimeError(
                f"Token acquisition failed: {token_result.get('error_description')}"
            )
self.headers = {"Authorization": f"Bearer {self.access_token}"}
self.base_url = "https://graph.microsoft.com/v1.0"
def list_user_files(self, user_id: str) -> list:
"""列出用户 OneDrive 中的所有文件。"""
url = f"{self.base_url}/users/{user_id}/drive/root/children"
files = []
while url:
response = requests.get(url, headers=self.headers)
data = response.json()
files.extend(data.get("value", []))
url = data.get("@odata.nextLink")
return files
def download_file(self, user_id: str, item_id: str, filename: str) -> str:
"""从 OneDrive 下载文件。"""
url = f"{self.base_url}/users/{user_id}/drive/items/{item_id}/content"
response = requests.get(url, headers=self.headers, stream=True)
output_path = os.path.join(self.output_dir, filename)
with open(output_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return output_path
def get_deleted_items(self, user_id: str) -> list:
"""从 OneDrive 回收站获取已删除项目。"""
url = f"{self.base_url}/users/{user_id}/drive/special/recyclebin/children"
response = requests.get(url, headers=self.headers)
return response.json().get("value", [])
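A hedged usage sketch (client ID, tenant ID, secret, and user principal name are placeholders; an app registration with admin-consented Files.Read.All application permission is assumed). Preserving the raw Graph item metadata next to the downloaded content keeps timestamps, hashes, and sharing information available for the chain of custody:

```python
import json
import os

# Placeholder credentials -- substitute values from the case app registration.
acq = OneDriveForensicAcquisition(
    client_id="<app-id>",
    tenant_id="<tenant-id>",
    client_secret="<secret>",
    output_dir="evidence/onedrive_case001",
)
items = acq.list_user_files("user@example.com")
# Keep the raw Graph metadata (timestamps, hashes, sharing info) as evidence.
with open(os.path.join(acq.output_dir, "drive_items.json"), "w") as f:
    json.dump(items, f, indent=2)
for item in items:
    if "file" in item:  # folders carry a "folder" facet instead of "file"
        acq.download_file("user@example.com", item["id"], item["name"])
```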
# Collect all cloud storage artifacts with KAPE
kape.exe --tsource C: --tdest C:\Output\CloudArtifacts --target GoogleDrive,OneDrive,Dropbox,Box
# OneDrive artifact locations
# %USERPROFILE%\AppData\Local\Microsoft\OneDrive\logs\
# %USERPROFILE%\AppData\Local\Microsoft\OneDrive\settings\
# %USERPROFILE%\OneDrive\
# Google Drive artifact locations
# %USERPROFILE%\AppData\Local\Google\DriveFS\
# Contains the metadata SQLite database and cached content
# Dropbox artifact locations
# %USERPROFILE%\AppData\Local\Dropbox\
# %USERPROFILE%\Dropbox\.dropbox.cache\
# Contains filecache.dbx (encrypted SQLite), host.dbx, and config.dbx
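When KAPE is not available, the same locations can be collected with a short script. A minimal sketch, assuming a live system or mounted profile where the environment variables resolve: shutil.copy2 preserves modification times but not NTFS creation times or ACLs, and files locked by running sync clients may fail to copy, so a forensic imager remains preferable.

```python
import os
import shutil

# Artifact directories listed above; extend per additional user profile.
CLOUD_ARTIFACT_DIRS = [
    r"%LOCALAPPDATA%\Microsoft\OneDrive\logs",
    r"%LOCALAPPDATA%\Microsoft\OneDrive\settings",
    r"%LOCALAPPDATA%\Google\DriveFS",
    r"%LOCALAPPDATA%\Dropbox",
    r"%USERPROFILE%\Dropbox\.dropbox.cache",
]

def collect_artifacts(dest_root: str) -> list:
    """Copy cloud sync artifact folders, preserving modification times."""
    collected = []
    for raw in CLOUD_ARTIFACT_DIRS:
        src = os.path.expandvars(raw)
        if not os.path.isdir(src):
            continue  # service not installed for this profile
        # Flatten the source path into a destination folder name.
        dest = os.path.join(dest_root, raw.replace("%", "").replace("\\", "_"))
        shutil.copytree(src, dest, copy_function=shutil.copy2, dirs_exist_ok=True)
        collected.append(src)
    return collected
```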
import sqlite3
import os
def analyze_onedrive_sync_engine(db_path: str) -> list:
"""分析 OneDrive SyncEngineDatabase 以获取文件元数据。"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
    # Query all tracked files, including cloud-only items
cursor.execute("""
SELECT fileName, fileSize, lastChange,
resourceID, parentResourceID, eTag
FROM od_ClientFile_Records
ORDER BY lastChange DESC
""")
files = []
for row in cursor.fetchall():
files.append({
"filename": row[0],
"size": row[1],
"last_change": row[2],
"resource_id": row[3],
"parent_id": row[4],
"etag": row[5]
})
conn.close()
return files
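Google Drive for desktop's metadata_sqlite_db can be triaged the same way. A sketch under a loud assumption: the schema is undocumented and changes between DriveFS versions, so the `items` table and the column names below (drawn from commonly published layouts) must be verified against the actual database with `.schema items` before relying on the output.

```python
import sqlite3

def analyze_drivefs_metadata(db_path: str) -> list:
    """Sketch: enumerate items tracked by Google Drive for desktop.

    Assumes an 'items' table with the columns below; verify the schema
    first, as it varies across DriveFS versions."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("""
        SELECT stable_id, local_title, file_size, modified_date, trashed, is_folder
        FROM items
        ORDER BY modified_date DESC
    """)
    items = []
    for row in cursor.fetchall():
        items.append({
            "stable_id": row[0],
            "name": row[1],
            "size": row[2],
            "modified": row[3],
            "trashed": bool(row[4]),
            "is_folder": bool(row[5]),
        })
    conn.close()
    return items
```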
| Service | Local database | Cache location | Log files |
|---|---|---|---|
| OneDrive | SyncEngineDatabase.db | %LOCALAPPDATA%\Microsoft\OneDrive\cache\ | %LOCALAPPDATA%\Microsoft\OneDrive\logs\ |
| Google Drive | metadata_sqlite_db | %LOCALAPPDATA%\Google\DriveFS\{account}\content_cache\ | %LOCALAPPDATA%\Google\DriveFS\Logs\ |
| Dropbox | filecache.dbx (encrypted) | %USERPROFILE%\Dropbox\.dropbox.cache\ | %APPDATA%\Dropbox\logs\ |
| Box | sync_db | %LOCALAPPDATA%\Box\Box\cache\ | %LOCALAPPDATA%\Box\Box\logs\ |
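To establish quickly which sync clients touched an endpoint, the locations in the table can be probed directly. A minimal sketch, assuming a live system or a mounted profile where the environment variables resolve; the paths are representative markers drawn from the table, not an exhaustive list:

```python
import os

# One representative artifact path per service, taken from the table above.
SERVICE_MARKERS = {
    "OneDrive": r"%LOCALAPPDATA%\Microsoft\OneDrive\settings",
    "Google Drive": r"%LOCALAPPDATA%\Google\DriveFS",
    "Dropbox": r"%APPDATA%\Dropbox",
    "Box": r"%LOCALAPPDATA%\Box\Box",
}

def detect_sync_clients() -> dict:
    """Report which cloud sync clients left artifacts in this profile."""
    return {
        service: os.path.isdir(os.path.expandvars(path))
        for service, path in SERVICE_MARKERS.items()
    }

print(detect_sync_clients())
```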