From wpf-dev-pack
Provides .NET code examples for Pub-Sub patterns using System.Threading.Channels and System.Reactive for event-driven, reactive architectures.
npx claudepluginhub christian289/dotnet-with-claudecode --plugin wpf-dev-pack
This skill uses the workspace's default tool permissions.
A guide for Pub-Sub patterns for event-based asynchronous communication.
Generates design tokens/docs from CSS/Tailwind/styled-components codebases, audits visual consistency across 10 dimensions, detects AI slop in UI.
Records polished WebM UI demo videos of web apps using Playwright with cursor overlay, natural pacing, and three-phase scripting. Activates for demo, walkthrough, screen recording, or tutorial requests.
Delivers idiomatic Kotlin patterns for null safety, immutability, sealed classes, coroutines, Flows, extensions, DSL builders, and Gradle DSL. Use when writing, reviewing, refactoring, or designing Kotlin code.
A guide for Pub-Sub patterns for event-based asynchronous communication.
Quick Reference: See QUICKREF.md for essential patterns at a glance.
| API | Purpose | NuGet |
|---|---|---|
| System.Reactive (Rx.NET) | Reactive event streams | System.Reactive |
| System.Threading.Channels | Async Producer-Consumer | BCL |
| IObservable<T> | Observable sequence | BCL |
using System.Threading.Channels;
/// <summary>
/// In-process message queue built on an unbounded <see cref="Channel{T}"/>:
/// producers call <see cref="SendAsync"/>, a consumer drains the queue with
/// <see cref="ProcessAsync"/>, and <see cref="Complete"/> closes the writer side.
/// </summary>
public sealed class MessageProcessor
{
    private readonly Channel<Message> _channel =
        Channel.CreateUnbounded<Message>();

    /// <summary>Producer - queues a message on the channel.</summary>
    /// <param name="message">The message to enqueue.</param>
    /// <param name="ct">
    /// Optional token forwarded to the channel write. Defaults to
    /// <see cref="CancellationToken.None"/>, so existing callers are unaffected.
    /// </param>
    public async Task SendAsync(Message message, CancellationToken ct = default)
    {
        await _channel.Writer.WriteAsync(message, ct);
    }

    /// <summary>
    /// Consumer - reads messages from the channel and awaits
    /// <c>HandleMessage</c> for each one, one at a time.
    /// </summary>
    /// <param name="ct">Token passed to the channel read loop.</param>
    public async Task ProcessAsync(CancellationToken ct)
    {
        await foreach (var message in _channel.Reader.ReadAllAsync(ct))
        {
            await HandleMessage(message);
        }
    }

    /// <summary>Channel completion signal: marks the writer as complete.</summary>
    public void Complete() => _channel.Writer.Complete();
}
// Bounded channel: the capacity limit is what provides backpressure
BoundedChannelOptions options = new(capacity: 100)
{
    SingleWriter = false,
    SingleReader = true,
    FullMode = BoundedChannelFullMode.Wait, // writers wait while the buffer is full
};
Channel<Message> channel = Channel.CreateBounded<Message>(options);

// The write is awaited until the channel has room for the message
await channel.Writer.WriteAsync(message);
/// <summary>
/// Fixed-size pool of consumer loops that all read from one shared
/// unbounded channel of <c>WorkItem</c>.
/// </summary>
public sealed class WorkerPool
{
    private readonly Channel<WorkItem> _channel;
    private readonly int _workerCount;

    /// <summary>Creates the pool and its backing channel.</summary>
    /// <param name="workerCount">Number of concurrent consumer loops; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="workerCount"/> is zero or negative.
    /// </exception>
    public WorkerPool(int workerCount = 4)
    {
        // Fail fast: with zero workers StartAsync would finish immediately and
        // queued items would never be processed; a negative count would only
        // surface later, from Enumerable.Range inside StartAsync.
        if (workerCount <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(workerCount), workerCount, "Worker count must be at least 1.");
        }

        _workerCount = workerCount;
        _channel = Channel.CreateUnbounded<WorkItem>();
    }

    /// <summary>
    /// Starts all consumer loops and completes when every one of them has finished.
    /// </summary>
    /// <param name="ct">Token passed to each consumer loop.</param>
    public async Task StartAsync(CancellationToken ct)
    {
        // Materialize so every worker is started before the first await.
        var workers = Enumerable.Range(0, _workerCount)
            .Select(_ => ProcessAsync(ct))
            .ToArray();
        await Task.WhenAll(workers);
    }

    // One consumer loop: reads items from the shared channel and awaits ProcessItem for each.
    private async Task ProcessAsync(CancellationToken ct)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync(ct))
        {
            await ProcessItem(item);
        }
    }

    /// <summary>Queues a work item for the pool.</summary>
    /// <param name="item">The item to enqueue.</param>
    /// <param name="ct">Optional token forwarded to the channel write.</param>
    public ValueTask EnqueueAsync(WorkItem item, CancellationToken ct = default) =>
        _channel.Writer.WriteAsync(item, ct);

    /// <summary>
    /// Marks the channel's writer as complete so that no further items can be
    /// enqueued and the consumer loops can run to completion instead of
    /// relying on cancellation to stop.
    /// </summary>
    public void Complete() => _channel.Writer.Complete();
}
using System.Reactive.Linq;
using System.Reactive.Subjects;
/// <summary>
/// Type-filtered publish/subscribe hub: every event is pushed through a single
/// <c>Subject&lt;object&gt;</c> and subscribers receive only the events whose
/// runtime type matches the type they asked for.
/// </summary>
public sealed class EventAggregator : IDisposable
{
    private readonly Subject<object> _subject = new();

    /// <summary>Subscribe to a specific event type.</summary>
    /// <typeparam name="T">Event type to receive.</typeparam>
    /// <returns>An observable of the published events that are of type <typeparamref name="T"/>.</returns>
    public IObservable<T> GetEvent<T>() =>
        _subject.OfType<T>().AsObservable();

    /// <summary>Publish an event to all subscribers of its type.</summary>
    /// <param name="event">The event instance; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
    public void Publish<T>(T @event)
    {
        // A null would be pushed into the subject and then dropped by the
        // OfType<T>() filter, so the publish would silently do nothing.
        if (@event is null)
        {
            throw new ArgumentNullException(nameof(@event));
        }

        _subject.OnNext(@event);
    }

    /// <summary>
    /// Signals completion to current subscribers, then disposes the subject.
    /// Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        // OnCompleted on an already-disposed subject throws, so bail out early.
        if (_subject.IsDisposed)
        {
            return;
        }

        _subject.OnCompleted();
        _subject.Dispose();
    }
}
// Event definitions
/// <summary>Event payload carrying a user's ID.</summary>
/// <param name="UserId">ID of the user the event refers to.</param>
public record UserLoggedIn(string UserId);
/// <summary>Event payload carrying an order's ID.</summary>
/// <param name="OrderId">Numeric ID of the order the event refers to.</param>
public record OrderPlaced(int OrderId);
// Wire up the subscribers
var aggregator = new EventAggregator();

var logins = aggregator.GetEvent<UserLoggedIn>();
logins.Subscribe(login => Console.WriteLine($"User logged in: {login.UserId}"));

// Only orders with an ID above 100 reach this subscriber
var largeOrders = aggregator.GetEvent<OrderPlaced>()
    .Where(order => order.OrderId > 100);
largeOrders.Subscribe(order => Console.WriteLine($"Large order: {order.OrderId}"));

// Publish one event of each type
aggregator.Publish(new UserLoggedIn("user123"));
aggregator.Publish(new OrderPlaced(150));
// Debounce - only the last value of a burst is searched (300 ms window),
// and a value equal to the previous one is skipped
var debouncedQueries = searchInput
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged();
debouncedQueries.Subscribe(text => Search(text));

// Buffer - gather events for 5 seconds, then handle them as one batch
// NOTE(review): a time-based Buffer may hand over an empty batch when nothing
// arrived in the window - confirm ProcessBatch tolerates that.
var batches = events.Buffer(TimeSpan.FromSeconds(5));
batches.Subscribe(items => ProcessBatch(items));

// Retry - Retry(3) re-subscribes to the source on failure; an error that
// still gets through is passed to LogError
var resilient = observable.Retry(3);
resilient.Subscribe(
    onNext: value => Process(value),
    onError: error => LogError(error));
| Feature | Channels | Rx.NET |
|---|---|---|
| Purpose | Producer-Consumer | Event streams |
| Backpressure | Built-in (Bounded) | Separate implementation |
| Operators | Basic | Rich |
| Learning curve | Low | High |
| Dependency | BCL | NuGet |
// Program.cs
// Register one shared channel instance, then expose its reader and writer
// halves as separate services resolved from that same instance.
services.AddSingleton<Channel<Message>>(Channel.CreateUnbounded<Message>());
services.AddSingleton<ChannelReader<Message>>(
    provider => provider.GetRequiredService<Channel<Message>>().Reader);
services.AddSingleton<ChannelWriter<Message>>(
    provider => provider.GetRequiredService<Channel<Message>>().Writer);
// Producer
/// <summary>Writes messages to the injected <see cref="ChannelWriter{T}"/>.</summary>
public sealed class Producer(ChannelWriter<Message> writer)
{
    /// <summary>Sends a message through the channel writer.</summary>
    /// <param name="msg">The message to write.</param>
    /// <param name="ct">
    /// Optional token forwarded to the write, so a caller can abandon a write
    /// that is waiting (for example on a full bounded channel). Defaults to
    /// <see cref="CancellationToken.None"/>, so existing callers are unaffected.
    /// </param>
    public ValueTask SendAsync(Message msg, CancellationToken ct = default) =>
        writer.WriteAsync(msg, ct);
}
// Consumer
/// <summary>Drains the injected <see cref="ChannelReader{T}"/> and handles each message in turn.</summary>
public sealed class Consumer(ChannelReader<Message> reader)
{
    /// <summary>Reads messages from the channel and awaits <c>Handle</c> for each one.</summary>
    /// <param name="ct">Token passed to the channel read loop.</param>
    public async Task ProcessAsync(CancellationToken ct)
    {
        var incoming = reader.ReadAllAsync(ct);

        await foreach (var received in incoming)
        {
            await Handle(received);
        }
    }
}
<ItemGroup>
<PackageReference Include="System.Reactive" Version="6.0.*" />
</ItemGroup>
// Always keep the handle returned by Subscribe: disposing it is required
IDisposable subscription = observable.Subscribe(handler);

// Once the subscription is no longer needed
subscription.Dispose();
// Cap memory use with a bounded channel that holds at most 1000 messages
var dropOldestOptions = new BoundedChannelOptions(1000)
{
    // When the channel is full, old messages are dropped
    FullMode = BoundedChannelFullMode.DropOldest,
};
var channel = Channel.CreateBounded<Message>(dropOldestOptions);