From dt-brigid
System.Reactive and DynamicData patterns for .NET 10 game clients. Covers Subject, BehaviorSubject, ValueSubject, IObservable operators, SourceCache, collection binding, and Godot node lifecycle integration. Load when working with reactive pipelines, ViewModel bindings, SourceCache, DynamicData, IObservable, Subject, Subscribe, Bind, or INotifyCollectionChanged in C# 14.
npx claudepluginhub dreamteam-hq/brigid --plugin dt-brigid
This skill uses the workspace's default tool permissions.
CrystalMagica uses MVVM as a backend architecture pattern for its Godot 4.6 game client. This is NOT WPF/MAUI MVVM.
- Models live in the CrystalMagica library (CrystalMagica.Models). These are wire types like CharacterData, CharacterAction, and MoveBegin used by both client and server.
- ViewModels live in CrystalMagica.Game/ViewModels/. They are client-side only and driven by System.Reactive: MainViewModel, RemoteCharacterVM, LocalPlayerCharacterVM.
- Views live in CrystalMagica.Game/Views/: PlayerNode (base CharacterBody3D), RemotePlayerNode, ItemsNode.
Local and remote characters use different ViewModels:
- LocalPlayerCharacterVM holds plain properties (Position as Godot.Vector3, Color). It captures local input and sends actions to the server via serverClient.Map.RelayCharacterAction(). It has no Rx observables: the local view reads its properties directly.
- RemoteCharacterVM exposes Subject<CharacterAction> Updates and derives IObservable<Vector2> Position. The view subscribes to these streams to replicate remote player state.
Both drive PlayerNode (the shared CharacterBody3D base with physics, gravity, and IInputAction methods) but through different paths: local drives it from input, remote drives it from Rx subscriptions.
Three subject types serve distinct roles. Choosing the wrong one produces subtle bugs.
| Type | Has .Value | Replays to Late Subscribers | Use For |
|---|---|---|---|
| Subject<T> | No | No | Discrete events: network messages, input actions |
| BehaviorSubject<T> | Yes (read-only) | Last value | State that always has a current value |
| ValueSubject<T> | Yes (read/write) | Last value | CrystalMagica wrapper over BehaviorSubject<T> with get/set .Value |
Use Subject<T> when there is no meaningful "current value." Character actions are events, not state.
// Declaration
public Subject<CharacterAction> Updates { get; set; } = new();
// Producer pushes events (MainViewModel.HandleMessage)
target.Value.Updates.OnNext(action);
// Consumer subscribes (RemotePlayerNode.Bind)
_ = viewModel.Updates.Subscribe(x =>
{
    if (x.Action is CharacterActions.Jump)
        Jump();
    else if (x.Action is CharacterActions.Stop)
        StopMoving();
    else if (x is MoveBegin moveBegin)
        MoveBegin(moveBegin);
});
ValueSubject<T> wraps BehaviorSubject<T>. Setting .Value calls OnNext() internally. It implements ISubject<T>, so it is both an observable and an observer.
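// CrystalMagica.Reactive.ValueSubject<T>
public class ValueSubject<T> : ISubject<T>
{
    private readonly BehaviorSubject<T> subject;
    public T Value
    {
        get => subject.Value;
        set => subject.OnNext(value); // set triggers OnNext
    }
    public ValueSubject(T value = default)
    {
        subject = new BehaviorSubject<T>(value);
    }
    // ISubject<T> delegation (sketch: each member forwards to the inner subject)
    public void OnNext(T value) => subject.OnNext(value);
    public void OnError(Exception error) => subject.OnError(error);
    public void OnCompleted() => subject.OnCompleted();
    public IDisposable Subscribe(IObserver<T> observer) => subject.Subscribe(observer);
}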
Usage in MainViewModel:
public ValueSubject<string> Status { get; set; } = new();
public ValueSubject<LocalPlayerCharacterVM> Player { get; set; } = new();
// Property-style write triggers OnNext
Status.Value = "Connecting...";
// Direct OnNext also works
Status.OnNext("Connected");
// Subscribe to observe changes
_ = viewModel.Player
    .Where(x => x is not null)
    .Subscribe(Player.Bind);
Decision rule: Use ValueSubject<T> when you need imperative read/write access to current state AND reactive notifications. Use Subject<T> when there is no "current value" concept.
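The replay column in the table above is the difference that most often causes bugs, so here is a minimal sketch of it. It uses BehaviorSubject<T> directly (ValueSubject<T> wraps it, so the replay behavior should be the same); ReplayDemo and the string payloads are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class ReplayDemo
{
    // Returns what each subscriber sees when it attaches AFTER the first OnNext.
    public static (List<string> FromSubject, List<string> FromBehavior) Run()
    {
        var events = new Subject<string>();               // no current value, no replay
        var state = new BehaviorSubject<string>("Idle");  // always holds a current value

        events.OnNext("Jump");          // nobody is subscribed yet: this event is lost
        state.OnNext("Connecting...");  // replaces "Idle" as the current value

        var fromSubject = new List<string>();
        var fromBehavior = new List<string>();
        using var s1 = events.Subscribe(fromSubject.Add);  // receives nothing on subscribe
        using var s2 = state.Subscribe(fromBehavior.Add);  // receives "Connecting..." on subscribe

        events.OnNext("Stop");
        state.OnNext("Connected");

        // fromSubject:  ["Stop"]
        // fromBehavior: ["Connecting...", "Connected"]
        return (fromSubject, fromBehavior);
    }
}
```

A late subscriber to a plain Subject<T> misses everything sent before it attached, which is acceptable for events but wrong for state such as a status string.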
Chain operators to transform events into state. StartWith provides the initial value so subscribers receive a value immediately on subscription.
// RemoteCharacterVM constructor
Position = Updates
    .Select(x => x.Position)
    .StartWith(data.Position);
The resulting IObservable<Vector2> emits the spawn position first, then every subsequent position from incoming actions.
| Operator | What It Does | When to Use |
|---|---|---|
| .Select(x => ...) | Transform each element | Extract a field from a composite event |
| .Where(x => ...) | Filter elements | Skip nulls, filter by type |
| .StartWith(value) | Emit an initial value before the source | Derived state that needs a starting value |
| .Subscribe(onNext) | Terminal: attach an observer | Wire the pipeline to a side effect |
| .CombineLatest(other, (a, b) => ...) | Combine the latest values from two streams | Derived state from multiple inputs |
| .Merge(other) | Interleave two streams into one | Flatten independent event sources |
| .DistinctUntilChanged() | Suppress consecutive duplicates | Avoid redundant updates |
| .Throttle(TimeSpan) | Emit a value only after the source has been quiet for the given duration (debounce) | Collapse bursts into one update; use .Sample(TimeSpan) for a fixed emission rate |
Async interop: Observable.FromAsync(() => TaskMethod()) converts a Task<T> to IObservable<T>. await observable.FirstAsync() converts back.
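As a rough illustration of that round trip, the sketch below wraps a task-returning method and awaits the result back out. FetchStatusAsync is a hypothetical stand-in for a real server call, not a CrystalMagica API.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AsyncInteropDemo
{
    // Hypothetical async call standing in for a real server request.
    private static async Task<string> FetchStatusAsync()
    {
        await Task.Delay(10);
        return "Connected";
    }

    public static async Task<string> RoundTripAsync()
    {
        // Task -> IObservable: the factory runs when an observer subscribes.
        IObservable<string> status = Observable.FromAsync(() => FetchStatusAsync());

        // IObservable -> awaitable: awaiting subscribes and yields the first element.
        return await status.FirstAsync();
    }
}
```

Because FromAsync takes a factory, each new subscription starts the underlying task again, which matters if the call has side effects.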
SourceCache<TObject, TKey> is a keyed reactive collection from DynamicData. All mutations flow through the cache; bound collections update automatically.
// Declare cache and bound collection
private SourceCache<RemoteCharacterVM, Guid> RemoteCharacters { get; set; }
    = new(x => x.Id);
public ReadOnlyObservableCollection<RemoteCharacterVM> RemoteCharacterList { get; }
// Wire once in constructor
_ = RemoteCharacters
    .Connect()
    .Bind(out var list)
    .Subscribe();
RemoteCharacterList = list;
Connect() returns IObservable<IChangeSet<T, K>>. Bind(out var list) materializes the changeset stream into a ReadOnlyObservableCollection<T>. Subscribe() activates the pipeline.
| Operation | Code | Notes |
|---|---|---|
| Add or update | cache.AddOrUpdate(item) | Upsert by key selector |
| Remove by key | cache.Remove(key) | |
| Lookup by key | cache.Lookup(key) | Returns Optional<T> — check .HasValue |
| Observe changes | cache.Connect() | Returns changeset stream |
Lookup returns Optional<T>, not a nullable. Always check .HasValue:
var target = RemoteCharacters.Lookup(action.CharacterId);
if (!target.HasValue)
{
    target = Enemies.Lookup(action.CharacterId);
}
if (target.HasValue)
{
    target.Value.Updates.OnNext(action);
}
Use separate SourceCache instances when entities have different lifecycles or routing:
private SourceCache<RemoteCharacterVM, Guid> RemoteCharacters { get; set; }
    = new(x => x.Id);
private SourceCache<RemoteCharacterVM, Guid> Enemies { get; set; }
    = new(x => x.Id);
// Each gets its own pipeline
_ = RemoteCharacters.Connect().Bind(out var list).Subscribe();
RemoteCharacterList = list;
_ = Enemies.Connect().Bind(out var enemyList).Subscribe();
EnemiesList = enemyList;
Apply DynamicData operators between Connect() and Bind():
// Filter
_ = RemoteCharacters.Connect()
    .Filter(x => x.IsAlive)
    .Bind(out var alive).Subscribe();
// Sort
_ = RemoteCharacters.Connect()
    .Sort(SortExpressionComparer<RemoteCharacterVM>
        .Ascending(x => x.Id))
    .Bind(out var sorted).Subscribe();
Transform and auto-refresh for property-change-driven re-evaluation:
// Transform -- project to a different type
_ = RemoteCharacters.Connect()
    .Transform(vm => new MinimapBlip(vm.Id, vm.Position))
    .Bind(out var blips).Subscribe();
// AutoRefresh -- re-evaluate filter when property changes
// (items must implement INotifyPropertyChanged and raise it for IsAlive)
_ = RemoteCharacters.Connect()
    .AutoRefresh(x => x.IsAlive)
    .Filter(x => x.IsAlive)
    .Bind(out var aliveOnly).Subscribe();
ObservableDictionary<TValue, TKey> wraps SourceCache with the Connect/Bind pattern and forwards INotifyCollectionChanged:
public class ObservableDictionary<TValue, TKey> : INotifyCollectionChanged
    where TValue : notnull
    where TKey : notnull
{
    // Required by INotifyCollectionChanged; raised whenever Values changes
    public event NotifyCollectionChangedEventHandler? CollectionChanged;
    private SourceCache<TValue, TKey> dictionary { get; set; }
    public ReadOnlyObservableCollection<TValue> Values { get; }
    public ObservableDictionary(Func<TValue, TKey> keySelector)
    {
        dictionary = new(keySelector);
        _ = dictionary.Connect().Bind(out var values).Subscribe();
        Values = values;
        (Values as INotifyCollectionChanged).CollectionChanged +=
            (s, e) => CollectionChanged?.Invoke(s, e);
    }
    public TValue this[TKey key]
    {
        get => dictionary.Lookup(key).Value; // throws when the key is absent
        set => dictionary.AddOrUpdate(value); // key comes from keySelector, not the indexer argument
    }
}
Use when you need dictionary-style [] access AND collection-changed notifications.
ItemsNode is a generic Node3D that observes a ViewModel collection and manages child scene instances:
- Template: [Export] PackedScene NodeTemplate, set in the Godot editor.
- Binding: the Items property accepts any INotifyCollectionChanged. Setting it subscribes to CollectionChanged.
- Add: instantiate NodeTemplate, cast to IBindable, call Bind(item), then AddChild(node). The node is tracked in the _nodesByItem dictionary.
- Remove: look up the node, call QueueFree(), remove it from the dictionary.
- Reset: free every tracked node (QueueFree()), clear the dictionary, then re-create nodes for all current items.
// Main._Ready() -- wire ItemsNodes to ViewModel collections
RemoteCharacters.Items = viewModel.RemoteCharacterList;
Enemies.Items = viewModel.EnemiesList;
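The add/remove/reset bookkeeping can be sketched without Godot, which also makes it testable outside the engine. This is not the ItemsNode source: ItemsTracker and FakeNode are hypothetical stand-ins, and creating nodes for items that already exist at construction time is an assumption.

```csharp
#nullable enable
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Specialized;

// Hypothetical stand-in for a Godot node instantiated from NodeTemplate.
public sealed class FakeNode
{
    public object? BoundItem { get; private set; }
    public bool Freed { get; private set; }
    public void Bind(object item) => BoundItem = item;
    public void QueueFree() => Freed = true;
}

// Godot-free sketch of the per-item node tracking described above.
public sealed class ItemsTracker
{
    private readonly Dictionary<object, FakeNode> _nodesByItem = new();
    private readonly IEnumerable _items;

    public IReadOnlyDictionary<object, FakeNode> NodesByItem => _nodesByItem;

    public ItemsTracker(INotifyCollectionChanged items)
    {
        // Assumption: the collection is also enumerable (true for ObservableCollection<T>).
        _items = (IEnumerable)items;
        items.CollectionChanged += HandleCollectionChanged;
        Rebuild(); // assumption: items already present get nodes immediately
    }

    private void HandleCollectionChanged(object? sender, NotifyCollectionChangedEventArgs e)
    {
        if (e.Action == NotifyCollectionChangedAction.Reset) { Rebuild(); return; }
        if (e.Action == NotifyCollectionChangedAction.Move) return; // order is ignored in this sketch
        // Add carries NewItems, Remove carries OldItems, Replace carries both.
        foreach (object item in e.OldItems ?? Array.Empty<object>()) Remove(item);
        foreach (object item in e.NewItems ?? Array.Empty<object>()) Add(item);
    }

    private void Add(object item)
    {
        var node = new FakeNode(); // real code: instantiate NodeTemplate and cast to IBindable
        node.Bind(item);
        _nodesByItem[item] = node; // real code also calls AddChild(node)
    }

    private void Remove(object item)
    {
        if (_nodesByItem.Remove(item, out var node)) node.QueueFree();
    }

    private void Rebuild()
    {
        foreach (var node in _nodesByItem.Values) node.QueueFree();
        _nodesByItem.Clear();
        foreach (object item in _items) Add(item);
    }
}
```

Keying the dictionary by item means a Remove notification can find its node without searching the scene tree.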
ItemsNode.Items uses the C# 14 field keyword for change tracking:
public INotifyCollectionChanged Items
{
    get => field;
    set
    {
        field?.CollectionChanged -= HandleCollectionChanged;
        field = value;
        field.CollectionChanged += HandleCollectionChanged;
    }
}
Views implement IBindable to receive their ViewModel from ItemsNode:
public interface IBindable { void Bind(object viewModel); }
A view provides both the untyped overload (for ItemsNode) and a typed overload (for direct wiring). The typed Bind sets up all Rx subscriptions:
// RemotePlayerNode : PlayerNode, IBindable
public void Bind(RemoteCharacterVM viewModel)
{
    Mesh.SetSurfaceOverrideMaterial(0,
        new StandardMaterial3D { AlbedoColor = viewModel.Color });
    _ = viewModel.Position.Subscribe(
        x => Position = x.ToGodot3D());
    _ = viewModel.Updates.Subscribe(x =>
    {
        if (x.Action is CharacterActions.Jump) Jump();
        else if (x.Action is CharacterActions.Stop) StopMoving();
        else if (x is MoveBegin moveBegin) MoveBegin(moveBegin);
    });
}
The untyped overload casts and delegates:
public void Bind(object viewModel)
{
    if (viewModel is not RemoteCharacterVM typedVM)
        throw new ArgumentException(
            $"Expected RemoteCharacterVM, got {viewModel?.GetType().Name ?? "null"}",
            nameof(viewModel));
    Bind(typedVM);
}
Bind extension methods wire an observable directly to a Godot control property:
public static class BindExtensions
{
    public static IDisposable Bind(this Label label,
        IObservable<string> observable)
        => observable.Subscribe(value => label.Text = value);
}
// Usage in Main._Ready()
_ = StatusLabel.Bind(viewModel.Status);
This pattern generalizes: write a Bind extension for any Godot property that should track an observable.
All Rx subscriptions fire on the main thread. The architecture guarantees this:
1. SocketManager.Run() reads WebSocket frames on a background task.
2. Frames cross threads through a Channel<MemoryStream> (thread-safe).
3. MainViewModel.Process() drains the channel inside _Process() (main thread).
4. All OnNext() calls happen inside Process(), so subscribers always execute on the main thread.
No ObserveOn is needed. The channel decouples the background socket from the main-thread Rx pipeline.
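The hand-off can be sketched with nothing but System.Threading.Channels. This is an illustration under assumptions, not the CrystalMagica code: MessagePump is a hypothetical name, frames are plain strings instead of MemoryStream, and an event stands in for Subject.OnNext.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class MessagePump
{
    private readonly Channel<string> _inbox = Channel.CreateUnbounded<string>();

    // Raised only from Process(), so handlers run on whichever thread calls Process().
    public event Action<string>? MessageReceived;

    // Background side: stands in for the socket loop writing received frames.
    public Task ProduceAsync(IEnumerable<string> frames) => Task.Run(() =>
    {
        foreach (var frame in frames) _inbox.Writer.TryWrite(frame);
    });

    // Main-thread side: call once per frame. Drains what is queued and never blocks.
    public int Process()
    {
        var drained = 0;
        while (_inbox.Reader.TryRead(out var frame))
        {
            MessageReceived?.Invoke(frame);
            drained++;
        }
        return drained;
    }
}
```

Using TryRead rather than an awaited read keeps the drain synchronous, so a frame with no pending messages costs one failed read and returns immediately.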
The codebase currently uses _ = discard universally — this is safe when subscriptions live for the node's full lifetime.
| Scenario | Strategy |
|---|---|
| Subscription lives for node lifetime | _ = discard — GC-safe when source completes |
| Subscription must be cut early | Store IDisposable, call .Dispose() in _ExitTree() |
| Multiple subscriptions per node | RECOMMENDED: CompositeDisposable — dispose all at once |
| SourceCache pipeline | _ = cache.Connect().Bind(...).Subscribe() — lives for VM lifetime |
RECOMMENDED patterns for explicit disposal (CompositeDisposable, DisposeWith, _ExitTree teardown), Godot lifecycle integration, error handling (Catch, Retry), and testing Rx code: see references/advanced-patterns.md.
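For the "multiple subscriptions per node" row, a minimal CompositeDisposable sketch might look like the following. DisposableView is a hypothetical Godot-free class; in a real node, Teardown() would be the body of _ExitTree().

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

// Hypothetical view: collects its subscriptions so they can be cancelled together.
public sealed class DisposableView
{
    private readonly CompositeDisposable _subscriptions = new();
    public List<string> Log { get; } = new();

    public void Bind(IObservable<int> position, IObservable<string> updates)
    {
        _subscriptions.Add(position.Subscribe(p => Log.Add($"pos {p}")));
        _subscriptions.Add(updates.Subscribe(u => Log.Add($"update {u}")));
    }

    // Cancels every subscription registered in Bind().
    public void Teardown() => _subscriptions.Dispose();
}
```

After Dispose(), a CompositeDisposable disposes anything added to it immediately, so a view that can be re-bound should call Clear() instead.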
Subscribing inside _Process() creates a new subscription every frame. Subscriptions accumulate and callbacks multiply.
// BAD
public override void _Process(double delta)
{
    viewModel.Position.Subscribe(x => Position = x.ToGodot3D());
}
// GOOD -- subscribe once in Bind() or _Ready()
public override void _Ready()
{
    _ = viewModel.Position.Subscribe(x => Position = x.ToGodot3D());
}
Subjects are mutable variables. Use them at boundaries only (network input, user input). Derive everything else with operators.
// BAD -- unnecessary Subject for derived state
public Subject<Vector2> Position { get; set; } = new();
// ... somewhere: Updates.Subscribe(x => Position.OnNext(x.Position));
// GOOD -- derive with operators, no extra Subject
public IObservable<Vector2> Position { get; set; }
// ... in the constructor:
Position = Updates.Select(x => x.Position).StartWith(data.Position);
Without StartWith, subscribers receive nothing until the first event. Nodes spawn at origin.
// BAD -- no initial value
Position = Updates.Select(x => x.Position);
// GOOD
Position = Updates.Select(x => x.Position).StartWith(data.Position);
ReadOnlyObservableCollection from Bind() is read-only. All mutations go through SourceCache. Never try to add/remove items on the bound collection.
Multiple Connect() calls are fine when each pipeline applies different operators. Identical pipelines are wasteful: bind once and share the result.
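Always pass an onError handler to Subscribe. Without it, an error still terminates the subscription, and Rx rethrows the exception from inside the source's OnError call, far from the code that subscribed: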
// BAD: .Subscribe(HandleUpdate);
// GOOD:
.Subscribe(onNext: HandleUpdate,
onError: ex => GD.PrintErr(ex.Message));
Recommended approach (no Rx tests currently exist in CrystalMagica): use Microsoft.Reactive.Testing.TestScheduler for deterministic time. Subscribe synchronously and assert immediately: StartWith and Subject.OnNext emit inline. For examples, see references/advanced-patterns.md.
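For pipelines with no time-based operators, the synchronous style needs no scheduler at all. The sketch below mirrors the Select/StartWith position pipeline with hypothetical types (Move stands in for CharacterAction, an int for the position vector) and could be dropped into any test framework.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PositionPipelineCheck
{
    // Hypothetical event type standing in for CharacterAction.
    public sealed record Move(int X);

    public static List<int> Run()
    {
        var updates = new Subject<Move>();
        IObservable<int> position = updates.Select(m => m.X).StartWith(7);

        var seen = new List<int>();
        using var sub = position.Subscribe(seen.Add); // StartWith delivers 7 during Subscribe

        updates.OnNext(new Move(10)); // delivered inline, before OnNext returns
        updates.OnNext(new Move(12));

        return seen; // [7, 10, 12]
    }
}
```

Operators such as Throttle are where TestScheduler becomes necessary, because their output depends on elapsed time rather than on call order.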