From reactor
Design Reactor execution context with explicit scheduler choice, publishOn/subscribeOn placement, and blocking-boundary decisions. Use this skill when designing or reviewing Reactor execution context decisions: scheduler choice, publishOn/subscribeOn placement, blocking offload, thread-affinity boundaries, and ordinary scheduling diagnostics.
npx claudepluginhub ririnto/sinon --plugin reactor
This skill uses the workspace's default tool permissions.
Choose execution context deliberately in Reactor.
This skill covers the ordinary path for scheduler choice, publishOn(...) vs subscribeOn(...), blocking offload, thread-affinity boundaries, and local scheduling diagnostics. Keep custom scheduler factories, automatic context propagation, global hooks, and test-only virtual-time work in blocker references or sibling skills.
Use this skill when:
- choosing among Schedulers.parallel(), boundedElastic(), single(), or immediate()
- deciding whether publishOn(...) or subscribeOn(...) is the right move

Do not use it when the main problem or job is:
- Flux / Mono source creation and ordinary operator composition
- reactor-test design

| Reactor scheduling surface | Keep in this file | Open a reference when... |
|---|---|---|
| default execution model | source runs on the subscription thread until a scheduler changes it | you are debugging assembly/runtime gaps across many chains |
| scheduler choice | parallel, boundedElastic, single, immediate | shared defaults are not enough and you must create, tune, or replace schedulers explicitly |
| publishOn(...) vs subscribeOn(...) | ordinary placement, effect, and when both are justified | hook-level scheduling instrumentation or shared scheduler replacement becomes the blocker |
| blocking offload | one blocking boundary with Mono.fromCallable(...) + subscribeOn(boundedElastic()) | queue caps, lifecycle, virtual threads, or custom executor bridges matter |
| thread-affinity and context boundary | Context flows with the subscription, not the thread | ThreadLocal bridging or automatic context propagation becomes the blocker |
| local scheduling diagnostics | log(...), named checkpoint(...), and visible thread logging | global hooks, assembly tracing, or debug-agent level tooling becomes the blocker |
| virtual time boundary | recognize that time control belongs to testing | time simulation becomes the main job |
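
The "local scheduling diagnostics" row can be illustrated with a small sketch. It assumes reactor-core is on the classpath; the class name `ThreadLoggingSketch`, the method `traceThreads`, and the labels `before-handoff` / `after-handoff` are hypothetical names chosen for this example. It records the thread name on each side of a `publishOn(...)` so the handoff becomes visible, and attaches a named `checkpoint(...)` and a `log(...)` category to the chain.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class ThreadLoggingSketch {

    // Records which thread ran each stage so the publishOn handoff is visible.
    static List<String> traceThreads() {
        // Thread-safe list: the two doOnNext callbacks run on different threads.
        List<String> trace = new CopyOnWriteArrayList<>();

        Flux.just("a", "b")
            // Runs on the thread that subscribes (here, the caller of blockLast()).
            .doOnNext(v -> trace.add("before:" + Thread.currentThread().getName()))
            // Named checkpoint: shows up in error tracebacks for this part of the chain.
            .checkpoint("before-handoff")
            // Everything below this line runs on a parallel worker.
            .publishOn(Schedulers.parallel())
            .doOnNext(v -> trace.add("after:" + Thread.currentThread().getName()))
            // Logs signals under the (hypothetical) category "after-handoff".
            .log("after-handoff")
            // Blocking here is for the demo only; do not block inside a reactive pipeline.
            .blockLast();

        return trace;
    }
}
```

Calling `ThreadLoggingSketch.traceThreads()` from a plain `main` thread should yield two `before:` entries carrying the caller's thread name and two `after:` entries carrying a parallel worker name. If that is not enough to locate a scheduling problem, the reference column of the table applies.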
- Reactor does not choose a Scheduler for you: operators run on the thread that called subscribe() unless you move them.
- Use publishOn(...) to switch downstream execution from that point onward.
- Use subscribeOn(...) to move subscription and upstream source work.
- Place subscribeOn(...) at the source boundary, especially for blocking bridges.
- Use Schedulers.parallel() for fast non-blocking CPU work.
- Use Schedulers.boundedElastic() for blocking I/O or thread-affine imperative code.
- Treat Context as subscription metadata, not as a thread-local replacement by itself.

- CPU-bound non-blocking work: Schedulers.parallel().
- Blocking I/O or a legacy bridge: Schedulers.boundedElastic().
- One serialized execution lane: Schedulers.single() or a dedicated custom scheduler.
- No real handoff: Schedulers.immediate().
- Moving the source and subscription: subscribeOn(...).
- Moving downstream operators: publishOn(...).
- Request metadata across async boundaries: Context rather than thread locals.
- Diagnostics: log(...), named checkpoint(...), and explicit thread logging before reaching for global hooks.

- Place subscribeOn(...) at the source if the source or blocking bridge must move.
- Add publishOn(...) only where downstream affinity truly changes.
- Carry request metadata in Context.

| Need | Default move | Why |
|---|---|---|
| CPU-bound non-blocking work | Schedulers.parallel() | fixed worker pool sized for CPU work |
| blocking I/O or legacy bridge | Schedulers.boundedElastic() | bounded worker expansion for blocking tasks |
| one serialized execution lane | Schedulers.single() | preserves one-thread affinity |
| no real handoff (test or caller-thread only) | Schedulers.immediate() | runs on the current thread; avoid in production pipelines |
| move source and subscription | subscribeOn(...) | affects upstream work |
| move downstream operators | publishOn(...) | affects work after that operator |
| request metadata across async boundaries | contextWrite(...) + deferContextual(...) | survives thread switches without thread-local assumptions |
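```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

final class BlockingLookup {
    Mono<String> readUser(String userId) {
        // fromCallable defers the blocking call until subscription;
        // subscribeOn moves that subscription, and therefore the call, onto boundedElastic.
        return Mono.fromCallable(() -> blockingLookup(userId))
            .subscribeOn(Schedulers.boundedElastic())
            .map(String::trim);
    }

    // Placeholder for a real blocking call.
    private String blockingLookup(String userId) {
        return "user-" + userId;
    }
}
```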
publishOn(...)

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

final class PublishOnExample {
    Flux<String> process() {
        return Flux.just("a", "b", "c")
            // Above publishOn: runs on the subscribing thread.
            .map(String::toUpperCase)
            .publishOn(Schedulers.parallel())
            // Below publishOn: runs on a parallel worker.
            .map(value -> value + "-done");
    }
}
```
```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

final class MixedExecutionExample {
    Mono<String> loadAndTransform(String userId) {
        return Mono.fromCallable(() -> blockingLookup(userId))
            // The blocking source runs on boundedElastic.
            .subscribeOn(Schedulers.boundedElastic())
            // The non-blocking transformation below runs on parallel.
            .publishOn(Schedulers.parallel())
            .map(String::toUpperCase);
    }

    // Placeholder for a real blocking call.
    private String blockingLookup(String userId) {
        return "user-" + userId;
    }
}
```
```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

final class ContextAcrossThreads {
    Mono<String> handle(String input) {
        // deferContextual reads the Context at subscription time.
        return Mono.deferContextual(context -> Mono.just(context.get("requestId") + ":" + input))
            .publishOn(Schedulers.parallel())
            // contextWrite is visible to every operator above it, regardless of thread switches.
            .contextWrite(Context.of("requestId", "req-42"));
    }
}
```
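
Where `contextWrite(...)` sits relative to the reader matters more than where the thread switch sits. The following sketch (reactor-core assumed on the classpath; `ContextPlacementSketch`, `readRequestId`, `visible`, and `notVisible` are hypothetical names for this illustration) contrasts a `contextWrite(...)` placed below the reader with one placed above it.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

final class ContextPlacementSketch {

    // Reads "requestId" from the Context, or falls back to "missing".
    private static Mono<String> readRequestId() {
        return Mono.deferContextual(ctx -> Mono.just(ctx.getOrDefault("requestId", "missing")));
    }

    // contextWrite is below the reader in the chain, so the reader sees the value,
    // even though a publishOn sits between them.
    static String visible() {
        return readRequestId()
            .publishOn(Schedulers.parallel())
            .contextWrite(Context.of("requestId", "req-42"))
            .block(); // demo only
    }

    // contextWrite is above the reader: it enriches only Mono.just("start"),
    // so the reader inside flatMap falls back to the default.
    static String notVisible() {
        return Mono.just("start")
            .contextWrite(Context.of("requestId", "req-42"))
            .flatMap(ignored -> readRequestId())
            .block(); // demo only
    }
}
```

Here `visible()` returns `"req-42"` and `notVisible()` returns `"missing"`: the Context travels from the subscriber up the chain, so only operators above a `contextWrite(...)` can read what it wrote.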
| Anti-pattern | Why it fails | Correct move |
|---|---|---|
| using publishOn(...) to move a blocking source | the source still runs before the handoff | wrap the source and use subscribeOn(...) |
| stacking many publishOn(...) calls | adds context switches without adding correctness | keep only the handoffs that change behavior |
| using parallel() for blocking I/O | blocks non-blocking worker threads | use boundedElastic() |
| assuming thread locals survive scheduler hops | the thread can change between signals | move request data through Context |
| adding multiple subscribeOn(...) calls for control | only the subscribeOn(...) closest to the source decides where the source runs | place one subscribeOn(...) at the real boundary |
| placing subscribeOn(...) inside a flatMap lambda to affect the outer chain | subscribeOn(...) inside flatMap applies to the inner publisher only | place subscribeOn(...) on the outer source, or on the inner publisher deliberately |
| using Schedulers.immediate() in production code | runs on the caller thread with no isolation | reserve for test code or when you explicitly want caller-thread execution |
| placing contextWrite(...) upstream of the operators that read the Context | contextWrite(...) is visible only to operators above it in the chain; a publishOn(...) in between does not change that | place contextWrite(...) downstream of every operator that needs the value |
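
The two subscribeOn(...) rows above can be checked with a short sketch. It assumes reactor-core on the classpath; `SubscribeOnPlacementSketch`, `sourceThread`, and `innerSubscribeOn` are hypothetical names introduced here. The first method stacks two `subscribeOn(...)` calls and reports where the source ran; the second puts `subscribeOn(...)` on an inner publisher inside `flatMap` and reports where the outer and inner stages ran.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.List;

final class SubscribeOnPlacementSketch {

    // Two subscribeOn calls: the one closest to the source decides where the source runs.
    static String sourceThread() {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
            .subscribeOn(Schedulers.boundedElastic()) // closest to the source: this one wins
            .subscribeOn(Schedulers.parallel())       // only moves the subscribe call itself
            .block(); // demo only
    }

    // subscribeOn inside flatMap moves the inner publisher, not the outer chain.
    static List<String> innerSubscribeOn() {
        return Flux.just("x")
            // Outer stage: runs on the subscribing thread (the caller of block()).
            .map(v -> "outer:" + Thread.currentThread().getName())
            .flatMap(outer ->
                // Inner stage: runs on boundedElastic because of the inner subscribeOn.
                Mono.fromCallable(() -> outer + "|inner:" + Thread.currentThread().getName())
                    .subscribeOn(Schedulers.boundedElastic()))
            .collectList()
            .block(); // demo only
    }
}
```

`sourceThread()` should return a name starting with `boundedElastic-`, and the single entry from `innerSubscribeOn()` should show the caller's thread for the outer stage and a `boundedElastic-` thread for the inner one.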
- publishOn(...) and subscribeOn(...) are used for the correct direction of influence.
- Context is described as subscription metadata rather than thread-local state.

| Open this when... | Reference |
|---|---|
| shared scheduler defaults are not enough and you must create, tune, or replace schedulers explicitly | Scheduler Tuning and Custom Schedulers |
| ThreadLocal-backed data must cross scheduler boundaries or automatic context propagation becomes the blocker | ThreadLocal Context Bridging |
| local log(...) and checkpoint(...) are not enough and you need global debugging hooks or assembly tracing | Debugging and Hooks |
Return:
- publishOn(...) / subscribeOn(...) placement and why it changes execution correctly.
- Context rule that changes runtime behavior.