From harness-claude
Defines and evolves event schemas using Zod/JSON Schema, Avro, or Protobuf for Kafka, multi-service event exchange, and event-sourced systems with versioning.
npx claudepluginhub intense-visions/harness-engineering --plugin harness-claude
This skill uses the workspace's default tool permissions.
> Define and evolve event schemas using a schema registry with Avro, Protobuf, or JSON Schema.
JSON Schema + Zod (TypeScript-native, no registry needed for small teams):
import { randomUUID } from 'node:crypto';
import { z } from 'zod';

// Define schemas with Zod; the TypeScript types are inferred from them
const OrderCreatedEventSchema = z.object({
  eventId: z.string().uuid(),
  eventType: z.literal('ORDER_CREATED'),
  schemaVersion: z.literal(1),
  aggregateId: z.string(),
  aggregateType: z.literal('Order'),
  occurredAt: z.string().datetime(),
  correlationId: z.string().uuid().optional(),
  payload: z.object({
    orderId: z.string().uuid(),
    userId: z.string().uuid(),
    items: z.array(
      z.object({
        productId: z.string(),
        quantity: z.number().int().positive(),
        unitPrice: z.number().positive(),
      })
    ),
    totalAmount: z.number().positive(),
    currency: z.string().length(3), // ISO 4217
  }),
});

type OrderCreatedEvent = z.infer<typeof OrderCreatedEventSchema>;

// Envelope schema for all events
const EventEnvelopeSchema = z.object({
  eventId: z.string().uuid(),
  eventType: z.string(),
  schemaVersion: z.number().int().positive(),
  aggregateId: z.string(),
  occurredAt: z.string().datetime(),
  correlationId: z.string().uuid().optional(),
  causationId: z.string().uuid().optional(),
});

// Validate incoming events
function parseOrderEvent(raw: unknown): OrderCreatedEvent {
  return OrderCreatedEventSchema.parse(raw); // throws ZodError if invalid
}

// Minimal shape this example assumes for the domain object
interface Order {
  id: string;
  userId: string;
  items: OrderCreatedEvent['payload']['items'];
  total: number;
}

// Create events with correct structure
function createOrderEvent(order: Order, correlationId?: string): OrderCreatedEvent {
  return {
    eventId: randomUUID(),
    eventType: 'ORDER_CREATED',
    schemaVersion: 1,
    aggregateId: order.id,
    aggregateType: 'Order',
    occurredAt: new Date().toISOString(),
    correlationId, // passed through so related events can be traced together
    payload: {
      orderId: order.id,
      userId: order.userId,
      items: order.items,
      totalAmount: order.total,
      currency: 'USD',
    },
  };
}
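A consumer often needs to reject a malformed message without throwing. The sketch below uses Zod's `safeParse` for that. The trimmed-down `ExampleEventSchema` and the `handleRawMessage` helper are illustrative names, not part of the schemas above.

```typescript
import { z } from 'zod';

// Trimmed-down stand-in for an event schema (illustrative only)
const ExampleEventSchema = z.object({
  eventId: z.string().uuid(),
  eventType: z.literal('ORDER_CREATED'),
  schemaVersion: z.literal(1),
});

type ExampleEvent = z.infer<typeof ExampleEventSchema>;

type ParseOutcome =
  | { ok: true; event: ExampleEvent }
  | { ok: false; problems: string[] };

// Hypothetical helper: validate without throwing, so the caller can
// log the message or route it to a dead-letter queue.
function handleRawMessage(raw: unknown): ParseOutcome {
  const result = ExampleEventSchema.safeParse(raw);
  if (result.success) {
    return { ok: true, event: result.data };
  }
  // Each issue carries the path to the offending field and a message
  const problems = result.error.issues.map(
    (issue) => `${issue.path.join('.')}: ${issue.message}`
  );
  return { ok: false, problems };
}
```

Returning a tagged result instead of throwing keeps a single bad message from crashing a consumer loop, at the cost of the caller having to check `ok`.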
Schema versioning strategy:
// Version 1
const UserCreatedV1 = z.object({
  schemaVersion: z.literal(1),
  userId: z.string(),
  email: z.string().email(),
});

// Version 2: backward compatible (added optional field)
const UserCreatedV2 = z.object({
  schemaVersion: z.literal(2),
  userId: z.string(),
  email: z.string().email(),
  name: z.string().optional(), // new optional field, a safe addition
});

// Discriminated union for versioned parsing
const UserCreatedEvent = z.discriminatedUnion('schemaVersion', [UserCreatedV1, UserCreatedV2]);

function parseUserCreated(raw: unknown) {
  const event = UserCreatedEvent.parse(raw);
  // Normalize to latest version
  if (event.schemaVersion === 1) {
    return { ...event, schemaVersion: 2 as const, name: undefined };
  }
  return event;
}
Event registry for type-safe dispatch:
// Central event registry
// OrderShippedEvent, PaymentFailedEvent, and their schemas are assumed to be
// defined elsewhere in the same way as OrderCreatedEvent.
type EventRegistry = {
  'order.created': OrderCreatedEvent;
  'order.shipped': OrderShippedEvent;
  'user.created': z.infer<typeof UserCreatedEvent>; // UserCreatedEvent is the Zod union defined above
  'payment.failed': PaymentFailedEvent;
};

type EventType = keyof EventRegistry;

// Schema map for validation
const eventSchemas: { [K in EventType]: z.ZodType<EventRegistry[K]> } = {
  'order.created': OrderCreatedEventSchema,
  'order.shipped': OrderShippedEventSchema,
  'user.created': UserCreatedEvent,
  'payment.failed': PaymentFailedEventSchema,
};

function validateEvent<T extends EventType>(type: T, raw: unknown): EventRegistry[T] {
  return eventSchemas[type].parse(raw) as EventRegistry[T];
}
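Validation is only half of dispatch. One way to route an event to a handler with the matching payload type is a handler table keyed by the same registry. The sketch below is self-contained, so it declares a two-entry `DemoRegistry` with hypothetical event shapes rather than reusing the types above.

```typescript
// Hypothetical, simplified event shapes for this sketch
type DemoRegistry = {
  'order.created': { orderId: string; totalAmount: number };
  'user.created': { userId: string; email: string };
};

type DemoEventType = keyof DemoRegistry;

// One handler per event type; a missing or mistyped entry is a compile error
type Handlers = { [K in DemoEventType]: (event: DemoRegistry[K]) => string };

const handlers: Handlers = {
  'order.created': (e) => `order ${e.orderId} for ${e.totalAmount}`,
  'user.created': (e) => `user ${e.userId} <${e.email}>`,
};

// The generic parameter ties the event argument to the type key,
// so dispatch('user.created', orderEvent) does not compile.
function dispatch<T extends DemoEventType>(type: T, event: DemoRegistry[T]): string {
  return handlers[type](event);
}
```

In a real consumer the `event` argument would typically be the return value of a validation step such as `validateEvent`, so untrusted input never reaches a handler.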
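Schema compatibility rules (Avro / Confluent conventions apply to JSON Schema too). Backward compatible means consumers on the new schema can read events written with the old one; forward compatible means consumers on the old schema can read events written with the new one.
| Change | Backward Compatible | Forward Compatible |
|---|---|---|
| Add optional field | Yes | Yes |
| Add required field | No | Yes |
| Remove optional field | Yes | Yes |
| Remove required field | Yes | No |
| Rename field | No | No (use aliases) |
| Change field type | No | No |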
Safe evolution pattern: Never remove or rename fields. Add fields as optional. Bump schemaVersion. Keep old schemas in the registry for producers still on v1.
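The pattern above can be checked mechanically, for example in CI before a new schema version is merged. The following is a minimal sketch under a simplifying assumption: each schema is described by a hypothetical `FieldSpec` map of field name to type and required flag, not by a real Zod or Avro schema object.

```typescript
// Hypothetical flat description of a schema: field name -> type and required flag
type FieldSpec = Record<string, { type: string; required: boolean }>;

// Lists every way `next` departs from the additive, optional-only pattern.
// An empty list means the change is a safe evolution of `previous`.
function findUnsafeChanges(previous: FieldSpec, next: FieldSpec): string[] {
  const violations: string[] = [];

  for (const [name, before] of Object.entries(previous)) {
    const after = next[name];
    if (after === undefined) {
      // A rename looks the same as a removal plus an addition
      violations.push(`removed or renamed field: ${name}`);
    } else if (after.type !== before.type) {
      violations.push(`changed type of field: ${name}`);
    }
  }

  for (const [name, spec] of Object.entries(next)) {
    if (!(name in previous) && spec.required) {
      violations.push(`new field must be optional: ${name}`);
    }
  }

  return violations;
}

// Usage: v2 adds an optional field, v3 drops one and adds a required one
const v1: FieldSpec = {
  userId: { type: 'string', required: true },
  email: { type: 'string', required: true },
};
const v2: FieldSpec = { ...v1, name: { type: 'string', required: false } };
const v3: FieldSpec = {
  userId: { type: 'string', required: true },
  phone: { type: 'string', required: true },
};
```

Here `findUnsafeChanges(v1, v2)` returns an empty list, while `findUnsafeChanges(v1, v3)` reports the removed `email` field and the required `phone` field.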
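Confluent Schema Registry (for Kafka): Stores Avro, Protobuf, and JSON Schema definitions. Producers register schemas; consumers fetch them by ID to deserialize and validate messages. The registry enforces the configured compatibility mode when a new schema version is registered, so a breaking change is rejected before any event that uses it reaches consumers.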
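As a rough illustration of the registration step, the sketch below builds and sends a request to the registry's REST endpoint for adding a schema version under a subject. The base URL and subject name are placeholders, `buildRegisterRequest` and `registerSchema` are hypothetical helpers, and a production client would normally use an existing Schema Registry library instead.

```typescript
interface RegistryRequest {
  url: string;
  init: { method: 'POST'; headers: Record<string, string>; body: string };
}

// Builds the HTTP request that registers a JSON Schema under a subject.
function buildRegisterRequest(
  baseUrl: string,
  subject: string,
  jsonSchema: object
): RegistryRequest {
  return {
    url: `${baseUrl}/subjects/${encodeURIComponent(subject)}/versions`,
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/vnd.schemaregistry.v1+json' },
      // The registry expects the schema itself as a JSON-encoded string
      body: JSON.stringify({ schemaType: 'JSON', schema: JSON.stringify(jsonSchema) }),
    },
  };
}

// Sends the request and returns the schema ID assigned by the registry.
// A schema that violates the subject's compatibility mode is rejected
// with a non-2xx status, which surfaces here as a thrown error.
async function registerSchema(
  baseUrl: string,
  subject: string,
  jsonSchema: object
): Promise<number> {
  const { url, init } = buildRegisterRequest(baseUrl, subject, jsonSchema);
  const response = await fetch(url, init);
  if (!response.ok) {
    throw new Error(`schema registration failed with HTTP ${response.status}`);
  }
  const { id } = (await response.json()) as { id: number };
  return id;
}
```

Splitting request construction from the network call keeps the payload shape testable without a running registry.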
Anti-patterns:
Using the any type or untyped JSON for event payloads: schema drift becomes undetectable.
Event ID for idempotency: Always include eventId (UUID). Consumers use it to deduplicate redelivered events.
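Deduplication by eventId might look like the sketch below. It keeps seen IDs in an in-memory Set purely for illustration; a real consumer would need a durable store shared across restarts and instances. The `IdempotentConsumer` class is a hypothetical name.

```typescript
interface IdentifiedEvent {
  eventId: string;
}

// Wraps a handler so that each eventId is processed at most once.
class IdempotentConsumer<E extends IdentifiedEvent> {
  private readonly seen = new Set<string>();
  private readonly handler: (event: E) => void;

  constructor(handler: (event: E) => void) {
    this.handler = handler;
  }

  // Returns true if the event was handled, false if it was a duplicate
  consume(event: E): boolean {
    if (this.seen.has(event.eventId)) {
      return false;
    }
    this.handler(event);
    // Recorded only after the handler succeeds, so a failed event can be retried
    this.seen.add(event.eventId);
    return true;
  }
}

// Usage: the second delivery of the same event is skipped
const processed: string[] = [];
const consumer = new IdempotentConsumer<IdentifiedEvent>((e) => {
  processed.push(e.eventId);
});
const firstDelivery = consumer.consume({ eventId: 'evt-1' });
const redelivery = consumer.consume({ eventId: 'evt-1' });
```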
microservices.io/patterns/data/event-sourcing.html