Build Kafka producers and consumers in Spring with `KafkaTemplate`, `@KafkaListener`, topic declarations, retries, dead-letter topics, acknowledgment strategies, and embedded Kafka tests.
Use this skill when building Kafka producers and consumers in Spring with `KafkaTemplate`, `@KafkaListener`, topic declarations, retries, dead-letter topics, acknowledgment strategies, and embedded Kafka tests.
Use spring-kafka for Kafka producers, consumers, listener containers, offsets, retry topics, dead-letter topics, and Kafka-specific testing.
Use spring-amqp or spring-pulsar instead for RabbitMQ or Pulsar semantics and client APIs.

The ordinary Spring Kafka job is: produce through `KafkaTemplate` and consume through `@KafkaListener`, defaulting to container-managed acknowledgment unless offset control requires a manual strategy.

| Situation | Use |
|---|---|
| Application sends records into Kafka | `KafkaTemplate` |
| Application consumes records from Kafka | `@KafkaListener` |
| Listener should commit after successful processing with the ordinary container flow | container-managed acknowledgment |
| Listener must control when acknowledgment happens instead of using the ordinary container-managed path | manual acknowledgment |
| Listener should process many records together | batch listener |
| Ordinary failure path needs delayed reprocessing and a DLT | `@RetryableTopic` |
Keep the default path small: one producer, one listener, one serialization strategy per topic, container-managed acknowledgment unless explicit offset control is required, and one explicit retry/DLT policy.
Use Spring Kafka for application code and the Kafka test module for integration tests.
The current stable Spring Kafka line is 4.0.4. The 4.1.x line is still milestone-only and should be treated as upcoming until it reaches GA. Spring Boot 4.0.x manages Spring Kafka 4.0.x; older Spring Boot 3.5.x and 3.4.x applications remain on the 3.3.x line and should be treated as a separate compatibility branch.
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```
```java
@Bean
NewTopic paymentsTopic() {
    return TopicBuilder.name("payments")
            .partitions(3)
            .replicas(1)
            .build();
}
```
```yaml
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.events"
        spring.json.value.default.type: "com.example.events.PaymentEvent"
```
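The JSON serializer config above assumes an event payload type. A minimal sketch of such a record (the name `com.example.events.PaymentEvent` and its `paymentId()` accessor are taken from the other snippets in this document):

```java
// Lives in com.example.events so it matches spring.json.trusted.packages.
// A record gives value semantics and an automatic paymentId() accessor,
// which JsonSerializer/JsonDeserializer handle without extra configuration.
public record PaymentEvent(String paymentId) {
}
```

If the record's package is not listed in `spring.json.trusted.packages`, the consumer-side deserializer rejects the payload.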
```java
@Service
class PaymentPublisher {

    private final KafkaTemplate<String, PaymentEvent> kafka;

    PaymentPublisher(KafkaTemplate<String, PaymentEvent> kafka) {
        this.kafka = kafka;
    }

    CompletableFuture<SendResult<String, PaymentEvent>> publish(PaymentEvent event) {
        return kafka.send("payments", event.paymentId(), event);
    }
}
```
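Because `publish` returns a `CompletableFuture`, callers typically attach a completion callback rather than block. A sketch of the pattern, with a plain `CompletableFuture` standing in for the Kafka send result so the snippet stays self-contained:

```java
import java.util.concurrent.CompletableFuture;

public class SendCallbackSketch {
    public static void main(String[] args) {
        // Stand-in for publisher.publish(event): completes once the broker acks.
        CompletableFuture<String> send = CompletableFuture.completedFuture("p-1");

        send.whenComplete((result, ex) -> {
            if (ex != null) {
                // Failed send: log and hand off to a retry or alerting path.
                System.err.println("send failed: " + ex.getMessage());
            } else {
                System.out.println("acked " + result);
            }
        });
    }
}
```

Blocking with `join()` on the returned future in request threads defeats the asynchronous producer; reserve it for tests.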
```java
@Component
class PaymentListener {

    private final PaymentProcessor processor;
    private final AtomicReference<String> lastSeenPaymentId = new AtomicReference<>();

    PaymentListener(PaymentProcessor processor) {
        this.processor = processor;
    }

    @KafkaListener(topics = "payments", groupId = "billing")
    void handle(PaymentEvent event) {
        processor.process(event);
        lastSeenPaymentId.set(event.paymentId());
    }

    String lastSeenPaymentId() {
        return lastSeenPaymentId.get();
    }
}
```
```java
@Configuration
@EnableKafkaRetryTopic
class KafkaRetryConfiguration {
}
```

```java
@RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000, multiplier = 2.0))
@KafkaListener(topics = "orders")
void process(OrderEvent event) {
    service.process(event);
}

@DltHandler
void deadLetter(OrderEvent event) {
    audit.failed(event);
}
```
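With `attempts = "4"` and the backoff above, the record is tried once on the main topic and then retried on retry topics with exponentially growing delays before reaching the DLT. A quick sketch of that schedule (plain arithmetic, no Spring involved):

```java
import java.util.Arrays;

public class BackoffScheduleSketch {
    public static void main(String[] args) {
        int attempts = 4;        // total attempts, as in @RetryableTopic(attempts = "4")
        long delayMs = 1000;     // @Backoff(delay = 1000)
        double multiplier = 2.0; // @Backoff(multiplier = 2.0)

        // attempts - 1 retries, each delayed by the previous delay times the multiplier
        long[] retryDelays = new long[attempts - 1];
        long current = delayMs;
        for (int i = 0; i < retryDelays.length; i++) {
            retryDelays[i] = current;
            current = (long) (current * multiplier);
        }
        System.out.println(Arrays.toString(retryDelays)); // [1000, 2000, 4000]
    }
}
```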
```java
@Bean
ConcurrentKafkaListenerContainerFactory<String, PaymentEvent> manualAckKafkaListenerContainerFactory(
        ConsumerFactory<String, PaymentEvent> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, PaymentEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
```

```java
@KafkaListener(topics = "payments", groupId = "billing", containerFactory = "manualAckKafkaListenerContainerFactory")
void handle(PaymentEvent event, Acknowledgment ack) {
    processor.process(event);
    ack.acknowledge();
}
```
Use manual acknowledgment only when listener code must control the commit point explicitly.
Keep the ordinary path on single-record listeners unless throughput or downstream batching requirements justify batch consumption. Open references/batch-listeners.md when the listener should actually switch to batch mode.
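If batch mode is justified, Spring Boot can switch the auto-configured container factory to batch listeners with one property; handler methods then receive `List<...>` payloads instead of single records. A configuration sketch:

```yaml
spring:
  kafka:
    listener:
      type: batch  # batch listener methods receive List<PaymentEvent>
```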
```java
@SpringBootTest
@EmbeddedKafka(topics = "payments", partitions = 1)
class PaymentFlowTests {

    @Autowired
    PaymentPublisher publisher;

    @Autowired
    PaymentListener listener;

    @Test
    void producerAndListenerAgreeOnThePaymentEventShape() {
        publisher.publish(new PaymentEvent("p-1"));
        await().untilAsserted(() -> assertThat(listener.lastSeenPaymentId()).isEqualTo("p-1"));
    }
}
```
Return these artifacts for the ordinary path:

- the `payments`, `orders`, and `orders-dlt` topics
- the `billing` consumer group
- the `@RetryableTopic(attempts = "4")` retry policy
Go beyond the ordinary path only when the listener should consume `List<T>` batches instead of one record at a time, or when `@RetryableTopic` is not enough and the listener needs an explicit `DefaultErrorHandler`, recoverers, or deeper retry classification.