npx claudepluginhub joshuarweaver/cascade-code-general-misc-1 --plugin epieczko-bettyThis skill uses the workspace's default tool permissions.
**Version:** 0.1.0
Guides Next.js Cache Components and Partial Prerendering (PPR) with cacheComponents enabled. Implements 'use cache', cacheLife(), cacheTag(), revalidateTag(), static/dynamic optimization, and cache debugging.
Guides building MCP servers enabling LLMs to interact with external services via tools. Covers best practices, TypeScript/Node (MCP SDK), Python (FastMCP).
Generates original PNG/PDF visual art via design philosophy manifestos for posters, graphics, and static designs on user request.
Version: 0.1.0 Status: Active Tags: telemetry, logging, observability, audit
The telemetry.capture skill provides comprehensive execution logging for Betty Framework components. It captures usage metrics, execution status, timing data, and contextual metadata in a structured, thread-safe manner.
All telemetry data is written to /registry/telemetry.json with ISO timestamps and validated JSON schema.
@capture_telemetry)This skill enables:
# Capture a successful execution
python skills/telemetry.capture/telemetry_capture.py plugin.build success 320 CLI
# Capture with inputs
python skills/telemetry.capture/telemetry_capture.py \
agent.run success 1500 API '{"agent": "api.designer", "task": "design_api"}'
# Capture a failure
python skills/telemetry.capture/telemetry_capture.py \
workflow.compose failure 2800 CLI '{"workflow": "api_first"}' "Validation failed at step 3"
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "../telemetry.capture"))
from telemetry_utils import capture_telemetry
@capture_telemetry(skill_name="agent.run", caller="CLI", capture_inputs=True)
def run_agent(agent_path: str, task_context: str = None):
"""Execute a Betty agent."""
# ... implementation
return {"status": "success", "result": execution_result}
# Usage
result = run_agent("/agents/api.designer", "Design user authentication API")
# Telemetry is automatically captured
from telemetry_utils import TelemetryContext
def build_plugin(plugin_path: str):
with TelemetryContext(skill="plugin.build", caller="CLI") as ctx:
ctx.set_inputs({"plugin_path": plugin_path})
try:
# Perform build operations
result = create_plugin_archive(plugin_path)
ctx.set_status("success")
return result
except Exception as e:
ctx.set_error(str(e))
raise
from telemetry_capture import TelemetryCapture
telemetry = TelemetryCapture()
# Capture an event
entry = telemetry.capture(
skill="plugin.build",
status="success",
duration_ms=320.5,
caller="CLI",
inputs={"plugin_path": "./plugin.yaml", "output_format": "tar.gz"},
metadata={"user": "developer@example.com", "environment": "production"}
)
print(f"Captured: {entry['timestamp']}")
from telemetry_capture import TelemetryCapture
telemetry = TelemetryCapture()
# Query recent failures
failures = telemetry.query(status="failure", limit=10)
# Query specific skill usage
agent_runs = telemetry.query(skill="agent.run", limit=50)
# Query by caller
cli_executions = telemetry.query(caller="CLI", limit=100)
for entry in failures:
print(f"{entry['timestamp']}: {entry['skill']} - {entry['error_message']}")
| Parameter | Type | Required | Description |
|---|---|---|---|
skill | string | Yes | Name of the skill/component (e.g., 'plugin.build') |
status | string | Yes | Execution status: success, failure, timeout, error, pending |
duration_ms | number | Yes | Execution duration in milliseconds |
caller | string | Yes | Source of the call (CLI, API, workflow.compose) |
inputs | object | No | Sanitized input parameters (default: {}) |
error_message | string | No | Error message if status is failure/error |
metadata | object | No | Additional context (user, session_id, environment) |
| Parameter | Type | Default | Description |
|---|---|---|---|
skill_name | string | function name | Override skill name |
caller | string | "runtime" | Caller identifier |
capture_inputs | boolean | False | Whether to capture function arguments |
sanitize_keys | list | None | Parameter names to redact (e.g., ['password']) |
{
"timestamp": "2025-10-24T14:30:45.123456+00:00",
"skill": "plugin.build",
"status": "success",
"duration_ms": 320.5,
"caller": "CLI",
"inputs": {
"plugin_path": "./plugin.yaml",
"output_format": "tar.gz"
},
"error_message": null,
"metadata": {
"user": "developer@example.com",
"environment": "production"
}
}
[
{
"timestamp": "2025-10-24T14:30:45.123456+00:00",
"skill": "plugin.build",
"status": "success",
"duration_ms": 320.5,
"caller": "CLI",
"inputs": {
"plugin_path": "./plugin.yaml",
"output_format": "tar.gz"
},
"error_message": null,
"metadata": {}
},
{
"timestamp": "2025-10-24T14:32:10.789012+00:00",
"skill": "agent.run",
"status": "success",
"duration_ms": 1500.0,
"caller": "API",
"inputs": {
"agent": "api.designer"
},
"error_message": null,
"metadata": {}
}
]
Note: The telemetry file is a simple JSON array for efficient querying and compatibility with existing Betty Framework tools.
python skills/telemetry.capture/telemetry_capture.py \
plugin.build success 320 CLI '{"plugin_path": "./plugin.yaml", "output_format": "tar.gz"}'
Output:
{
"timestamp": "2025-10-24T14:30:45.123456+00:00",
"skill": "plugin.build",
"status": "success",
"duration_ms": 320.0,
"caller": "CLI",
"inputs": {
"plugin_path": "./plugin.yaml",
"output_format": "tar.gz"
},
"error_message": null,
"metadata": {}
}
✓ Telemetry captured to /home/user/betty/registry/telemetry.json
# In skills/agent.run/agent_run.py
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "../telemetry.capture"))
from telemetry_utils import capture_telemetry
@capture_telemetry(
skill_name="agent.run",
caller="CLI",
capture_inputs=True,
sanitize_keys=["api_key", "password"]
)
def main():
"""Execute agent with automatic telemetry capture."""
# ... existing implementation
return {"status": "success", "result": result}
if __name__ == "__main__":
main()
from telemetry_capture import TelemetryCapture
telemetry = TelemetryCapture()
failures = telemetry.query(status="failure", limit=10)
print("Recent Failures:")
for entry in failures:
print(f" [{entry['timestamp']}] {entry['skill']}")
print(f" Error: {entry['error_message']}")
print(f" Duration: {entry['duration_ms']}ms")
print()
# Raises ValueError
telemetry.capture(
skill="test.skill",
status="invalid_status", # Must be: success, failure, timeout, error, pending
duration_ms=100,
caller="CLI"
)
Error: ValueError: Invalid status: invalid_status. Must be one of: success, failure, timeout, error, pending
python skills/telemetry.capture/telemetry_capture.py \
plugin.build success 320 CLI '{invalid json}'
Error: Error: Invalid JSON for inputs: Expecting property name enclosed in double quotes
The implementation uses fcntl.flock for thread-safe writes. If multiple processes write simultaneously:
This skill has no external dependencies beyond Python standard library:
json - JSON parsing and serializationfcntl - File locking for thread safetydatetime - ISO 8601 timestamp generationpathlib - Path handlingtyping - Type annotations| Variable | Default | Description |
|---|---|---|
BETTY_TELEMETRY_FILE | /home/user/betty/registry/telemetry.json | Path to telemetry file |
from telemetry_capture import TelemetryCapture
# Use custom location
telemetry = TelemetryCapture(telemetry_file="/custom/path/telemetry.json")
A: The skill automatically creates the telemetry file on first use. Ensure:
A: Ensure you:
/registry/telemetry.jsonA: Use sanitize_keys parameter:
@capture_telemetry(
skill_name="auth.login",
capture_inputs=True,
sanitize_keys=["password", "api_key", "secret_token"]
)
def login(username: str, password: str):
# password will be redacted as "***REDACTED***"
pass
A: Minimal impact:
The telemetry.capture skill integrates with:
All core Betty components should use the @capture_telemetry decorator for consistent observability.
Part of the Betty Framework. See repository LICENSE.