From reactor
Author Reactor hot sources with Sinks for manual emission, replay/multicast selection, and emit-result handling. Use this skill when designing or reviewing Reactor hot-source APIs with Sinks: sink type selection, manual emission, replay/multicast choices, emit result handling, and the boundary between Sinks and ConnectableFlux-style sharing.
npx claudepluginhub ririnto/sinon --plugin reactorThis skill uses the workspace's default tool permissions.
Author Reactor hot sources deliberately.
Searches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Checks Next.js compilation errors using a running Turbopack dev server after code edits. Fixes actionable issues before reporting complete. Replaces `next build`.
Guides code writing, review, and refactoring with Karpathy-inspired rules to avoid overcomplication, ensure simplicity, surgical changes, and verifiable success criteria.
Share bugs, ideas, or general feedback.
Author Reactor hot sources deliberately.
This skill covers the ordinary path for choosing Sinks.one(), Sinks.empty(), or Sinks.many(), selecting unicast vs multicast vs replay behavior, handling tryEmit* / emit* outcomes, and deciding when a sink is the right tool instead of ConnectableFlux-style hot conversion. Keep advanced connection lifecycle, contention tuning, and internal unsafe sinks in blocker references.
Sinks.one(), Sinks.empty(), and Sinks.many()EmitResult, EmitFailureHandler, or ordinary backpressure outcomes from manual emissionFlux / Mono composition as the main problemreactor-test APIs as the main jobProcessor-based designs| Reactor hot-source surface | Keep in this file | Open a reference when... |
|---|---|---|
| hot vs cold distinction | ordinary difference between per-subscription work and shared/manual hot sources | the design depends on connection lifecycle rather than sink emission |
| when to choose Sinks | choose Sinks for programmatic emission, not just for sharing a cold source | the real problem is connect/disconnect and subscriber rendezvous |
| sink shape | Sinks.one(), Sinks.empty(), Sinks.many() | internal contention or unsafe sink use becomes the blocker |
| fan-out model | unicast vs multicast vs replay | replay policy or connection lifecycle becomes the blocker |
| emission API | tryEmit* vs emit*, EmitResult, EmitFailureHandler | failure handling rules become the main problem |
| backpressure implications | ordinary queueing, dropping, or replay behavior | overflow and concurrent emission are the main blockers |
| ConnectableFlux boundary | recognize when publish(), replay(), autoConnect(...), or refCount(...) is a better fit than Sinks | connection management is the design problem |
| concurrency boundary | recognize that safe sinks detect non-serialized emission | external synchronization or Sinks.unsafe() becomes the blocker |
Sinks.one() for exactly one terminal value, Sinks.empty() for terminal-only completion/error, and Sinks.many() for multi-value streams.onBackpressureBuffer() when retention is acceptable, directAllOrNothing() when all subscribers must advance together, and directBestEffort() when slow subscribers may miss values but fast ones should continue.tryEmit* when the caller can inspect and branch on EmitResult immediately.emit* only when retry semantics are deliberate and an EmitFailureHandler is part of the design.Sinks.unsafe() as an internal optimization boundary, not an ordinary default.publish(), replay(), autoConnect(...), or refCount(...) instead.Sinks.one().Sinks.empty().Sinks.many().Sinks.many().
unicast().multicast() with the right delivery strategy.replay() with the right retention rule.tryEmitNext(...), tryEmitComplete(), tryEmitError(...).emitNext(...), emitComplete(...), emitError(...) with a failure handler.Mono or Flux through asMono() or asFlux().
asMono() for Sinks.one() and Sinks.empty() -- matches the 0..1 cardinality contract.asFlux() for Sinks.many() -- exposes the multi-value surface.asMono() on a Sinks.many() works but signals a cardinality mismatch; prefer asFlux().tryEmit* or emit* deliberately and make failure behavior explicit.| Need | Default move | Why |
|---|---|---|
| one async result | Sinks.one() | one value or terminal error |
| terminal-only completion/error | Sinks.empty() | no payload value |
| one subscriber with buffering | Sinks.many().unicast().onBackpressureBuffer() | single-consumer stream |
| live fan-out with retained backlog | Sinks.many().multicast().onBackpressureBuffer() | broadcast current signals and buffer by demand |
| live fan-out where all subscribers move together | Sinks.many().multicast().directAllOrNothing() | drop for everyone if one subscriber has no demand |
| live fan-out where only slow subscribers fall behind | Sinks.many().multicast().directBestEffort() | keep fast subscribers flowing |
| live fan-out that errors on overflow | Sinks.many().multicast().onBackpressureError() | fail fast when downstream cannot keep up |
| replay all retained history | Sinks.many().replay().all() | late subscribers receive full retained history |
| replay only the latest signal | Sinks.many().replay().latest() | late subscribers see current state only |
| replay bounded history | Sinks.many().replay().limit(...) | retains selected size or time window |
| immediate emission result | tryEmitNext(...) | caller handles EmitResult directly |
| retry-aware emission | emitNext(..., handler) | retries or fails by policy |
| share existing cold source | publish(), replay(), refCount(...), autoConnect(...) | no new sink required |
| If you need... | Use | Trade-off |
|---|---|---|
| every active subscriber to stay aligned | directAllOrNothing() | one slow subscriber can cause drops for all |
| fast subscribers to keep flowing while slow ones miss values | directBestEffort() | delivery diverges across subscribers |
| demand-aware buffering for subscribers | onBackpressureBuffer() | retains elements in memory |
| fail immediately on downstream overflow | onBackpressureError() | propagates error to upstream producer |
| late subscribers to see the full retained stream | replay().all() | retention grows unless bounded externally |
| late subscribers to see only the current state | replay().latest() | emits last signal after first emission; replay().limit(1) replays even the first signal immediately |
| late subscribers to see bounded recent history | replay().limit(...) | history is explicit by size or time |
Sinks.one() for one callback resultimport reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
final class CallbackBridge {
Mono<String> loadValue() {
Sinks.One<String> sink = Sinks.one();
completeLater(sink);
return sink.asMono();
}
private void completeLater(Sinks.One<String> sink) {
sink.tryEmitValue("done");
}
}
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
final class EventBus {
private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> events() {
return sink.asFlux();
}
void publish(String event) {
sink.tryEmitNext(event);
}
}
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
final class ReplayFeed {
private final Sinks.Many<String> sink = Sinks.many().replay().limit(3);
Flux<String> feed() {
return sink.asFlux();
}
void record(String value) {
sink.tryEmitNext(value);
}
}
emitNext(...) with explicit failure policyimport reactor.core.publisher.Sinks;
final class RetriedEmission {
void publish(Sinks.Many<String> sink, String value) {
sink.emitNext(value, Sinks.EmitFailureHandler.FAIL_FAST);
}
}
Sinks.empty() for terminal-only signalsimport reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
final class CompletionSignal {
Mono<Void> shutdownSignal() {
Sinks.Empty<Void> sink = Sinks.empty();
triggerShutdownLater(sink);
return sink.asMono();
}
private void triggerShutdownLater(Sinks.Empty<Void> sink) {
sink.tryEmitComplete();
}
}
Use Sinks.empty() when the only signals are completion or error -- no payload value is carried. Call tryEmitComplete() for normal termination or tryEmitError(...) for abnormal termination.
Sinks.many().unicast() for single-subscriber streamsimport reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
final class UnicastStream {
private final Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> stream() {
return sink.asFlux();
}
void send(String value) {
sink.tryEmitNext(value);
}
void finish() {
sink.tryEmitComplete();
}
}
Unicast allows exactly one subscriber. A second subscription attempt receives an IllegalStateException. Use it when the downstream consumer is known to be singular (e.g., a dedicated processing pipeline).
| Anti-pattern | Why it fails | Correct move |
|---|---|---|
| creating a sink just to share an existing cold source | adds manual-emission complexity without need | use publish(), replay(), or refCount(...) |
| using multicast when late subscribers need history | late subscribers only see future values | use replay |
ignoring EmitResult from tryEmit* | failures become invisible | branch on the result or switch to emit* |
| using replay with no limit by default | cached history can grow without bound | choose a size or time limit deliberately |
| assuming safe sinks serialize every producer intention automatically | concurrent calls can still return FAIL_NON_SERIALIZED | handle contention or move to a coordinated emission strategy |
tryEmit* vs emit* is a deliberate API choice.| Open this when... | Reference |
|---|---|
EmitResult, retry policy, overflow, or terminated/cancelled emission is the blocker | Sink Emission Failures |
| the real problem is connect, disconnect, replay, or subscriber rendezvous for a shared cold source | Connectable Flux Patterns |
contention, external synchronization, or Sinks.unsafe() is the blocker | Concurrent and Unsafe Emission |
Return: