From reactor
Author Reactor pipelines with Flux and Mono. Use this skill when designing or reviewing Flux/Mono source creation, operator composition, combination, empty/error behavior, ordinary backpressure choices, and everyday Context usage in Project Reactor.
npx claudepluginhub ririnto/sinon --plugin reactorThis skill uses the workspace's default tool permissions.
Author the ordinary Reactor path with `Flux` and `Mono`.
agents/openai.yamlreferences/backpressure.mdreferences/batching-grouping-windowing.mdreferences/blocking-bridges.mdreferences/combining-operators.mdreferences/context-propagation.mdreferences/creation-patterns.mdreferences/debugging-and-observability.mdreferences/hot-cold-and-multicasting.mdreferences/retry-strategies.mdSearches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Checks Next.js compilation errors using a running Turbopack dev server after code edits. Fixes actionable issues before reporting complete. Replaces `next build`.
Guides code writing, review, and refactoring with Karpathy-inspired rules to avoid overcomplication, ensure simplicity, surgical changes, and verifiable success criteria.
Share bugs, ideas, or general feedback.
Author the ordinary Reactor path with Flux and Mono.
This skill covers source selection, operator composition, combination, empty/error behavior, ordinary demand decisions, and request-scoped Context usage. Keep dedicated scheduler design, sink-driven hot sources, and reactor-test work in their own domains.
Flux<T> and Mono<T>ContextSinksStepVerifier, TestPublisher, or virtual time| Reactor surface | Keep in this file | Open a reference when... |
|---|---|---|
Flux / Mono choice | cardinality, contract, type-switching operators | the real problem is hot/manual sources rather than ordinary cardinality |
| source creation | just, empty, error, defer, fromCallable, fromSupplier, fromFuture, fromIterable, range | you need generate, create, push, using, or other programmatic patterns |
| operator composition | map, flatMap, concatMap, handle, filter, take, skip, collectList, reduce, switchMap, flatMapMany | ordering, batching, or async fan-out details become the blocker |
| combination | concat, merge, zip, combineLatest, then, switchIfEmpty, switchOnNext | exact completion or ordering semantics decide correctness |
| empty/error behavior | defaultIfEmpty, switchIfEmpty, onErrorResume, onErrorMap, doFinally, timeout, repeat, bounded retry | you need Retry policies, backoff, filtering, retry exhaustion, or repeat exhaustion rules |
| ordinary backpressure | natural demand, limitRate(...), basic overflow choice | prefetch, BaseSubscriber, or queue growth becomes the blocker |
| threading / schedulers | recognize the boundary only | publishOn(...), subscribeOn(...), scheduler choice, or execution tracing becomes the main problem |
| blocking bridge | one blocking boundary with Mono.fromCallable(...) | multiple boundaries, fromRunnable/fromFuture nuances, terminal bridges, or virtual-thread considerations are needed |
Context | contextWrite(...), deferContextual(...), ordinary metadata flow | nested writes, library-facing composition, or precedence rules are the blocker |
| hot/cold and multicast | recognize the boundary only | repeated work, replay, or shared subscriptions decide the design |
| sink/manual hot-source APIs | recognize the boundary only | manual emission, sink flavor choice, or emit-failure behavior decides the design |
| batching/grouping/windowing | recognize the boundary only | groupBy, window, or buffer shapes the pipeline |
| sequence diagnostics | signal-level checkpoint(...), log(...), or doOnEach(...) as narrow tools | assembly tracing, global hooks, or thread-hop debugging becomes the main job |
Mono<T> for 0..1, Flux<T> for 0..N.map(...) for synchronous value-to-value work and flatMap(...) for publisher-returning work.concatMap(...) when downstream order must stay stable.switchMap(...) when previous inner publishers should cancel on new trigger (search-as-you-type, latest-win semantics).timeout(...) to bound the wait for any signal and fall back or error on delay.Mono.fromCallable(...) and move it off the caller thread deliberately.Context for cross-cutting metadata, not for primary business payload.Mono<T> for one result, empty, or error.Flux<T> when multiple values may arrive.just, justOrEmpty, empty, error.fromIterable, range.fromSupplier, fromCallable.fromFuture.defer.map, flatMap, concatMap, switchMap, handle.flatMapMany (or flatMapIterable for Iterable sources).filter, take, skip, distinct.collectList, reduce, count.concat, concatWith.merge, mergeWith.zip, zipWith.combineLatest.then, thenMany.defaultIfEmpty (eager static value), switchIfEmpty (lazy alternative publisher).onErrorReturn, onErrorResume, onErrorMap.timeout(...) or timeoutWhen(...) for delayed fallback on silence.repeat(...) or repeatWhen(...) for re-subscription loops.doFinally.retry(n) or retryWhen(...) with a deliberate policy.limitRate(...) or an explicit overflow policy only when mismatch is real.contextWrite(...) and deferContextual(...) when metadata must survive async boundaries.Mono.fromCallable(...) and keep the rest of the chain reactive.Context and read it where needed.| Need | Default move | Why |
|---|---|---|
| one result | Mono<T> | preserves 0..1 semantics directly |
| many results | Flux<T> | keeps demand and streaming visible |
| lazy one-shot lookup | Mono.fromCallable(...) | captures deferred work once per subscription |
| async step returning a publisher | flatMap(...) | flattens the nested publisher |
| async step with ordering | concatMap(...) | preserves source order |
| cancellation-aware fan-out (latest-win) | switchMap(...) | cancels previous inner on new trigger |
| Mono-to-Flux flatten | flatMapMany(...) or flatMapIterable(...) | bridges cardinality change |
| synchronous reshape | map(...) | keeps the chain simple |
| conditional multi-signal emission | handle(...) | emit 0..N values per input element |
| static empty fallback | defaultIfEmpty(...) | eagerly replaces empty with one value |
| dynamic empty fallback | switchIfEmpty(...) | lazily switches to another publisher on empty |
| error fallback | onErrorResume(...) | chooses a replacement publisher |
| exception translation | onErrorMap(...) | preserves failure flow while changing the type |
| time-bounded operation | timeout(Duration) | errors or falls back if no signal arrives in time |
| completion-side repeat | repeat(n) or repeatWhen(...) | re-subscribes on completion, not error |
| sequential combination | concat(...) | later sources wait for earlier completion |
| concurrent combination | merge(...) | emits as soon as values arrive |
| pair by index | zip(...) | aligns values positionally |
| latest-state recompute | combineLatest(...) | recomputes when any source changes |
| lifecycle side effect | doOnSubscribe / doOnNext / doOnError / doOnCancel / doFinally | observe signals without transforming |
| request metadata | contextWrite(...) + deferContextual(...) | keeps metadata in the subscription, not the thread |
Mono contract with blocking bridge and explicit failure pathimport java.io.IOException;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
final class UserLookupService {
Mono<UserView> loadUser(String userId) {
return Mono.fromCallable(() -> fetchUser(userId))
.subscribeOn(Schedulers.boundedElastic())
.switchIfEmpty(Mono.error(new IllegalStateException("Missing user: " + userId)))
.map(user -> new UserView(user.id(), user.name()))
.onErrorMap(IOException.class, error -> new IllegalStateException("User lookup failed", error));
}
private User fetchUser(String userId) throws IOException {
return new User(userId, "Ada");
}
record User(String id, String name) {}
record UserView(String id, String name) {}
}
One scheduler hop offloads the blocking call to boundedElastic. For multi-hop or custom scheduler design, see the reactor-scheduling skill.
Flux pipeline with ordered async fan-out and empty fallbackimport reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
final class ActivityService {
Flux<String> recentActions(String userId) {
return findActionIds(userId)
.concatMap(this::fetchAction)
.filter(action -> !action.isBlank())
.switchIfEmpty(Flux.just("NO_ACTIONS"));
}
private Flux<String> findActionIds(String userId) {
return Flux.just("login", "purchase", "logout");
}
private Mono<String> fetchAction(String actionId) {
return Mono.just(actionId.toUpperCase());
}
}
Context usage for request metadataimport reactor.core.publisher.Mono;
import reactor.util.context.Context;
final class TraceAwareHandler {
Mono<String> handle(String input) {
return Mono.deferContextual(context -> Mono.just(context.get("traceId") + ":" + input))
.contextWrite(Context.of("traceId", "trace-123"));
}
}
switchMap for cancellation-aware fan-outimport reactor.core.publisher.Flux;
import java.time.Duration;
final class SearchAsYouType {
Flux<String> search(Flux<String> queries) {
return queries.switchMap(query ->
fetchResults(query).take(Duration.ofMillis(200))
);
}
private Flux<String> fetchResults(String query) {
return Flux.just(query + "-result1", query + "-result2");
}
}
switchMap cancels the previous inner publisher when a new trigger arrives. Use it for latest-win scenarios like search-as-you-type or live configuration refresh.
timeout with fallbackimport java.time.Duration;
import reactor.core.publisher.Mono;
final class BoundedCall {
Mono<String> fetchWithTimeout() {
return remoteCall()
.timeout(Duration.ofSeconds(3), Mono.just("fallback-value"));
}
private Mono<String> remoteCall() {
return Mono.just("response").delayElement(Duration.ofMillis(100));
}
}
repeat for completion-side re-subscriptionimport reactor.core.publisher.Mono;
final class PollingService {
Flux<String> pollUntilCondition() {
return fetchStatus()
.repeat(3)
.filter(status -> "DONE".equals(status))
.take(1);
}
private Mono<String> fetchStatus() {
return Mono.just("PENDING");
}
}
repeat re-subscribes on completion (not error). Combine with take(n) or repeatWhen(...) to avoid infinite loops.
handle for conditional multi-signal emissionimport reactor.core.publisher.Flux;
final class ConditionalEmission {
Flux<Integer> expand(Flux<Integer> source) {
return source.handle((value, sink) -> {
if (value > 0) {
sink.next(value);
if (value % 2 == 0) {
sink.next(value * 10);
}
}
});
}
}
handle allows emitting 0, 1, or N values per input element without nesting publishers. Values less than or equal to zero are silently filtered (no signal emitted).
flatMapMany for Mono-to-Flux cardinality changeimport reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
final class BatchExpand {
Flux<String> expandBatch(Mono<List<String>> batchLoader) {
return batchLoader.flatMapMany(Flux::fromIterable);
}
}
Use flatMapMany when a Mono<T> produces a collection that should be emitted as individual elements in a Flux.
| Anti-pattern | Why it fails | Correct move |
|---|---|---|
using map(...) for async work | nests publishers or hides concurrency | use flatMap(...) or concatMap(...) |
returning Mono<List<T>> for a streaming contract | hides streaming semantics and demand | use Flux<T> unless the collection itself is the single value |
placing blocking I/O inside map(...) | occupies the current worker thread invisibly | isolate it with fromCallable(...) |
using retry() without a bound or policy | can loop forever under outage | use bounded retry or a deliberate Retry policy |
using repeat() without a termination condition | re-subscribes forever on every completion | combine with take(n), repeatUntil(...), or repeatWhen(...) |
using timeout(...) without fallback | unbounded timeout produces raw TimeoutException downstream | provide a fallback publisher via timeout(duration, fallback) |
treating Context like mutable shared state | writes are immutable and per subscription | write a new Context and read it with deferContextual(...) |
choosing merge(...) when order matters | output order becomes unstable | use concat(...) or concatMap(...) |
| using programmatic creation for ordinary values | makes the source harder to reason about | stay with factory methods until a blocker exists |
using switchMap(...) when all inner results matter | cancels previous inners before they complete | use flatMap(...) or concatMap(...) instead |
Flux vs Mono matches the real cardinality contract.switchMap for cancellation-aware fan-out).Context carries metadata, not primary payload.| Open this when... | Reference |
|---|---|
| factory methods are not enough and you must emit programmatically or bind resource lifetime | Programmatic Sequence Creation |
the exact choice among concat, merge, zip, combineLatest, or then changes correctness | Combining Operators |
demand, prefetch, BaseSubscriber, or overflow policy is the blocker | Backpressure and Demand |
Context behavior across nested composition or library-facing code becomes the blocker | Context for Library-Facing Flows |
plain retry(n) is not enough and you need backoff, filtering, or retry exhaustion rules | Retry Strategies |
| shared subscriptions, replay, or hot/cold behavior changes the design | Hot, Cold, and Multicasting |
groupBy, window, or buffer shapes the pipeline and the flow stalls or grows unexpectedly | Batching, Grouping, and Windowing |
| one blocking boundary is not enough and you need multiple boundaries, fromRunnable/fromFuture bridges, terminal edges, or virtual-thread considerations | Advanced Blocking Bridges |
| signal-level inspection (per-signal Context, conditional value inspection, tap observer) is needed, or assembly/thread tracing is the real blocker | Signal-Level Diagnostics |
Return:
Context decision that changes runtime behavior.