From typescript-services
Builds non-blocking event publishing systems with queue management, retry logic, exponential backoff, error categorization, and resilient HTTP handling. Use when implementing activity/audit logging, telemetry clients, event-driven architectures, or async job submission systems in TypeScript.
npx claudepluginhub andercore-labs/claudes-kitchen --plugin typescript-servicesThis skill uses the workspace's default tool permissions.
```typescript
Implements structured self-debugging workflow for AI agent failures: capture errors, diagnose patterns like loops or context overflow, apply contained recoveries, and generate introspection reports.
Monitors deployed URLs for regressions in HTTP status, console errors, performance metrics, content, network, and APIs after deploys, merges, or upgrades.
Provides React and Next.js patterns for component composition, compound components, state management, data fetching, performance optimization, forms, routing, and accessible UIs.
// Non-blocking client
class ActivityClient extends EventEmitter {
publish(event: unknown): void // Queue any event
publishActivity(activity: ActivityEnvelope): void // Queue structured activity
publishAuditLog(options: PublishAuditLogOptions): void // Queue validated audit log
flush(): Promise<void> // Wait for queue drain
close(): Promise<void> // Graceful shutdown
queueSize(): number // Monitor depth
getState(): 'running' | 'closed' // Check client state
}
// Worker with retry
class Worker {
private async sendWithRetry(event: unknown, attempt = 0): Promise<void> {
const delay = initialDelay * (backoffFactor ** attempt);
// 2xx-3xx → success | 429 → rate limit | 5xx,409 → retry | 4xx → drop
}
}
⚠️ CRITICAL: Present plan to user BEFORE implementation
Analyze requirements:
1. Event types → What events? (activity, audit, telemetry)
2. Volume → Events/second expected? (low <10, medium 10-1000, high >1000)
3. Destinations → Where? (single endpoint, multiple, fan-out)
4. Reliability → SLA? (best-effort, guaranteed, exactly-once)
5. Integration → Framework? (Express, NestJS, standalone)
6. JSON examples → Request examples of input events and expected output format
Ask user for JSON examples:
// Use AskUserQuestion tool FIRST to gather examples
questions = [{
question: "Do you have example JSON for the events you want to publish?",
header: "JSON Examples",
options: [
{
label: "Yes - I'll provide examples",
description: "Provide sample input event JSON and expected output format"
},
{
label: "No - Use standard format",
description: "Use Activity Stream or Audit Log standard formats"
},
{
label: "Need help deciding",
description: "Show me the standard formats first"
}
],
multiSelect: false
}]
// IF user provides examples:
// - Analyze JSON structure
// - Identify required fields
// - Determine validation needs
// - Plan TypeScript interfaces and Zod schemas
Create implementation plan:
PLAN:
- Queue size → Based on volume (default 10k, adjust if needed)
- Retry policy → maxAttempts, backoffFactor based on SLA
- Worker pattern → Single queue or multiple workers (high volume)
- Event types → publish() vs publishActivity() vs publishAuditLog()
- Error handling → EventEmitter, callbacks, severity thresholds
- Validation → TypeScript types, runtime validation, Zod schemas
- Testing strategy → Unit, integration, load tests
Present to user with AskUserQuestion:
// Use AskUserQuestion tool to present plan
questions = [{
question: "Review the implementation plan for the activity publisher:",
header: "Implementation Plan",
options: [
{
label: "Approve - Proceed with implementation",
description: `Queue: ${queueSize}, Retry: ${maxAttempts}x, Volume: ${volumeEstimate}`
},
{
label: "Modify - Adjust parameters",
description: "Change queue size, retry policy, or architecture decisions"
},
{
label: "Explain - Need more details",
description: "Provide more context about trade-offs and design decisions"
}
],
multiSelect: false
}]
User approval required → proceed to Phase 2
Only after user approves plan:
export class ActivityClient extends EventEmitter {
private worker: Worker;
private queue: unknown[] = [];
private closed = false;
private readonly maxQueueSize: number;
constructor(endpoint: string, options?: ClientOptions) {
super();
this.maxQueueSize = options?.maxQueueSize ?? 10000;
this.worker = new Worker(endpoint, this.queue, options);
this.worker.start();
}
// Publish any event (generic)
publish(event: unknown): void {
if (this.closed) throw new Error('CLIENT_CLOSED');
if (this.queue.length >= this.maxQueueSize) {
this.emit('queue_full', { queueSize: this.queue.length });
throw new Error('QUEUE_FULL');
}
this.queue.push(event);
}
// Publish structured activity (validated)
publishActivity(activity: ActivityEnvelope): void {
this.publish(activity.toDict());
}
// Publish audit log (validated + formatted)
publishAuditLog(options: PublishAuditLogOptions): void {
// Validate required fields
if (!options.tenant_id) throw new Error('tenant_id required');
if (!options.verb) throw new Error('verb required');
if (!options.actor) throw new Error('actor required');
if (!options.object) throw new Error('object required');
// Validate actor structure
if (options.actor.type === 'user') {
if (!options.actor.email) throw new Error('User actor requires email');
if (!options.actor.tenant_id) throw new Error('User actor requires tenant_id');
}
if (options.actor.type === 'system') {
if (!options.actor.system_type) throw new Error('System actor requires system_type');
}
// Build audit activity with defaults
const auditActivity = {
id: uuidv4(),
timestamp: new Date().toISOString(),
type: 'audit-log',
tenant_id: options.tenant_id,
verb: options.verb,
actor: options.actor,
object: options.object,
plane: options.plane ?? 'control',
region: options.region ?? 'eu',
stamp: options.stamp ?? uuidv4(),
metadata: options.metadata ?? {}
};
this.publish(auditActivity);
}
// Get client state
getState(): 'running' | 'closed' {
return this.closed ? 'closed' : 'running';
}
async flush(): Promise<void> {
while (this.queue.length > 0 && !this.closed) {
await new Promise(resolve => setTimeout(resolve, 100));
}
}
async close(): Promise<void> {
this.closed = true;
await this.flush();
this.worker.stop();
}
}
Publishing Methods:
| Method | Purpose | Validation | Use Case |
|---|---|---|---|
publish(event) | Generic event publishing | None | Flexible, any JSON data |
publishActivity(activity) | Structured activity | Activity envelope | Activity Stream spec compliance |
publishAuditLog(options) | Audit logging | Full validation + formatting | Compliance, audit trails |
Principles:
publishActivity() → wrapper around publish() for typed activitiespublishAuditLog() → validates + formats before queueinggetState() → check if client is operationalLogic: Process queue → Send with retry → Handle status → Backoff/drop
| Status | Action | Retry? |
|---|---|---|
| 2xx-3xx | Success | No |
| 429 | Respect Retry-After → backoff | Yes |
| 409, 5xx | Exponential backoff | Yes |
| Other 4xx | Drop event | No |
Retry: delay = initialDelay * (backoffFactor ** attempt) (default: 1s * 2^attempt)
See advanced.md for complete Worker implementation, AbortController patterns, and retry configuration.
| ErrorType | Severity | Cause | Action |
|---|---|---|---|
| network_error | high | Connection failed | Retry with backoff |
| http_error | medium | HTTP status ≥ 400 | Classify → retry or drop |
| timeout_error | medium | Request timeout | Retry with backoff |
| rate_limited | low | 429 response | Respect Retry-After |
| retry_exhausted | high | Max attempts reached | Drop event |
| validation_error | critical | Invalid event data | Fix event structure |
| queue_full | critical | Queue at capacity | Increase maxQueueSize |
| client_closed | medium | Client shutdown | Complete shutdown |
interface ErrorContext {
errorType: ErrorType;
severity: 'low' | 'medium' | 'high' | 'critical';
message: string;
originalError?: Error;
timestamp: string;
attempt: number;
maxAttempts: number;
httpStatusCode?: number;
event?: unknown; // Optional, configurable
}
interface ErrorHandling {
onError?: (context: ErrorContext) => void;
emitEvents?: boolean; // Default: true
minSeverity?: Severity; // Default: 'low'
includeEventData?: boolean; // Default: true
callbackTimeout?: number; // ms, default: 1000
}
// Callback with timeout protection
Promise.race([
errorHandling.onError(context),
timeout(callbackTimeout)
]).catch(err => console.error('Callback failed:', err));
enum SDK_EVENTS {
ERROR = 'error',
RETRY = 'retry',
SUCCESS = 'success',
RATE_LIMITED = 'rate_limited',
QUEUE_FULL = 'queue_full',
CLIENT_CLOSED = 'client_closed'
}
// Usage
client.on(SDK_EVENTS.ERROR, (context: ErrorContext) => {
console.error(`[${context.severity}] ${context.errorType}:`, context.message);
});
client.on(SDK_EVENTS.RETRY, (context) => {
console.log(`Retry ${context.attempt}/${context.maxAttempts}`);
});
Pattern: Actor → Action → Object (→ Target)
Three publishing methods:
| Method | Data Type | Validation |
|---|---|---|
| publish() | Any JSON | None |
| publishActivity() | ActivityEnvelope | Structure validated |
| publishAuditLog() | AuditLogOptions | Full validation + formatting |
Validation rules:
See advanced.md for:
See examples.md for usage examples and builder patterns.
| Test | Assertion |
|---|---|
| Non-blocking | Date.now() - start < 10ms |
| Queue full | expect(() => publish()).toThrow('QUEUE_FULL') |
| 2xx success | successSpy.toHaveBeenCalledTimes(1) |
| 429 rate limit | rateLimitedSpy.toHaveBeenCalledWith({ retryAfter: 2 }) |
| 5xx retry | retrySpy.toHaveBeenCalledTimes(2) |
| 4xx drop | errorSpy.toHaveBeenCalledWith({ httpStatusCode: 400 }) |
See examples.md for complete test implementations with mock servers.
VALIDATE:
- [ ] publish() non-blocking? (immediate return)
- [ ] Bounded queue? (maxQueueSize enforced)
- [ ] Queue full → throw error?
- [ ] Closed client → throw error?
- [ ] Worker separate from publish()?
- [ ] Exponential backoff implemented?
- [ ] HTTP status classified correctly?
- [ ] 2xx-3xx → success
- [ ] 429 → rate limit + Retry-After
- [ ] 5xx, 409 → retry
- [ ] 4xx (not 429, 409) → drop
- [ ] Rate limiting respects Retry-After?
- [ ] 4xx permanent failures drop immediately?
- [ ] Error callbacks timeout-protected?
- [ ] AbortController for graceful shutdown?
- [ ] flush() waits for queue drain?
- [ ] close() calls flush() before shutdown?
- [ ] EventEmitter for monitoring?
- [ ] Builder pattern fluent (returns this)?
- [ ] Validation for required fields?
See advanced.md for:
See examples.md for:
| Phase | Action |
|---|---|
| 1. SCOPE | Extract context: files, mode (informative | executive), sessionId |
| 2. VERIFY | Run checklist against implementation |
| 3. VIOLATIONS | Collect with file:line, check name, severity |
| 4. REPORT | ✓ Pass → Proceed | ✗ Fail → Return violations |
| 5. METRICS | Call mcp__agent-orchestrator__store-skill-metrics |
| 6. OUTPUT | Return JSON: violations[], fixRate, finalViolations |
Metrics Structure:
{
sessionId: string;
skill: "typescript-services:ts-activity-publisher-recipe";
initialViolations: number;
iterations: number;
fixesApplied: number;
finalViolations: number;
mode: "informative" | "executive";
duration: number;
}
Output Format:
{
status: "success" | "failed";
violations: [
{
file: "src/activity-client.ts";
line: 42;
check: "Non-blocking design";
violation: "publish() awaits queue operation";
severity: "critical";
}
];
metrics: {
initialViolations: 2;
finalViolations: 0;
fixRate: 1.0;
};
}