Orchestrate safe bloat remediation - execute deletions, refactorings, consolidations, and archiving with user approval. Create backups, run tests, provide rollback. Use when: user requests unbloat, executing bloat cleanup, reducing codebase size, or optimizing context after bloat detection.
Orchestrates safe bloat remediation workflows with progressive risk mitigation and user approval.
/plugin marketplace add athola/claude-night-market
/plugin install conserve@claude-night-market
Remediation actions:
- DELETE: remove files with high confidence they're unused
- REFACTOR: break large, low-cohesion files into focused modules
- CONSOLIDATE: merge duplicate or redundant content
- ARCHIVE: move stale but historically valuable content
Each action carries its own risk assessment (see calculate_risk below).
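The pseudocode below operates on finding records produced by the scan. A minimal sketch of the shape it assumes (field names mirror the attributes the functions below read; the class itself is hypothetical, not part of the plugin):

```python
from dataclasses import dataclass, field

@dataclass
class Finding:
    file: str                    # path of the bloated file
    action: str                  # 'DELETE' | 'REFACTOR' | 'CONSOLIDATE' | 'ARCHIVE'
    confidence: int              # detector confidence, 0-100
    ref_count: int               # inbound references found in the repo
    file_type: str               # e.g. 'deprecated', 'test', 'archive', 'source'
    is_core_file: bool           # whether the file is part of the core module graph
    token_estimate: int          # estimated token savings if remediated
    rationale: str               # human-readable justification
    category: str = 'all'        # focus area, e.g. 'code' or 'docs'
    priority_score: float = 0.0  # higher scores are remediated first within a risk level
    risk: str = 'HIGH'           # filled in later by calculate_risk
    deprecation_message: str = ''                     # used by ARCHIVE
    references: list = field(default_factory=list)    # files that reference this one
    metadata: dict = field(default_factory=dict)      # per-action plans
```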
def initialize_unbloat(args):
"""
Set up unbloat session and load findings
"""
config = {
'from_scan': args.get('from_scan'),
'auto_approve': args.get('auto_approve', 'none'),
'dry_run': args.get('dry_run', False),
'focus': args.get('focus', 'all'),
'backup_branch': args.get('backup_branch') or f"backup/unbloat-{timestamp()}",
'no_backup': args.get('no_backup', False)
}
# Load or scan findings
if config['from_scan']:
findings = load_from_report(config['from_scan'])
else:
# Run integrated Tier 1 scan
findings = run_bloat_scan(level=1, focus=config['focus'])
# Filter by focus area
if config['focus'] != 'all':
findings = [f for f in findings if f.category == config['focus']]
return config, findings
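# Example (hypothetical values): resume from a saved report and
# auto-approve only low-risk findings:
#
#   config, findings = initialize_unbloat({
#       'from_scan': 'bloat-report-2025-12-31.md',
#       'auto_approve': 'low',
#   })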
def prioritize_findings(findings):
"""
Group findings by remediation type and risk level
"""
# Calculate risk score for each finding
for finding in findings:
finding.risk = calculate_risk(
confidence=finding.confidence,
references=finding.ref_count,
file_type=finding.file_type,
is_core=finding.is_core_file
)
# Group by remediation type
grouped = {
'DELETE': [f for f in findings if f.action == 'DELETE'],
'REFACTOR': [f for f in findings if f.action == 'REFACTOR'],
'CONSOLIDATE': [f for f in findings if f.action == 'CONSOLIDATE'],
'ARCHIVE': [f for f in findings if f.action == 'ARCHIVE']
}
# Sort each group by risk (low to high) and priority (high to low)
for action_type in grouped:
grouped[action_type].sort(
key=lambda f: (risk_order(f.risk), -f.priority_score)
)
# Flatten back to priority-ordered list
# Process LOW risk first (safest), then MEDIUM, then HIGH
prioritized = []
for risk_level in ['LOW', 'MEDIUM', 'HIGH']:
for action_type in ['DELETE', 'CONSOLIDATE', 'ARCHIVE', 'REFACTOR']:
prioritized.extend([
f for f in grouped[action_type]
if f.risk == risk_level
])
return prioritized
def calculate_risk(confidence, references, file_type, is_core):
"""
Calculate risk level based on multiple factors
"""
# Start with confidence
if confidence >= 90:
risk = 'LOW'
elif confidence >= 80:
risk = 'MEDIUM'
else:
risk = 'HIGH'
# Adjust based on references
if references == 0 and confidence >= 90:
risk = 'LOW'
elif references > 5:
risk = 'HIGH'
# Core files are always at least MEDIUM risk
if is_core:
risk = max_risk(risk, 'MEDIUM')
# Deprecated/test/archive are safer
if file_type in ['deprecated', 'test', 'archive']:
risk = min_risk(risk, 'MEDIUM')
return risk
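# risk_order, max_risk, and min_risk are assumed above rather than defined;
# a minimal sketch over the three ordered levels:
_RISK_ORDER = {'LOW': 0, 'MEDIUM': 1, 'HIGH': 2}
def risk_order(risk):
    # Sort key: LOW sorts before MEDIUM before HIGH
    return _RISK_ORDER[risk]
def max_risk(a, b):
    # The riskier of two levels (used to enforce a risk floor)
    return a if _RISK_ORDER[a] >= _RISK_ORDER[b] else b
def min_risk(a, b):
    # The safer of two levels (used to enforce a risk ceiling)
    return a if _RISK_ORDER[a] <= _RISK_ORDER[b] else b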
def create_backup(config):
"""
Create backup branch before any changes
"""
if config['no_backup']:
warn("Skipping backup (--no-backup flag). Changes cannot be easily reverted.")
return None
if config['dry_run']:
print(f"Would create backup branch: {config['backup_branch']}")
return config['backup_branch']
    # Remember the working branch before switching away from it;
    # capturing it after `git checkout -b` would return the backup branch
    current_branch = get_current_branch()
    # Create backup branch and commit the current state to it
    run_bash(f"git checkout -b {config['backup_branch']}")
    run_bash("git add -A")
    run_bash("git commit -m 'Backup before unbloat operation'")
    # Return to the working branch
    run_bash(f"git checkout {current_branch}")
print(f"✅ Backup created: {config['backup_branch']}")
return config['backup_branch']
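# get_current_branch() is assumed above; a minimal sketch, assuming run_bash
# returns a completed-process object with a .stdout attribute:
def get_current_branch():
    # 'git rev-parse --abbrev-ref HEAD' prints the checked-out branch name
    return run_bash("git rev-parse --abbrev-ref HEAD").stdout.strip()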
def remediate_interactive(findings, config, backup_branch):
"""
Execute remediation with user approval at each step
"""
results = {
'applied': [],
'skipped': [],
'failed': [],
'aborted': False
}
for idx, finding in enumerate(findings, 1):
print(f"\n[{idx}/{len(findings)}] {finding.file}")
print(f" Action: {finding.action}")
print(f" Confidence: {finding.confidence}% ({finding.risk} risk)")
print(f" Impact: ~{finding.token_estimate:,} tokens")
print(f" Rationale: {finding.rationale}")
# Show preview
show_preview(finding, config['dry_run'])
        # Check auto-approval
        if should_auto_approve(finding, config['auto_approve']):
            print(" Auto-approved ✓")
            action = 'y'
        else:
            action = prompt_user("Approve? [y/n/d/s/q]: ")
            # 'd' shows the detailed diff, then re-prompts on the same
            # finding (a bare `continue` would have skipped to the next one)
            while action == 'd':
                show_detailed_diff(finding)
                action = prompt_user("Approve? [y/n/d/s/q]: ")
# Handle response
if action == 'y':
if config['dry_run']:
print(f" WOULD {finding.action} ⚠️")
results['applied'].append(finding)
else:
success = execute_remediation(finding)
if success:
# Run tests
test_result = run_tests_quick()
if test_result:
print(f" ✅ {finding.action} complete")
results['applied'].append(finding)
else:
print(f" ❌ Tests failed, rolling back...")
rollback_change(finding)
results['failed'].append(finding)
else:
results['failed'].append(finding)
elif action == 's':
# Skip remaining
print(f" ⏭️ Skipping remaining {len(findings) - idx} findings")
results['skipped'].extend(findings[idx:])
break
elif action == 'q':
# Quit
print(" ⚠️ Unbloat aborted by user")
results['aborted'] = True
results['skipped'].extend(findings[idx:])
break
else:
# Skip this one
print(" ⏭️ Skipped")
results['skipped'].append(finding)
return results
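# prompt_user() is assumed above; a minimal sketch that re-asks until it
# receives one of the accepted single-letter responses:
def prompt_user(message, valid=('y', 'n', 'd', 's', 'q')):
    while True:
        answer = input(message).strip().lower()
        if answer in valid:
            return answer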
def should_auto_approve(finding, auto_approve_level):
"""
Check if finding meets auto-approval criteria
"""
if auto_approve_level == 'none':
return False
if auto_approve_level == 'low':
return (
finding.risk == 'LOW' and
finding.confidence >= 90 and
finding.ref_count == 0 and
finding.file_type in ['deprecated', 'test', 'archive']
)
if auto_approve_level == 'medium':
return (
finding.risk in ['LOW', 'MEDIUM'] and
finding.confidence >= 80 and
finding.ref_count <= 2 and
not finding.is_core_file
)
return False
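# Example: with --auto-approve low, a 95%-confidence deprecated file with
# zero inbound references is auto-approved; the same file with even one
# reference (or only 85% confidence) falls back to manual approval.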
def execute_remediation(finding):
"""
Execute the remediation action for a finding
"""
try:
if finding.action == 'DELETE':
return execute_delete(finding)
elif finding.action == 'REFACTOR':
return execute_refactor(finding)
elif finding.action == 'CONSOLIDATE':
return execute_consolidate(finding)
elif finding.action == 'ARCHIVE':
return execute_archive(finding)
else:
error(f"Unknown action: {finding.action}")
return False
except Exception as e:
error(f"Failed to {finding.action}: {e}")
return False
def execute_delete(finding):
"""
Delete a file using git rm
"""
# Log action
log_action('DELETE', finding.file, 'STARTED')
# Use git rm (reversible)
result = run_bash(f"git rm {finding.file}")
if result.returncode == 0:
log_action('DELETE', finding.file, 'SUCCESS')
return True
else:
log_action('DELETE', finding.file, 'FAILED', result.stderr)
return False
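# Note: this pseudocode interpolates paths straight into shell commands.
# A real implementation should quote them; a sketch of a safer wrapper
# (git_rm is hypothetical, not part of the plugin):
import shlex
def git_rm(path):
    # shlex.quote guards against spaces and shell metacharacters in paths
    return run_bash(f"git rm {shlex.quote(path)}")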
def execute_refactor(finding):
"""
Refactor a file by splitting into modules
"""
# This is complex - requires:
# 1. Analyze file structure
# 2. Identify logical groupings
# 3. Create new files
# 4. Move code
# 5. Update imports
log_action('REFACTOR', finding.file, 'STARTED')
# Get refactoring plan from finding metadata
plan = finding.metadata.get('refactoring_plan')
if not plan:
error("No refactoring plan available")
return False
# Execute plan steps
for step in plan['steps']:
if step['type'] == 'extract':
extract_to_file(
source=finding.file,
target=step['target_file'],
content=step['content']
)
elif step['type'] == 'update_imports':
update_imports(
file=step['file'],
old_import=step['old'],
new_import=step['new']
)
log_action('REFACTOR', finding.file, 'SUCCESS')
return True
def execute_consolidate(finding):
"""
Consolidate duplicate content
"""
log_action('CONSOLIDATE', finding.file, 'STARTED')
    # Get consolidation plan
    plan = finding.metadata.get('consolidation_plan')
    if not plan:
        error("No consolidation plan available")
        return False
# 1. Extract unique content from source
unique_content = extract_unique_content(
source=finding.file,
target=plan['target_file']
)
# 2. If unique content exists, append to target
if unique_content:
append_to_file(plan['target_file'], unique_content)
# 3. Delete source
run_bash(f"git rm {finding.file}")
# 4. Update references
for ref_file in finding.references:
update_references(
file=ref_file,
old_path=finding.file,
new_path=plan['target_file']
)
log_action('CONSOLIDATE', finding.file, 'SUCCESS')
return True
def execute_archive(finding):
"""
Move file to archive directory
"""
log_action('ARCHIVE', finding.file, 'STARTED')
    plan = finding.metadata.get('archive_plan')
    if not plan:
        error("No archive plan available")
        return False
    archive_path = plan['archive_path']
# 1. Create archive directory if needed
ensure_directory(os.path.dirname(archive_path))
# 2. Move file
run_bash(f"git mv {finding.file} {archive_path}")
# 3. Add deprecation notice
add_deprecation_notice(archive_path, finding.deprecation_message)
# 4. Update references
for ref_file in finding.references:
update_references(
file=ref_file,
old_path=finding.file,
new_path=archive_path
)
log_action('ARCHIVE', finding.file, 'SUCCESS')
return True
def run_tests_quick():
"""
Run quick tests after each change
"""
# Check for test commands in common locations
test_commands = [
("Makefile", "make test --quiet"),
("package.json", "npm test --silent"),
("pytest.ini", "pytest --quiet -x"),
("tests/", "pytest tests/ --quiet -x")
]
for indicator, command in test_commands:
if file_or_dir_exists(indicator):
result = run_bash(command, timeout=60)
return result.returncode == 0
# No tests found, assume OK
return True
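# Note: only the first matching indicator wins; if a repo has both a
# Makefile and a pytest.ini, only `make test` runs. The fall-through
# return treats "no test suite found" as a pass.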
def rollback_change(finding):
"""
Rollback a single change that broke tests
"""
if finding.action == 'DELETE':
run_bash(f"git checkout HEAD -- {finding.file}")
elif finding.action == 'REFACTOR':
# Rollback all created files
for created_file in finding.metadata.get('created_files', []):
run_bash(f"git rm -f {created_file}")
run_bash(f"git checkout HEAD -- {finding.file}")
elif finding.action == 'CONSOLIDATE':
run_bash(f"git checkout HEAD -- {finding.file}")
run_bash(f"git checkout HEAD -- {finding.metadata['target_file']}")
elif finding.action == 'ARCHIVE':
run_bash(f"git mv {finding.metadata['archive_path']} {finding.file}")
log_action('ROLLBACK', finding.file, 'SUCCESS')
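# log_action() is assumed throughout; a minimal append-only sketch so each
# step leaves an audit trail that survives an aborted session:
import time
def log_action(action, path, status, detail=''):
    # One line per event, timestamped, so the log doubles as an audit trail
    with open('.unbloat-log', 'a') as log:
        log.write(f"{time.strftime('%Y-%m-%dT%H:%M:%S')} "
                  f"{action} {path} {status} {detail}\n")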
def generate_summary(results, config, backup_branch, start_time):
"""
Generate detailed summary report
"""
duration = time.time() - start_time
# Calculate token savings
estimated_savings = sum(f.token_estimate for f in results['applied'])
realized_savings = calculate_realized_savings(results['applied'])
# Calculate context reduction
context_before = get_context_utilization_before()
context_after = get_context_utilization_after()
summary = f"""
=== Unbloat Summary ===
Started: {format_time(start_time)}
Completed: {format_time(time.time())}
Duration: {format_duration(duration)}
ACTIONS TAKEN:
Applied: {len(results['applied'])}
Skipped: {len(results['skipped'])}
Failed: {len(results['failed'])}
{'Aborted: Yes' if results['aborted'] else ''}
BREAKDOWN:
Deleted: {count_by_action(results['applied'], 'DELETE')} files
Refactored: {count_by_action(results['applied'], 'REFACTOR')} files
Consolidated: {count_by_action(results['applied'], 'CONSOLIDATE')} files
Archived: {count_by_action(results['applied'], 'ARCHIVE')} files
TOKEN SAVINGS:
Estimated: ~{estimated_savings:,} tokens
Realized: ~{realized_savings:,} tokens ({realized_savings / max(estimated_savings, 1) * 100:.0f}% of estimate)
CONTEXT REDUCTION:
Before: {context_before}% utilization
After: {context_after}% utilization
Reduction: {context_before - context_after} percentage points
FILES CHANGED:
{format_file_list(results['applied'])}
TESTS:
Status: {'PASSED ✓' if all_tests_passed(results) else 'FAILED ✗'}
BACKUP:
Branch: {backup_branch}
Restore: git reset --hard {backup_branch}
NEXT STEPS:
1. Review changes: git diff HEAD~{len(results['applied'])}
2. Run full test suite: make test
3. Commit changes: git add -A && git commit -m "Unbloat: reduce codebase by {realized_savings} tokens"
4. If satisfied: git branch -D {backup_branch}
5. If issues: git reset --hard {backup_branch}
"""
print(summary)
# Write to log file
write_file('.unbloat-summary.md', summary)
return summary
NEVER auto-delete without showing preview
ALWAYS create backup branch (unless --no-backup)
ALWAYS use git operations (not rm/mv):
- git rm, not rm (reversible)
- git mv, not mv (tracked)
ALWAYS test after changes
NEVER modify core files without HIGH confidence
Escalate when:
# Use bloat-auditor findings as input
findings = bloat_auditor.scan(level=2, focus='code')
unbloat_remediator.remediate(findings)
# Report context reduction to optimizer
context_optimizer.record_unbloat_metrics({
'token_savings': 28200,
'context_reduction': 12,
'files_removed': 7
})
# Create cleanup PR
create_cleanup_branch('cleanup/unbloat-Q1')
unbloat_remediator.remediate(findings)
create_pr(title="Unbloat: Reduce codebase by 14%")
User: "Unbloat this codebase"
Agent: "I'll run an integrated unbloat workflow:
1. Quick scan (Tier 1) to identify bloat
2. Create backup branch
3. Interactive remediation with approval"
[Executes /unbloat]
User: "I reviewed the bloat report, let's remove those files"
Agent: "I'll load the scan results and guide you through safe remediation:
1. Load findings from bloat-report-2025-12-31.md
2. Prioritize by risk (low first)
3. Request approval for each change"
[Executes /unbloat --from-scan bloat-report-2025-12-31.md]
User: "Unbloat but auto-approve the obvious dead code"
Agent: "I'll auto-approve LOW risk deletions only:
- 0 references
- 90%+ confidence
- deprecated/test/archive files
All other changes require manual approval."
[Executes /unbloat --auto-approve low]
Track effectiveness:
unbloat_metrics:
session_date: 2025-12-31
findings_processed: 24
actions_applied: 7
actions_skipped: 3
actions_failed: 1
token_savings_estimated: 31500
token_savings_realized: 28200
realization_rate: 89.5% # Actual vs estimated
test_failures: 1
rollbacks: 1
user_satisfaction: 9/10
Use to improve:
- bloat-auditor: Detection and scan orchestration
- bloat-detector: Detection modules and patterns
- context-optimization: MECW assessment after unbloat
- git-workspace-review: Understand changes before committing