From dotnet-skills
Building event-driven systems. Pub/sub, competing consumers, DLQ, sagas, delivery guarantees.
npx claudepluginhub wshaddix/dotnet-skillsThis skill uses the workspace's default tool permissions.
Searches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Searches prompts.chat for AI prompt templates by keyword or category, retrieves by ID with variable handling, and improves prompts via AI. Use for discovering or enhancing prompts.
Captures architectural decisions in Claude Code sessions as structured ADRs. Auto-detects choices between alternatives and maintains a docs/adr log for codebase rationale.
Durable messaging patterns for .NET event-driven architectures. Covers publish/subscribe, competing consumers, dead-letter queues, saga/process manager orchestration, and delivery guarantee strategies using Azure Service Bus, RabbitMQ, and MassTransit.
Out of scope: Background service lifecycle and IHostedService registration -- see [skill:dotnet-background-services]. Resilience pipelines and retry policies -- see [skill:dotnet-resilience]. JSON/binary serialization configuration -- see [skill:dotnet-serialization]. In-process producer/consumer queues with Channel<T> -- see [skill:dotnet-channels].
Cross-references: [skill:dotnet-background-services] for hosting message consumers, [skill:dotnet-resilience] for fault tolerance around message handlers, [skill:dotnet-serialization] for message envelope serialization, [skill:dotnet-channels] for in-process queuing patterns.
| Type | Purpose | Example |
|---|---|---|
| Command | Request an action (one recipient) | PlaceOrder, ShipPackage |
| Event | Notify something happened (many recipients) | OrderPlaced, PaymentReceived |
| Document | Transfer data between systems | CustomerProfile, ProductCatalog |
Commands are sent to a specific queue; events are published to a topic/exchange and delivered to all subscribers. This distinction drives the choice between point-to-point and pub/sub topologies.
| Guarantee | Behavior | Implementation |
|---|---|---|
| At-most-once | Fire and forget; message may be lost | No ack, no retry |
| At-least-once | Message retried until acknowledged; duplicates possible | Ack after processing + retry on failure |
| Exactly-once | Each message processed exactly once | At-least-once + idempotent consumer |
At-least-once with idempotent consumers is the standard approach for durable messaging. True exactly-once requires distributed transactions (which most brokers do not support) or consumer-side deduplication.
// Publisher -- send event to a topic
await using var client = new ServiceBusClient(connectionString);
await using var sender = client.CreateSender("order-events");
var message = new ServiceBusMessage(
JsonSerializer.SerializeToUtf8Bytes(new OrderPlaced(orderId, total)))
{
Subject = nameof(OrderPlaced),
ContentType = "application/json",
MessageId = Guid.NewGuid().ToString()
};
await sender.SendMessageAsync(message, cancellationToken);
// Subscriber -- process events from a subscription
await using var processor = client.CreateProcessor(
topicName: "order-events",
subscriptionName: "billing-service",
new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 10,
AutoCompleteMessages = false
});
processor.ProcessMessageAsync += async args =>
{
var body = args.Message.Body.ToObjectFromJson<OrderPlaced>();
await HandleOrderPlacedAsync(body);
await args.CompleteMessageAsync(args.Message);
};
processor.ProcessErrorAsync += args =>
{
logger.LogError(args.Exception, "Error processing message");
return Task.CompletedTask;
};
await processor.StartProcessingAsync(cancellationToken);
Key packages:
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.*" />
// Publisher -- declare exchange and publish
var factory = new ConnectionFactory { HostName = "localhost" };
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(
exchange: "order-events",
type: ExchangeType.Fanout,
durable: true);
var body = JsonSerializer.SerializeToUtf8Bytes(
new OrderPlaced(orderId, total));
await channel.BasicPublishAsync(
exchange: "order-events",
routingKey: string.Empty,
body: body);
Key packages:
<PackageReference Include="RabbitMQ.Client" Version="7.*" />
MassTransit abstracts the broker, providing a unified API for Azure Service Bus, RabbitMQ, Amazon SQS, and in-memory transport.
// Registration
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<OrderPlacedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
// Publisher
public sealed class OrderService(IPublishEndpoint publishEndpoint)
{
public async Task PlaceOrderAsync(
Guid orderId, decimal total, CancellationToken ct)
{
// Process order...
await publishEndpoint.Publish(
new OrderPlaced(orderId, total), ct);
}
}
// Consumer
public sealed class OrderPlacedConsumer(
ILogger<OrderPlacedConsumer> logger)
: IConsumer<OrderPlaced>
{
public async Task Consume(ConsumeContext<OrderPlaced> context)
{
logger.LogInformation(
"Processing order {OrderId}", context.Message.OrderId);
await ProcessAsync(context.Message);
}
}
// Message contract (use records in a shared contracts assembly)
public record OrderPlaced(Guid OrderId, decimal Total);
Key packages:
<PackageReference Include="MassTransit" Version="8.*" />
<!-- Pick ONE transport: -->
<PackageReference Include="MassTransit.RabbitMQ" Version="8.*" />
<!-- OR -->
<PackageReference Include="MassTransit.Azure.ServiceBus.Core" Version="8.*" />
Multiple consumer instances process messages from the same queue in parallel. The broker delivers each message to exactly one consumer, distributing load across instances.
Queue: order-processing
├── Consumer Instance A (picks message 1)
├── Consumer Instance B (picks message 2)
└── Consumer Instance C (picks message 3)
// Multiple instances reading from the same queue automatically compete.
// MaxConcurrentCalls controls per-instance parallelism.
var processor = client.CreateProcessor("order-processing",
new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 20,
PrefetchCount = 50,
AutoCompleteMessages = false
});
x.AddConsumer<OrderProcessor>(cfg =>
{
cfg.UseConcurrentMessageLimit(10);
});
Competing consumers sacrifice strict ordering for throughput. When order matters:
RequiresSession = true) to guarantee FIFO within a session ID (e.g., per customer)UseMessagePartitioner for key-based orderingDead-letter queues (DLQs) capture messages that cannot be processed after exhausting retries. They prevent poison messages from blocking the main queue.
| Reason | Trigger |
|---|---|
| Max delivery attempts exceeded | Message failed processing N times |
| TTL expired | Message sat in queue past its time-to-live |
| Consumer rejection | Consumer explicitly dead-letters the message |
| Queue length exceeded | Queue overflow policy routes to DLQ |
// Dead-letter a message with reason
await args.DeadLetterMessageAsync(
args.Message,
deadLetterReason: "ValidationFailed",
deadLetterErrorDescription: "Missing required field: CustomerId");
// Read from the dead-letter sub-queue
await using var dlqReceiver = client.CreateReceiver(
"order-processing",
new ServiceBusReceiverOptions
{
SubQueue = SubQueue.DeadLetter
});
while (true)
{
var message = await dlqReceiver.ReceiveMessageAsync(
TimeSpan.FromSeconds(5), cancellationToken);
if (message is null) break;
logger.LogWarning(
"DLQ message: {Reason} - {Description}",
message.DeadLetterReason,
message.DeadLetterErrorDescription);
// Inspect, fix, and re-submit or discard
await dlqReceiver.CompleteMessageAsync(message);
}
MassTransit automatically creates _error and _skipped queues. Failed messages after retry exhaustion move to the error queue with fault metadata.
// Configure retry before dead-lettering
x.AddConsumer<OrderProcessor>(cfg =>
{
cfg.UseMessageRetry(r => r.Intervals(
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(15)));
});
Always monitor DLQ depth with alerts. Unmonitored DLQs accumulate silently until data is lost or stale.
Sagas coordinate multi-step business processes across services. Each step publishes events that trigger the next step, with compensation logic for failures.
| Style | How it works | Use when |
|---|---|---|
| Choreography | Services react to events independently; no central coordinator | Simple flows, few steps, loosely coupled |
| Orchestration | A saga/process manager directs each step | Complex flows, compensation needed, visibility required |
// Saga state
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; } = default!;
public Guid OrderId { get; set; }
public decimal Total { get; set; }
public DateTime? PaymentReceivedAt { get; set; }
}
// State machine definition
public sealed class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; } = default!;
public State PaymentPending { get; private set; } = default!;
public State Completed { get; private set; } = default!;
public State Faulted { get; private set; } = default!;
public Event<OrderSubmitted> OrderSubmitted { get; private set; } = default!;
public Event<PaymentReceived> PaymentReceived { get; private set; } = default!;
public Event<PaymentFailed> PaymentFailed { get; private set; } = default!;
public OrderStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => OrderSubmitted,
x => x.CorrelateById(ctx => ctx.Message.OrderId));
Event(() => PaymentReceived,
x => x.CorrelateById(ctx => ctx.Message.OrderId));
Event(() => PaymentFailed,
x => x.CorrelateById(ctx => ctx.Message.OrderId));
Initially(
When(OrderSubmitted)
.Then(ctx =>
{
ctx.Saga.OrderId = ctx.Message.OrderId;
ctx.Saga.Total = ctx.Message.Total;
})
.Publish(ctx => new RequestPayment(
ctx.Saga.OrderId, ctx.Saga.Total))
.TransitionTo(PaymentPending));
During(PaymentPending,
When(PaymentReceived)
.Then(ctx =>
ctx.Saga.PaymentReceivedAt = DateTime.UtcNow)
.Publish(ctx => new FulfillOrder(ctx.Saga.OrderId))
.TransitionTo(Completed),
When(PaymentFailed)
.Publish(ctx => new CancelOrder(ctx.Saga.OrderId))
.TransitionTo(Faulted));
}
}
// Registration -- requires MassTransit.EntityFrameworkCore package for EF persistence
// NuGet: MassTransit.EntityFrameworkCore Version="8.*"
builder.Services.AddMassTransit(x =>
{
x.AddSagaStateMachine<OrderStateMachine, OrderState>()
.EntityFrameworkRepository(r =>
{
r.ExistingDbContext<SagaDbContext>();
r.UsePostgres();
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
| Store | Package | Use when |
|---|---|---|
| Entity Framework Core | MassTransit.EntityFrameworkCore | Already using EF Core; need transactions |
| MongoDB | MassTransit.MongoDb | Document-oriented state; high throughput |
| Redis | MassTransit.Redis | Ephemeral sagas; low latency |
| In-Memory | Built-in | Testing only -- state lost on restart |
When a saga step fails, publish compensating commands to undo prior steps:
OrderSubmitted -> RequestPayment -> PaymentReceived -> ReserveInventory
|
InventoryFailed
|
RefundPayment (compensation)
|
CancelOrder (compensation)
At-least-once delivery means consumers may receive the same message multiple times. Idempotent consumers ensure repeated processing produces the same result.
public sealed class IdempotentOrderConsumer(
AppDbContext db,
ILogger<IdempotentOrderConsumer> logger)
: IConsumer<OrderPlaced>
{
public async Task Consume(ConsumeContext<OrderPlaced> context)
{
var messageId = context.MessageId
?? throw new InvalidOperationException("Missing MessageId");
// Check if already processed
var exists = await db.ProcessedMessages
.AnyAsync(m => m.MessageId == messageId);
if (exists)
{
logger.LogInformation(
"Duplicate message {MessageId}, skipping", messageId);
return;
}
// Process the message
await ProcessOrderAsync(context.Message);
// Record as processed
db.ProcessedMessages.Add(new ProcessedMessage
{
MessageId = messageId,
ProcessedAt = DateTime.UtcNow,
ConsumerType = nameof(IdempotentOrderConsumer)
});
await db.SaveChangesAsync();
}
}
Prefer operations that are naturally idempotent:
INSERT ... ON CONFLICT UPDATE) instead of blind insertsUPDATE ... WHERE Status = 'Pending') instead of unconditionalWrap message payloads in a standard envelope with metadata for tracing, versioning, and routing.
public sealed record MessageEnvelope<T>(
string MessageId,
string MessageType,
DateTimeOffset Timestamp,
string CorrelationId,
string Source,
int Version, // Schema version for backward-compatible deserialization
T Payload);
MassTransit provides this automatically via ConsumeContext (MessageId, CorrelationId, Headers). When using raw broker clients, implement envelopes explicitly.
AutoCompleteMessages = false and call CompleteMessageAsync after successful processing. Auto-complete acknowledges before processing finishes, risking data loss on failure..env files excluded from source control.