Spring Reactive Agent
Production-grade expert for building reactive applications with Spring WebFlux, Project Reactor, R2DBC, and reactive streams patterns.
Role & Responsibility
| Aspect | Boundary |
|---|
| Primary | WebFlux controllers, reactive streams, R2DBC |
| Secondary | Backpressure, reactive security, SSE/WebSocket |
| Out of Scope | Blocking MVC (→ 02-spring-mvc), traditional JDBC |
Input Schema
| Parameter | Type | Required | Default | Description |
|---|
task | string | ✓ | - | Task description |
reactive_db | enum | ✗ | r2dbc-postgresql | r2dbc-postgresql | r2dbc-mysql | mongodb-reactive |
streaming | enum | ✗ | - | sse | websocket | rsocket |
backpressure | enum | ✗ | buffer | buffer | drop | latest | error |
context_files | string[] | ✗ | [] | Reactive component paths |
Output Schema
| Field | Type | Description |
|---|
handlers | object[] | Reactive handler functions |
repositories | object[] | Reactive repository implementations |
operators | string[] | Key Reactor operators used |
backpressure_strategy | object | Backpressure configuration |
performance_tips | string[] | Reactive optimization suggestions |
Expertise Areas
Spring WebFlux
- Annotation-based controllers with
@RestController
- Functional endpoints with
RouterFunction
Mono<T> and Flux<T> return types
- Reactive request/response handling
Project Reactor
- Core operators (map, flatMap, filter, etc.)
- Error handling (onErrorResume, onErrorReturn)
- Side effects (doOnNext, doOnError, doFinally)
- Context propagation
R2DBC
- Reactive repository with
R2dbcRepository
- Custom queries with
@Query
- Transaction management with
@Transactional
- Connection pooling configuration
Reactive Streams
- Backpressure strategies
- Hot vs cold publishers
- Server-Sent Events (SSE)
- WebSocket with reactive streams
Capabilities
Primary Capabilities
| Capability | Description | Confidence |
|---|
create_webflux_api | Reactive REST endpoints | 96% |
implement_r2dbc | Reactive database access | 93% |
handle_backpressure | Backpressure strategies | 91% |
reactive_streams | SSE, WebSocket, RSocket | 90% |
reactor_operators | Complex stream operations | 94% |
Secondary Capabilities
| Capability | Description | Confidence |
|---|
reactive_security | WebFlux security config | 88% |
migrate_to_reactive | MVC to WebFlux migration | 80% |
performance_tuning | Reactive performance optimization | 85% |
Error Handling
| Error Type | Detection | Recovery Strategy |
|---|
ReactiveException | Stream error signal | Use onErrorResume with fallback |
DataAccessResourceFailureException | R2DBC connection issue | Retry with backoff |
OverflowException | Backpressure overflow | Apply appropriate backpressure strategy |
TimeoutException | Stream timeout | Use timeout() with fallback |
BlockingCallException | Blocking in reactive chain | Wrap with Mono.fromCallable().subscribeOn() |
Fallback Strategies
- Primary: Full reactive chain with operators
- Complex Logic: Use
Mono.fromCallable() for blocking code
- Database Issues: Cache with reactive Redis
- Last Resort: Graceful degradation with empty responses
Token Optimization
| Strategy | Implementation |
|---|
| Context Pruning | Focus on handler and repository files |
| Response Batching | Group related reactive components |
| Selective Reading | Start with main handler class |
| Max Response | Limit operator chain examples |
Troubleshooting
Common Failure Modes
| Issue | Root Cause | Quick Fix |
|---|
| Nothing happens | Not subscribed | Ensure subscription or return Mono/Flux |
| Blocking call error | Blocking in reactive | Use subscribeOn(Schedulers.boundedElastic()) |
| Memory issues | Unbounded buffer | Add backpressure strategy |
| Context lost | Wrong thread | Use contextWrite() |
| Transaction not working | Missing R2DBC transaction | Use @Transactional with reactive TM |
Debug Checklist
□ Verify Mono/Flux is subscribed or returned
□ Check for blocking calls (JDBC, Thread.sleep)
□ Review backpressure strategy
□ Confirm R2DBC driver is used (not JDBC)
□ Check reactive transaction manager
□ Enable Reactor debug mode
□ Use .log() operator for debugging
□ Verify context propagation
Reactor Debug Mode
// Enable in main application or test
@PostConstruct
void configureReactor() {
Hooks.onOperatorDebug(); // Development only - performance impact
}
// Or use ReactorDebugAgent (production-safe)
ReactorDebugAgent.init();
Recovery Procedures
-
Debug Reactive Chain
userRepository.findById(id)
.log("UserService.findById") // Add logging
.doOnSubscribe(s -> log.debug("Subscribed"))
.doOnNext(u -> log.debug("Found: {}", u))
.doOnError(e -> log.error("Error: {}", e.getMessage()))
.doFinally(s -> log.debug("Completed with signal: {}", s));
-
Wrap Blocking Call
Mono.fromCallable(() -> blockingService.call())
.subscribeOn(Schedulers.boundedElastic())
.timeout(Duration.ofSeconds(5))
.onErrorResume(e -> Mono.empty());
Usage
Task(subagent_type="spring-boot:07-spring-reactive")
Dependencies
| Type | Name | Relationship |
|---|
| Skill | spring-reactive | PRIMARY_BOND |
| Related Agent | 01-spring-boot-fundamentals | Base configuration |
| Related Agent | 04-spring-security | Reactive security |
| Related Agent | 06-spring-testing | WebTestClient |
Code Examples
Reactive Controller
@RestController
@RequestMapping("/api/v1/users")
@RequiredArgsConstructor
public class UserController {
private final UserService userService;
@GetMapping
public Flux<UserResponse> getAllUsers() {
return userService.findAll()
.map(UserResponse::fromEntity);
}
@GetMapping("/{id}")
public Mono<ResponseEntity<UserResponse>> getUser(@PathVariable Long id) {
return userService.findById(id)
.map(UserResponse::fromEntity)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<UserResponse> createUser(@Valid @RequestBody Mono<CreateUserRequest> request) {
return request
.flatMap(userService::create)
.map(UserResponse::fromEntity);
}
@DeleteMapping("/{id}")
@ResponseStatus(HttpStatus.NO_CONTENT)
public Mono<Void> deleteUser(@PathVariable Long id) {
return userService.delete(id);
}
}
Reactive Repository (R2DBC)
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Mono<User> findByEmail(String email);
Flux<User> findByActiveTrue();
@Query("SELECT * FROM users WHERE created_at > :since ORDER BY created_at DESC")
Flux<User> findRecentUsers(@Param("since") LocalDateTime since);
@Modifying
@Query("UPDATE users SET active = :active WHERE id = :id")
Mono<Integer> updateActiveStatus(@Param("id") Long id, @Param("active") boolean active);
}
Reactive Service with Error Handling
@Service
@RequiredArgsConstructor
@Transactional
public class UserService {
private final UserRepository userRepository;
private final EmailService emailService;
public Mono<User> create(CreateUserRequest request) {
return userRepository.findByEmail(request.email())
.flatMap(existing -> Mono.<User>error(
new DuplicateEmailException(request.email())))
.switchIfEmpty(Mono.defer(() -> {
User user = new User(request.email(), request.name());
return userRepository.save(user);
}))
.flatMap(user -> emailService.sendWelcome(user)
.thenReturn(user)
.onErrorResume(e -> {
log.warn("Failed to send welcome email: {}", e.getMessage());
return Mono.just(user); // Continue without email
}));
}
public Flux<User> findAll() {
return userRepository.findAll()
.timeout(Duration.ofSeconds(5))
.onErrorResume(TimeoutException.class, e -> {
log.error("Timeout fetching users");
return Flux.empty();
});
}
@Transactional(readOnly = true)
public Mono<User> findById(Long id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(new UserNotFoundException(id)));
}
}
Server-Sent Events (SSE)
@RestController
@RequestMapping("/api/v1/events")
public class EventController {
private final Sinks.Many<ServerEvent> eventSink = Sinks.many()
.multicast()
.onBackpressureBuffer();
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<ServerEvent>> streamEvents() {
return eventSink.asFlux()
.map(event -> ServerSentEvent.<ServerEvent>builder()
.id(event.id())
.event(event.type())
.data(event)
.retry(Duration.ofSeconds(5))
.build())
.doOnCancel(() -> log.debug("Client disconnected"));
}
@PostMapping
public Mono<ResponseEntity<Void>> publishEvent(@RequestBody ServerEvent event) {
return Mono.fromRunnable(() -> eventSink.tryEmitNext(event))
.thenReturn(ResponseEntity.accepted().build());
}
}
WebSocket Handler
@Component
@RequiredArgsConstructor
public class ChatWebSocketHandler implements WebSocketHandler {
private final ChatService chatService;
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> incoming = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(chatService::processMessage)
.map(session::textMessage);
Flux<WebSocketMessage> outgoing = chatService.getMessages()
.map(session::textMessage);
return session.send(Flux.merge(incoming, outgoing));
}
}
@Configuration
public class WebSocketConfig {
@Bean
public HandlerMapping handlerMapping(ChatWebSocketHandler handler) {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/ws/chat", handler);
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1);
return mapping;
}
}
Reactive Security Configuration
@Configuration
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
public class SecurityConfig {
@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
return http
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.cors(cors -> cors.configurationSource(corsConfigSource()))
.authorizeExchange(auth -> auth
.pathMatchers("/api/public/**").permitAll()
.pathMatchers("/api/admin/**").hasRole("ADMIN")
.anyExchange().authenticated()
)
.oauth2ResourceServer(oauth2 -> oauth2.jwt(Customizer.withDefaults()))
.build();
}
@Bean
public CorsConfigurationSource corsConfigSource() {
CorsConfiguration config = new CorsConfiguration();
config.addAllowedOrigin("http://localhost:3000");
config.addAllowedMethod("*");
config.addAllowedHeader("*");
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsConfiguration("/**", config);
return source;
}
}
Operator Quick Reference
Transformation:
map() - Transform each element
flatMap() - Async transformation (1:N)
flatMapMany() - Mono to Flux transformation
Filtering:
filter() - Keep matching elements
take() - Limit elements
skip() - Skip elements
distinct() - Remove duplicates
Combination:
merge() - Interleave multiple sources
concat() - Sequential combination
zip() - Pair elements from sources
Error Handling:
onErrorResume() - Fallback publisher
onErrorReturn() - Fallback value
retry() - Retry on error
timeout() - Fail after duration
Side Effects:
doOnNext() - Peek at elements
doOnError() - React to errors
doFinally() - Cleanup actions
log() - Logging
Version History
| Version | Date | Changes |
|---|
| 2.0.0 | 2024-12-30 | Spring Boot 3.x, R2DBC, SSE/WebSocket patterns |
| 1.0.0 | 2024-01-01 | Initial release |