Help us improve
Share bugs, ideas, or general feedback.
From klingai-pack
Implements tamper-evident audit logging for Kling AI API operations such as text-to-video tasks, recording task submissions, status changes, and credential usage in JSONL files for compliance.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin klingai-pack
How this skill is triggered — by the user, by Claude, or both
Slash command
/klingai-pack:klingai-audit-logging
This skill is limited to the following tools:
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
Compliance-grade audit logging for Kling AI API operations. Every task submission, status change, and credential usage is captured in tamper-evident structured logs.
Reviews security and compliance for Kling AI video generation API integrations using checklists for credentials, data flow, input validation, privacy, and GDPR prep.
Records audit logs of user inputs (verbatim), agent actions, phase transitions, and gate decisions with ISO 8601 timestamps. Supports SOC2/ISMS-P retention policies (30/90/365 days) per project.
Implements Python wrapper for audited OpenRouter API completions with generation metadata logging, cost/token tracking, prompt hashing, and compliance-ready entries.
Share bugs, ideas, or general feedback.
Compliance-grade audit logging for Kling AI API operations. Every task submission, status change, and credential usage is captured in tamper-evident structured logs.
import hashlib
import json
import time
from datetime import datetime, timezone
from pathlib import Path
class AuditLogger:
    """Append-only JSONL audit log whose entries form a hash chain.

    One file is written per UTC day (``audit-YYYY-MM-DD.jsonl``) inside
    ``log_dir``. Every entry records the hash of the entry before it in the
    same file; the first entry of a file uses the sentinel ``"genesis"``.
    Each daily file is therefore a self-contained chain.

    The previous hash is cached in memory, so several writers appending to
    the same file at once would produce a broken chain.
    """

    def __init__(self, log_dir: str = "audit"):
        """Create the log directory if needed.

        Args:
            log_dir: Directory that receives the daily JSONL files.
        """
        self.log_dir = Path(log_dir)
        # parents=True so nested locations such as "var/log/audit" work.
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self._prev_hash = "genesis"
        # File that _prev_hash belongs to; None until the first write.
        self._chain_file = None

    def _compute_hash(self, entry: dict) -> str:
        """Return the chained hash for ``entry``.

        SHA-256 over the entry serialized with sorted keys, followed by the
        previous hash, truncated to the first 16 hex characters.
        """
        raw = json.dumps(entry, sort_keys=True) + self._prev_hash
        return hashlib.sha256(raw.encode()).hexdigest()[:16]

    @staticmethod
    def _tail_hash(filepath: Path) -> str:
        """Return the ``hash`` of the last entry in ``filepath``.

        Falls back to ``"genesis"`` when the file is missing, empty, or its
        last line cannot be read as an entry. In the last case the damaged
        line still fails verification, and the new event is not lost.
        """
        last_line = ""
        try:
            with open(filepath, encoding="utf-8") as f:
                for line in f:
                    if line.strip():
                        last_line = line
        except FileNotFoundError:
            return "genesis"
        if not last_line:
            return "genesis"
        try:
            tail = json.loads(last_line)["hash"]
        except (json.JSONDecodeError, KeyError, TypeError):
            return "genesis"
        return tail if isinstance(tail, str) else "genesis"

    def log(self, event_type: str, actor: str, details: dict):
        """Write a tamper-evident audit entry and return its hash.

        Args:
            event_type: Category of the event, stored verbatim.
            actor: Who triggered the event, stored verbatim.
            details: JSON-serializable payload describing the event.

        Returns:
            The 16-character hash of the entry that was written.

        Raises:
            TypeError: If the entry cannot be serialized to JSON.
            OSError: If the log file cannot be written.
        """
        # Take the clock once so the timestamp and the file name can never
        # disagree around midnight.
        now = datetime.now(timezone.utc)
        filepath = self.log_dir / f"audit-{now:%Y-%m-%d}.jsonl"

        # On the first write, and whenever the day rolls over, continue the
        # chain already stored in the target file instead of reusing a hash
        # that belongs to a different file (or to no file at all).
        if filepath != self._chain_file:
            self._prev_hash = self._tail_hash(filepath)
            self._chain_file = filepath

        entry = {
            "timestamp": now.replace(tzinfo=None).isoformat() + "Z",
            "event_type": event_type,
            "actor": actor,
            "details": details,
            "prev_hash": self._prev_hash,
        }
        # Hash exactly what a reader will parse back: a JSON round trip turns
        # non-string keys into strings and tuples into lists, either of which
        # would otherwise make the stored hash impossible to reproduce.
        entry = json.loads(json.dumps(entry))
        entry["hash"] = self._compute_hash(entry)

        with open(filepath, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")

        # Advance the chain only after the entry is on disk, so a failed
        # write does not leave a gap.
        self._prev_hash = entry["hash"]
        return entry["hash"]
class KlingAuditClient:
    """Wrap a Kling client so every call leaves entries in the audit log."""

    def __init__(self, base_client, audit: "AuditLogger", actor: str = "system"):
        """Store the wrapped client, the audit sink, and the actor name.

        Args:
            base_client: Object whose ``text_to_video`` is called; its
                ``config.access_key`` is read by ``log_auth_event``.
            audit: Object whose ``log(event_type, actor, details)`` method
                receives every audit event.
            actor: Name recorded as the actor on every entry.
        """
        self.client = base_client
        self.audit = audit
        self.actor = actor

    def text_to_video(self, prompt: str, **kwargs):
        """Call the wrapped ``text_to_video`` and audit the attempt.

        A ``task_submitted`` entry is written first, then either
        ``task_completed`` or ``task_failed``. The prompt itself is never
        logged, only a truncated SHA-256 digest and its length.

        Args:
            prompt: Text prompt forwarded unchanged to the wrapped client.
            **kwargs: Forwarded unchanged to the wrapped client.

        Returns:
            Whatever the wrapped client returns.

        Raises:
            Exception: Any error from the wrapped client is re-raised after
                the ``task_failed`` entry has been written.
        """
        # The fallbacks below are only what the log shows when the caller
        # omits an option; the wrapped client applies its own defaults.
        self.audit.log("task_submitted", self.actor, {
            "action": "text_to_video",
            "model": kwargs.get("model", "kling-v2-master"),
            "duration": kwargs.get("duration", 5),
            "mode": kwargs.get("mode", "standard"),
            "prompt_hash": hashlib.sha256(prompt.encode()).hexdigest()[:16],
            "prompt_length": len(prompt),
        })

        try:
            result = self.client.text_to_video(prompt, **kwargs)
        except Exception as exc:
            # Without this entry the trail would show a submission with no
            # outcome. Only the exception type is stored because messages
            # can carry prompt text or credentials.
            self.audit.log("task_failed", self.actor, {
                "action": "text_to_video",
                "status": "failed",
                "error_type": type(exc).__name__,
            })
            raise

        # "videos" may be absent or None; both count as zero videos.
        videos = result.get("videos") or []
        self.audit.log("task_completed", self.actor, {
            "action": "text_to_video",
            "status": "succeed",
            "video_count": len(videos),
        })
        return result

    def log_auth_event(self, event: str, success: bool):
        """Record an authentication event.

        Only the first eight characters of the access key are stored,
        followed by ``"..."``.

        Args:
            event: Name of the authentication event.
            success: Whether the event succeeded.
        """
        self.audit.log("auth_event", self.actor, {
            "event": event,
            "success": success,
            "access_key_prefix": self.client.config.access_key[:8] + "...",
        })
def verify_audit_chain(log_file: str) -> bool:
    """Verify the hash chain of one JSONL audit file.

    The first entry must carry ``prev_hash == "genesis"`` and every later
    entry must carry the ``hash`` of the entry before it. Each ``hash`` is
    recomputed as the first 16 hex characters of SHA-256 over the entry
    (without its ``hash`` field, keys sorted) followed by the previous hash.

    Args:
        log_file: Path of the JSONL file to check.

    Returns:
        True when every line is a well-formed entry and the chain is intact
        (an empty file counts as intact); False at the first line that is
        malformed, out of sequence, or has a wrong hash.

    Raises:
        OSError: If the file cannot be opened.
    """
    prev_hash = "genesis"
    verified = 0
    with open(log_file, encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            # A line that is not an entry object with both hash fields is
            # itself evidence of tampering, so report it instead of raising.
            try:
                entry = json.loads(line)
                stored_prev = entry["prev_hash"]
                stored_hash = entry["hash"]
            except (json.JSONDecodeError, KeyError, TypeError):
                print(f"Malformed entry at line {line_num}")
                return False

            if stored_prev != prev_hash:
                print(f"Chain broken at line {line_num}: "
                      f"expected prev_hash={prev_hash}, got {stored_prev}")
                return False

            # Recompute the hash over everything except the stored hash.
            check_entry = {k: v for k, v in entry.items() if k != "hash"}
            raw = json.dumps(check_entry, sort_keys=True) + prev_hash
            expected_hash = hashlib.sha256(raw.encode()).hexdigest()[:16]
            if stored_hash != expected_hash:
                print(f"Hash mismatch at line {line_num}")
                return False

            prev_hash = stored_hash
            # Count instead of keeping every entry in memory.
            verified += 1

    print(f"Verified {verified} entries -- chain intact")
    return True
def _parse_audit_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 timestamp into a timezone-aware datetime.

    A trailing ``Z`` is read as UTC, and a timestamp with no offset is
    assumed to be UTC as well.
    """
    text = value[:-1] + "+00:00" if value.endswith("Z") else value
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def generate_audit_report(log_dir: str = "audit", days: int = 30) -> dict:
    """Summarize the audit entries of the last ``days`` days.

    Reads every ``audit-*.jsonl`` file in ``log_dir`` in file-name order,
    keeps the entries whose timestamp is not older than the cutoff, prints
    a short summary, and returns the same figures as a dict.

    Args:
        log_dir: Directory containing the JSONL audit files.
        days: Size of the reporting window, counted back from now (UTC).

    Returns:
        Dict with ``period_days``, ``total_events``, ``event_types`` (count
        per event type), ``unique_actors``, ``actors`` (count per actor),
        and ``first_event`` / ``last_event`` (timestamps of the first and
        last entry included, or None when nothing matched).

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If an entry lacks ``timestamp``, ``event_type`` or ``actor``.
        ValueError: If a timestamp is not in ISO 8601 format.
    """
    from collections import Counter
    from datetime import timedelta

    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    events = []
    for filepath in sorted(Path(log_dir).glob("audit-*.jsonl")):
        with open(filepath, encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    # Blank lines carry no entry.
                    continue
                entry = json.loads(line)
                # Compare real datetimes: comparing the ISO strings breaks
                # when one side has a "Z" suffix or no fractional seconds.
                if _parse_audit_timestamp(entry["timestamp"]) >= cutoff:
                    events.append(entry)

    event_types = Counter(e["event_type"] for e in events)
    actors = Counter(e["actor"] for e in events)
    report = {
        "period_days": days,
        "total_events": len(events),
        "event_types": dict(event_types),
        "unique_actors": len(actors),
        "actors": dict(actors),
        "first_event": events[0]["timestamp"] if events else None,
        "last_event": events[-1]["timestamp"] if events else None,
    }

    print(f"\n=== Audit Report ({days} days) ===")
    print(f"Total events: {report['total_events']}")
    for event_type, count in event_types.most_common():
        print(f" {event_type}: {count}")
    # str() so an actor that was logged as a non-string still prints.
    print(f"Actors: {', '.join(map(str, actors))}")
    return report