From python-services
Builds non-blocking event publishing systems with queue management, retry logic, exponential backoff, error categorization, and resilient HTTP handling. Use when implementing activity/audit logging, telemetry clients, event-driven architectures, or async job submission systems in Python.
`npx claudepluginhub andercore-labs/claudes-kitchen --plugin python-services`
```python
# Non-blocking client with context manager
class ActivityClient:
    def publish(self, event: dict[str, Any]) -> None: ...                # Queue any event
    def publish_activity(self, activity: ActivityEnvelope) -> None: ...  # Queue structured activity
    def publish_audit_log(self, audit_log: AuditLogActivity) -> None: ...  # Queue validated audit log
    def flush(self) -> None: ...                                         # Wait for queue drain
    def close(self) -> None: ...                                         # Graceful shutdown
    def on(self, event_name: str, handler: Callable) -> None: ...        # Register event handler
    def __enter__(self) -> Self: ...                                     # Context manager support
    def __exit__(self, *exc) -> None: ...                                # Auto flush & close

# Worker with retry (background thread)
class _Worker(threading.Thread):
    def _send(self, event: dict[str, Any]) -> None:
        delay = initial_delay * (backoff_factor ** attempt)
        # 2xx-3xx → success | 429 → rate limit | 5xx, 409 → retry | other 4xx → drop
```
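The interface above can be filled in with a minimal working sketch. The `send` transport is injected here as an assumption (the recipe posts over HTTP), which keeps the class testable without a network:

```python
import queue
import threading
from typing import Any, Callable

class ActivityClient:
    """Minimal sketch: bounded queue + daemon worker thread.
    publish() only enqueues (raising queue.Full at the bound),
    so callers never block on I/O."""

    def __init__(self, send: Callable[[dict[str, Any]], None],
                 max_queue_size: int = 10_000):
        self._queue: queue.Queue = queue.Queue(maxsize=max_queue_size)
        self._send = send                     # injected transport (assumption)
        self._shutdown = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def publish(self, event: dict[str, Any]) -> None:
        self._queue.put_nowait(event)         # raises queue.Full when full

    def flush(self) -> None:
        self._queue.join()                    # waits until every event is task_done()

    def close(self) -> None:
        self.flush()                          # drain first, then stop the worker
        self._shutdown.set()

    def __enter__(self):
        return self

    def __exit__(self, *exc) -> None:
        self.close()                          # auto flush & close

    def _run(self) -> None:
        while not self._shutdown.is_set():
            try:
                event = self._queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                self._send(event)
            finally:
                self._queue.task_done()       # pairs with flush()'s join()
```

The daemon flag means an un-closed client cannot keep the process alive; `flush()` works because every `put` is matched by a `task_done()` in the worker.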
⚠️ CRITICAL: Present plan to user BEFORE implementation
Analyze requirements:
1. Event types → What events? (activity, audit, telemetry)
2. Volume → Events/second expected? (low <10, medium 10-1000, high >1000)
3. Destinations → Where? (single endpoint, multiple, fan-out)
4. Reliability → SLA? (best-effort, guaranteed, exactly-once)
5. Integration → Framework? (FastAPI, Flask, standalone)
6. JSON examples → Request examples of input events and expected output format
Ask user for JSON examples:
```python
# Use AskUserQuestion tool FIRST to gather examples
questions = [{
    "question": "Do you have example JSON for the events you want to publish?",
    "header": "JSON Examples",
    "options": [
        {
            "label": "Yes - I'll provide examples",
            "description": "Provide sample input event JSON and expected output format"
        },
        {
            "label": "No - Use standard format",
            "description": "Use Activity Stream or Audit Log standard formats"
        },
        {
            "label": "Need help deciding",
            "description": "Show me the standard formats first"
        }
    ],
    "multiSelect": False
}]
```
# IF user provides examples:
# - Analyze JSON structure
# - Identify required fields
# - Determine validation needs
# - Plan schema validation
Create implementation plan:
PLAN:
- Queue size → Based on volume (default 10k, adjust if needed)
- Retry policy → max_attempts, backoff_factor based on SLA
- Worker threads → Single (default) or multiple (high volume)
- Event types → publish() vs publish_activity() vs publish_audit_log()
- Error handling → Callbacks, severity thresholds, alerting
- Validation → Schema validation, required fields
- Testing strategy → Unit, integration, load tests
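One way to turn the volume estimate into concrete plan defaults mechanically. Only the 10k queue-size default comes from this recipe; the other numbers are illustrative assumptions to tune per deployment:

```python
def plan_defaults(events_per_second: float) -> dict[str, int]:
    """Hypothetical mapping from estimated volume to plan parameters.
    Thresholds mirror the low (<10), medium (10-1000), high (>1000)
    bands; worker counts and the 100k size are assumptions."""
    if events_per_second < 10:        # low volume
        return {"max_queue_size": 10_000, "workers": 1, "max_attempts": 3}
    if events_per_second <= 1000:     # medium volume
        return {"max_queue_size": 10_000, "workers": 1, "max_attempts": 5}
    return {"max_queue_size": 100_000, "workers": 4, "max_attempts": 5}  # high volume
```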
Present to user with AskUserQuestion:
```python
# Use AskUserQuestion tool to present plan
questions = [{
    "question": "Review the implementation plan for the activity publisher:",
    "header": "Implementation Plan",
    "options": [
        {
            "label": "Approve - Proceed with implementation",
            "description": f"Queue: {queue_size}, Retry: {max_attempts}x, Volume: {volume_estimate}"
        },
        {
            "label": "Modify - Adjust parameters",
            "description": "Change queue size, retry policy, or architecture decisions"
        },
        {
            "label": "Explain - Need more details",
            "description": "Provide more context about trade-offs and design decisions"
        }
    ],
    "multiSelect": False
}]
```
User approval required → proceed to Phase 2
Only after user approves plan:
Non-blocking design:
- publish() → immediate return (enqueue only)
- queue.Queue → bounded, configurable

Publishing Methods:
| Method | Purpose | Validation | Use Case |
|---|---|---|---|
| publish(event) | Generic event | None | Flexible, any dict |
| publish_activity(activity) | Structured activity | Envelope validation | Activity Stream spec |
| publish_audit_log(audit_log) | Audit logging | Full validation | Compliance, audit trails |
Queue Management:
- queue.Full → raises exception
- flush() → waits via queue.join()
- close() → flush then shutdown

Send Logic:
```
FOR each attempt 1..max_attempts:
    IF shutdown set → return
    TRY HTTP POST to endpoint
        status → _classify_response_status()
        CASE success (2xx-3xx) → emit success event, return
        CASE permanent_failure (4xx except 429, 409) → emit error, return
        CASE rate_limited (429) → honor Retry-After header, continue
        CASE retryable (5xx, 409) → exponential backoff, continue
    CATCH RequestException → retry if attempts remaining
    sleep(delay)
    delay *= backoff_factor
RETRY exhausted → emit final_error event
```
Exponential Backoff:
delay = initial_delay * (backoff_factor ** attempt)
# Default: initial=1s, backoff=2.0 → 1, 2, 4, 8, 16...
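The send loop and backoff schedule can be sketched as a testable function. The `post` transport and `sleep` hook are injected here as assumptions so the loop runs without a network, and 429 Retry-After handling is omitted for brevity:

```python
import time
from typing import Any, Callable

def send_with_retry(
    post: Callable[[dict[str, Any]], int],    # injected transport returning an HTTP status
    event: dict[str, Any],
    max_attempts: int = 5,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Sketch of the worker's send loop: classify the response,
    then retry with exponential backoff until attempts run out."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            status = post(event)
        except Exception:
            status = None                      # connection error → retryable
        if status is not None and 200 <= status < 400:
            return "success"                   # 2xx-3xx
        if status is not None and 400 <= status < 500 and status not in (409, 429):
            return "permanent_failure"         # other 4xx → drop immediately
        if attempt < max_attempts:
            sleep(delay)                       # 1, 2, 4, 8, ... with the defaults
            delay *= backoff_factor
    return "final_error"                       # retries exhausted
```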
| Code | Category | Action |
|---|---|---|
| 2xx, 3xx | success | Emit success_sent event, return |
| 429 | rate_limited | Read Retry-After, sleep, retry |
| 5xx, 409 | retryable | Exponential backoff, retry |
| 4xx (not 429, 409) | permanent_failure | Emit error event, drop |
| Connection timeout/error | retryable | Exponential backoff, retry |
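The classification table translates directly into two small helpers. These are sketches (the source names the classifier `_classify_response_status`; `retry_after_delay` handles only the seconds form of Retry-After, not the HTTP-date form):

```python
def classify_response_status(status: int) -> str:
    """Map an HTTP status code onto the retry categories above."""
    if 200 <= status < 400:
        return "success"
    if status == 429:
        return "rate_limited"
    if status >= 500 or status == 409:
        return "retryable"
    return "permanent_failure"                # remaining 4xx → drop

def retry_after_delay(headers: dict[str, str], fallback: float) -> float:
    """For 429s, prefer the server's Retry-After (seconds form)
    over the computed backoff; fall back when absent or unparseable."""
    try:
        return max(0.0, float(headers.get("Retry-After")))
    except (TypeError, ValueError):
        return fallback
```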
Error Types & Severity:
```python
# Severity: CRITICAL > ERROR > WARN > LOW
class ErrorContext:
    severity: ErrorSeverity       # CRITICAL | ERROR | WARN | LOW
    category: str                 # "network" | "validation" | "rate_limit" | "permanent"
    attempt: int                  # Current attempt number
    error: str                    # Error message
    response_status: int | None   # HTTP status if applicable
    event_data: dict | None       # Event that failed (if include_event_data=True)
```
Error Callbacks:
error_callback = lambda context: log_error(context)
# Called with ErrorContext for CRITICAL/ERROR/WARN
# Timeout-protected (max 5s default)
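One way to get the timeout protection described above is to run the callback on a throwaway thread, a sketch under the assumption that callbacks are rare enough to afford a thread each:

```python
import threading
from typing import Any, Callable

def invoke_callback_safely(callback: Callable[[Any], None], context: Any,
                           timeout: float = 5.0) -> bool:
    """Run an error callback on a helper daemon thread so a hung or
    slow callback cannot stall the publishing worker. Returns True
    if the callback finished within the timeout."""
    t = threading.Thread(target=callback, args=(context,), daemon=True)
    t.start()
    t.join(timeout)
    return not t.is_alive()
```

A timed-out callback keeps running in the background; the daemon flag just stops it from blocking process exit.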
Data Models:
See advanced.md for:
VALIDATE:
- [ ] publish() non-blocking? (immediate return)
- [ ] Thread-safe queue.Queue used?
- [ ] Bounded queue? (max_queue_size enforced)
- [ ] Queue full → raises queue.Full?
- [ ] Worker is daemon thread?
- [ ] Exponential backoff implemented?
- [ ] HTTP status classified correctly (2xx|429|5xx/409|4xx)?
- [ ] Rate limit respects Retry-After header?
- [ ] 4xx (not 429/409) permanent failures drop immediately?
- [ ] Error callbacks timeout-protected?
- [ ] Context manager (__enter__/__exit__) implemented?
- [ ] flush() uses queue.join()?
- [ ] close() calls flush() before shutdown?
- [ ] Event handlers registered via on()?
- [ ] Builder pattern fluent (returns self)?
- [ ] Immutable dataclasses for data models?
- [ ] Type hints throughout?
See examples.md for:
| Phase | Action |
|---|---|
| 1. SCOPE | Extract context: files, mode (informative \| executive), sessionId |
| 2. VERIFY | Run checklist against implementation |
| 3. VIOLATIONS | Collect with file:line, check name, severity |
| 4. REPORT | ✓ Pass → Proceed; ✗ Fail → Return violations |
| 5. METRICS | Call mcp__agent-orchestrator__store-skill-metrics |
| 6. OUTPUT | Return JSON: violations[], fixRate, finalViolations |
Metrics Structure:
```
{
    "sessionId": str,
    "skill": "python-services:python-activity-publisher-recipe",
    "initialViolations": int,
    "iterations": int,
    "fixesApplied": int,
    "finalViolations": int,
    "mode": "informative" | "executive",
    "duration": float
}
```
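The metrics structure can be given a static type in Python, a sketch (the orchestrator tool itself simply accepts a JSON object):

```python
from typing import Literal, TypedDict

class SkillMetrics(TypedDict):
    """Typed view of the metrics payload above."""
    sessionId: str
    skill: str
    initialViolations: int
    iterations: int
    fixesApplied: int
    finalViolations: int
    mode: Literal["informative", "executive"]
    duration: float
```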
Output Format:
```
{
    "status": "success" | "failed",
    "violations": [
        {
            "file": "src/activity_client.py",
            "line": 42,
            "check": "Non-blocking design",
            "violation": "publish() blocks on queue operation",
            "severity": "critical"
        }
    ],
    "metrics": {
        "initialViolations": 2,
        "finalViolations": 0,
        "fixRate": 1.0
    }
}
```