From harness-claude
Implements event sourcing in TypeScript for microservices: store state as immutable event sequences with aggregates, projections, snapshots, replay, audit trails, and CQRS support.
npx claudepluginhub intense-visions/harness-engineering --plugin harness-claude
This skill uses the workspace's default tool permissions.
> Store state as an immutable sequence of events with projections, snapshots, and replay.
Implements event sourcing and CQRS patterns using event stores, aggregates, and projections for audit trails, temporal queries, and full history systems.
Designs event-sourced systems with event store, projections, versioning patterns. Guides when to use for audits, complex domains, collaboration, event-driven architectures.
Designs event-sourced systems with CQRS, event stores, projections, sagas, snapshotting, and eventual consistency. Use for audit trails, temporal queries, and complex domain modeling.
Share bugs, ideas, or general feedback.
Store state as an immutable sequence of events with projections, snapshots, and replay.
Core types:
// Base domain event
/**
 * Envelope fields shared by every domain event in this file.
 * All fields are readonly: an event describes something that already happened.
 */
interface DomainEvent {
// Identifier of this individual event (the Order aggregate fills it with crypto.randomUUID()).
readonly eventId: string;
// Discriminator string; concrete event interfaces narrow it to a literal such as 'order.created'.
readonly eventType: string;
// Identifier of the aggregate instance this event belongs to.
readonly aggregateId: string;
// Position of the event in its aggregate's stream; the Order aggregate uses its version
// before the event is applied, so the first event carries 0.
readonly aggregateVersion: number;
// Timestamp of the event as a string (the Order aggregate uses Date#toISOString()).
readonly occurredAt: string; // ISO 8601
}
// Aggregate root base class
/**
 * Base class for aggregates whose state is derived purely from domain events.
 *
 * Subclasses provide `when()` to fold a single event into their state. New
 * events go through `apply()`, which also records them as uncommitted;
 * previously stored events go through `rehydrate()`, which does not.
 */
abstract class EventSourcedAggregate {
  private _version = 0;
  private _uncommittedEvents: DomainEvent[] = [];

  /** State transition for one event; implemented by each concrete aggregate. */
  protected abstract when(event: DomainEvent): void;

  /** Count of events folded into this instance so far (applied + rehydrated). */
  get version(): number {
    return this._version;
  }

  /**
   * Folds a new event into the aggregate and queues it for persistence.
   * `when()` runs first, so if it throws neither the version nor the queue changes.
   */
  protected apply(event: DomainEvent): void {
    this.when(event);
    this._version += 1;
    this._uncommittedEvents.push(event);
  }

  /**
   * Rebuilds state from already-stored events. Each event bumps the version
   * but is NOT added to the uncommitted queue.
   */
  rehydrate(events: DomainEvent[]): void {
    for (let index = 0; index < events.length; index += 1) {
      this.when(events[index]);
      this._version += 1;
    }
  }

  /**
   * Hands back every queued event and leaves the queue empty.
   * @returns a fresh array holding the events in the order they were applied
   */
  pullUncommittedEvents(): DomainEvent[] {
    // splice(0, length) removes all entries and returns them as a new array.
    return this._uncommittedEvents.splice(0, this._uncommittedEvents.length);
  }
}
Order aggregate:
// Events
/** Emitted by Order.create when a new order is opened. */
interface OrderCreated extends DomainEvent {
eventType: 'order.created';
// Identifier of the user the order belongs to.
userId: string;
// Line items; the summary projection totals them as unitPrice * quantity.
// NOTE(review): currency/units of unitPrice are not shown in this file — confirm.
items: { productId: string; quantity: number; unitPrice: number }[];
}
/** Emitted by Order.pay; moves the order to the 'paid' status. */
interface OrderPaid extends DomainEvent {
eventType: 'order.paid';
// Identifier of the charge — presumably issued by the payment provider; confirm.
chargeId: string;
// Amount passed to Order.pay (not cross-checked against the item total in this file).
amount: number;
}
/** Emitted by Order.ship; moves the order to the 'shipped' status. */
interface OrderShipped extends DomainEvent {
eventType: 'order.shipped';
trackingNumber: string;
carrier: string;
}
/**
 * Moves the order to the 'cancelled' status when replayed.
 * No command in the visible Order class emits this event.
 */
interface OrderCancelled extends DomainEvent {
eventType: 'order.cancelled';
reason: string;
}
// Discriminated union over eventType; lets a switch narrow to the concrete event.
type OrderEvent = OrderCreated | OrderPaid | OrderShipped | OrderCancelled;
// Aggregate
/**
 * Order aggregate. Commands (`create`, `pay`, `ship`) validate the current
 * status and emit an event through `apply()`; all state changes happen in
 * `when()`, so replaying stored events reproduces the same state.
 */
class Order extends EventSourcedAggregate {
  private status: 'pending' | 'paid' | 'shipped' | 'cancelled' = 'pending';
  private userId = '';
  // Plain indexed-access type. The previous conditional type
  // (`OrderEvent extends OrderCreated ? ... : never`) is not distributive here
  // and resolved to `never`, which rejected the `[]` initializer.
  private items: OrderCreated['items'] = [];
  id = '';

  /**
   * Opens a new order by applying an `order.created` event.
   * @param id      identifier of the new order (used as the aggregate id)
   * @param userId  identifier of the user placing the order
   * @param items   line items of the order
   * @returns the new aggregate with one uncommitted event
   */
  static create(id: string, userId: string, items: OrderCreated['items']): Order {
    const order = new Order();
    order.id = id;
    // Typed declaration instead of `as OrderCreated` so a missing or misspelled
    // field is reported by the compiler.
    const created: OrderCreated = {
      eventId: crypto.randomUUID(),
      eventType: 'order.created',
      aggregateId: id,
      aggregateVersion: 0,
      occurredAt: new Date().toISOString(),
      userId,
      items,
    };
    order.apply(created);
    return order;
  }

  /**
   * Records payment for a pending order.
   * @throws Error when the order is not in the 'pending' status
   */
  pay(chargeId: string, amount: number): void {
    if (this.status !== 'pending') throw new Error('Order is not pending');
    const paid: OrderPaid = {
      eventId: crypto.randomUUID(),
      eventType: 'order.paid',
      aggregateId: this.id,
      aggregateVersion: this.version,
      occurredAt: new Date().toISOString(),
      chargeId,
      amount,
    };
    this.apply(paid);
  }

  /**
   * Records shipment of a paid order.
   * @throws Error when the order is not in the 'paid' status
   */
  ship(trackingNumber: string, carrier: string): void {
    if (this.status !== 'paid') throw new Error('Order must be paid before shipping');
    const shipped: OrderShipped = {
      eventId: crypto.randomUUID(),
      eventType: 'order.shipped',
      aggregateId: this.id,
      aggregateVersion: this.version,
      occurredAt: new Date().toISOString(),
      trackingNumber,
      carrier,
    };
    this.apply(shipped);
  }

  /**
   * Folds one event into the order's state. Event types outside the
   * `OrderEvent` union fall through the switch and are ignored.
   */
  protected when(event: DomainEvent): void {
    const e = event as OrderEvent;
    switch (e.eventType) {
      case 'order.created':
        this.id = e.aggregateId;
        this.userId = e.userId;
        // Previously never assigned, so items stayed empty after create/replay.
        // Copied so later changes to the caller's array do not leak into state.
        this.items = [...e.items];
        this.status = 'pending';
        break;
      case 'order.paid':
        this.status = 'paid';
        break;
      case 'order.shipped':
        this.status = 'shipped';
        break;
      case 'order.cancelled':
        this.status = 'cancelled';
        break;
    }
  }
}
Event store:
/**
 * Append-only event store on top of an `eventStore` table, accessed through
 * `this.db` (`$transaction`, `count`, `createMany`, `findMany`).
 *
 * NOTE(review): `db` is not declared anywhere in this class — presumably an
 * injected Prisma client; confirm how it is supplied.
 */
class PostgresEventStore {
  /**
   * Appends a batch of events for ONE aggregate using optimistic concurrency.
   *
   * @param events           events to persist, in order; an empty batch is a no-op
   * @param expectedVersion  number of events the caller believes are already stored
   * @throws Error             when the batch mixes events of different aggregates
   * @throws ConcurrencyError  when the stored event count differs from `expectedVersion`
   */
  async append(events: DomainEvent[], expectedVersion: number): Promise<void> {
    const [first] = events;
    // Empty batch: nothing to write. Previously this dereferenced events[0]
    // and failed with a TypeError inside the transaction.
    if (first === undefined) return;

    const aggregateId = first.aggregateId;
    // The version check below only covers the first event's aggregate, so a
    // mixed batch would be written with versions computed for the wrong stream.
    if (events.some((e) => e.aggregateId !== aggregateId)) {
      throw new Error('All events in a single append must belong to the same aggregate');
    }

    await this.db.$transaction(async (tx) => {
      // Optimistic concurrency check: the stored row count is used as the
      // current version, which assumes versions are contiguous from 0.
      // NOTE(review): count-then-insert can race between two concurrent
      // transactions unless the table rejects duplicate
      // (aggregateId, aggregateVersion) pairs or the transaction is
      // serializable — confirm the schema has such a constraint.
      const current = await tx.eventStore.count({
        where: { aggregateId },
      });
      if (current !== expectedVersion) {
        throw new ConcurrencyError(`Expected version ${expectedVersion}, got ${current}`);
      }
      await tx.eventStore.createMany({
        data: events.map((e, i) => ({
          eventId: e.eventId,
          eventType: e.eventType,
          aggregateId: e.aggregateId,
          // Stored version is derived from the batch position, not from the event.
          aggregateVersion: expectedVersion + i,
          occurredAt: new Date(e.occurredAt),
          payload: e,
        })),
      });
    });
  }

  /**
   * Loads the stored events of an aggregate in ascending version order.
   *
   * @param aggregateId  aggregate whose stream is read
   * @param fromVersion  lowest `aggregateVersion` to include (inclusive, default 0)
   * @returns the `payload` of each matching row, cast to `DomainEvent` without validation
   */
  async load(aggregateId: string, fromVersion = 0): Promise<DomainEvent[]> {
    const records = await this.db.eventStore.findMany({
      where: { aggregateId, aggregateVersion: { gte: fromVersion } },
      orderBy: { aggregateVersion: 'asc' },
    });
    return records.map((r) => r.payload as DomainEvent);
  }
}
// Repository
/**
 * Persists and restores Order aggregates through `this.eventStore`.
 *
 * NOTE(review): `eventStore` is not declared in this class — presumably an
 * injected event store exposing `append` and `load`; confirm how it is supplied.
 */
class OrderRepository {
  /**
   * Drains the order's uncommitted events and appends them to the store.
   * Does nothing when there are no uncommitted events.
   *
   * The expected stored version is the aggregate's version minus the number
   * of events being written. Note the events are pulled (and so cleared from
   * the aggregate) before the append is attempted.
   */
  async save(order: Order): Promise<void> {
    const pending = order.pullUncommittedEvents();
    if (pending.length > 0) {
      const storedVersion = order.version - pending.length;
      await this.eventStore.append(pending, storedVersion);
    }
  }

  /**
   * Rebuilds an order from its stored events.
   * @throws Error when the store holds no events for `orderId`
   */
  async findById(orderId: string): Promise<Order> {
    const history = await this.eventStore.load(orderId);
    if (history.length === 0) {
      throw new Error(`Order ${orderId} not found`);
    }

    const restored = new Order();
    restored.id = orderId;
    restored.rehydrate(history);
    return restored;
  }
}
Projections: Read-side views built by replaying events. They can be rebuilt at any time from the event store.
// Projection: build a simple read model
/**
 * In-memory read model holding the status and total of each order, keyed by
 * aggregate id. Feed it events in stream order via `handle()`.
 */
class OrderSummaryProjection {
  private summaries = new Map<string, { status: string; total: number }>();

  /**
   * Returns a copy of the summary for an order, or `undefined` when no
   * `order.created` event has been handled for that id.
   */
  getSummary(aggregateId: string): { status: string; total: number } | undefined {
    const summary = this.summaries.get(aggregateId);
    return summary === undefined ? undefined : { ...summary };
  }

  /**
   * Applies one event to the read model.
   *
   * - `order.created` creates (or replaces) the summary, totalling the items
   *   as `unitPrice * quantity`.
   * - `order.paid`, `order.shipped` and `order.cancelled` update the status so
   *   the summary tracks every status the Order aggregate can reach.
   * - Any other event type is ignored.
   *
   * @throws Error when a status event arrives for an order with no summary
   */
  handle(event: DomainEvent): void {
    const e = event as OrderEvent;
    switch (e.eventType) {
      case 'order.created':
        this.summaries.set(e.aggregateId, {
          status: 'pending',
          total: e.items.reduce((sum, i) => sum + i.unitPrice * i.quantity, 0),
        });
        break;
      case 'order.paid':
        this.setStatus(e.aggregateId, e.eventType, 'paid');
        break;
      case 'order.shipped':
        this.setStatus(e.aggregateId, e.eventType, 'shipped');
        break;
      case 'order.cancelled':
        this.setStatus(e.aggregateId, e.eventType, 'cancelled');
        break;
    }
  }

  /** Updates the status of an existing summary, failing loudly if it is missing. */
  private setStatus(aggregateId: string, eventType: string, status: string): void {
    const summary = this.summaries.get(aggregateId);
    if (summary === undefined) {
      // Replaces the former `get(...)!`, which surfaced as an opaque TypeError.
      throw new Error(`Cannot apply ${eventType}: no summary for order ${aggregateId}`);
    }
    summary.status = status;
  }
}
Snapshots: After N events, persist a snapshot so rehydration doesn't replay from the beginning.
Anti-patterns:
microservices.io/patterns/data/event-sourcing.html