Build Apache Pulsar producers, consumers, and readers in Spring with `PulsarTemplate`, `@PulsarListener`, schema mapping, and DLQ patterns. Use this skill when building Apache Pulsar producers, consumers, or readers in Spring with `PulsarTemplate`, `@PulsarListener`, schema mapping, subscription type and DLQ decisions, customizers, transactions, or Pulsar-specific testing and administration.
npx claudepluginhub ririnto/sinon --plugin spring
This skill uses the workspace's default tool permissions.
references/admin-and-topic-provisioning.md
references/batch-consumption.md
references/client-authentication-and-tls.md
references/consumer-acknowledgment.md
references/dead-letter-policy.md
references/error-handling-redelivery-and-dlt.md
references/observability.md
references/partitioned-topics-and-key-routing.md
references/producer-consumer-customizers-and-properties.md
references/reactive-pulsar.md
references/readers-and-replay.md
references/schema-mapping-and-compatibility.md
references/subscription-types-and-concurrency.md
references/testing-with-testcontainers.md
references/tombstones.md
references/transactions-and-coordinated-ack.md
Use spring-pulsar for Pulsar producer and consumer code, listeners, readers, topic naming, partitions, and Pulsar-specific admin or transaction decisions.
Use spring-kafka or spring-amqp instead for Kafka or RabbitMQ semantics.
The ordinary Spring Pulsar job is:
Publish through PulsarTemplate and consume through @PulsarListener.
Use the Boot starter for ordinary Pulsar application code.
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar</artifactId>
    </dependency>
</dependencies>
spring:
  pulsar:
    client:
      service-url: pulsar://localhost:6650
    producer:
      topic-name: shipments
    consumer:
      subscription:
        name: warehouse
Open references/client-authentication-and-tls.md when the cluster requires authentication, TLS, or separate administration credentials.
persistent://public/default/shipments
Start with explicit topic and subscription names in one namespace. Add tenant or namespace abstraction only when the deployment truly requires it.
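The short name `shipments` and the full name `persistent://public/default/shipments` refer to the same topic, because Pulsar fills in the `persistent` domain, the `public` tenant, and the `default` namespace when they are omitted. A minimal sketch of that expansion in plain Java is shown here. `TopicNames` and `qualify` are hypothetical helpers for illustration only and are not part of Spring Pulsar or the Pulsar client, and the sketch assumes the input is either a bare local name or an already fully qualified name.

```java
// Hypothetical helper for illustration; not a Spring Pulsar or Pulsar client class.
final class TopicNames {

    private TopicNames() {
    }

    // Expands a bare local name such as "shipments" using Pulsar's defaults:
    // the persistent domain, the "public" tenant, and the "default" namespace.
    // Assumption: names containing "://" are already fully qualified.
    static String qualify(String topic) {
        if (topic.contains("://")) {
            return topic;
        }
        return "persistent://public/default/" + topic;
    }

    // Builds a fully qualified persistent topic name from explicit parts,
    // for deployments that really do need a tenant or namespace abstraction.
    static String qualify(String tenant, String namespace, String topic) {
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    public static void main(String[] args) {
        System.out.println(qualify("shipments"));
    }
}
```

Keeping a helper like this in one place makes it easier to stay on short names at first and switch to explicit tenants or namespaces later without touching every listener.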
@Service
class ShipmentPublisher {

    private final PulsarTemplate<ShipmentEvent> pulsar;

    ShipmentPublisher(PulsarTemplate<ShipmentEvent> pulsar) {
        this.pulsar = pulsar;
    }

    void publish(ShipmentEvent event) {
        pulsar.send("shipments", event);
    }
}
@Component
class ShipmentListener {

    private final ShipmentService service;

    ShipmentListener(ShipmentService service) {
        this.service = service;
    }

    @PulsarListener(topics = "shipments", subscriptionName = "warehouse")
    void handle(ShipmentEvent event) {
        service.handle(event);
    }
}
@PulsarListener(
    topics = "shipments",
    subscriptionName = "warehouse",
    schemaType = SchemaType.JSON,
    subscriptionType = SubscriptionType.Shared,
    concurrency = "3"
)
void handle(ShipmentEvent event) {
    service.handle(event);
}
@PulsarReader(topics = "shipments", startMessageId = "earliest")
void replay(ShipmentEvent event) {
    audit.record(event);
}
pulsar.executeInTransaction(operations -> {
    operations.send("shipments", event);
    return null;
});
Use transactions only when the workflow truly needs grouped Pulsar writes.
shipments
persistent://public/default/shipments
warehouse
@PulsarListener(topics = "shipments", subscriptionName = "warehouse")
DeadLetterPolicy.builder()
    .maxRedeliverCount(5)
    .deadLetterTopic("shipments-dlt")
    .build();
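To make the effect of `maxRedeliverCount(5)` concrete, the following is a simplified in-memory model of redelivery followed by dead-lettering, written in plain Java. It is not the Pulsar client: `DeadLetterSketch`, `deliver`, and the `deadLetterTopic` list are hypothetical names, and the model assumes the first delivery does not count as a redelivery, so a handler that always fails is invoked once plus the redelivery limit before the message is routed to the dead letter topic. Check the exact counting behavior against the broker and client version in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified model for illustration; not the Pulsar client or Spring Pulsar.
final class DeadLetterSketch {

    final int maxRedeliverCount;

    // Stands in for the dead letter topic, e.g. "shipments-dlt".
    final List<String> deadLetterTopic = new ArrayList<>();

    DeadLetterSketch(int maxRedeliverCount) {
        this.maxRedeliverCount = maxRedeliverCount;
    }

    // Delivers the payload until the handler succeeds or the redelivery
    // budget is spent. Returns how many times the handler was invoked.
    int deliver(String payload, Predicate<String> handler) {
        int attempts = 0;
        // Assumption: redeliveries == 0 is the first delivery and is not
        // counted against maxRedeliverCount.
        for (int redeliveries = 0; redeliveries <= maxRedeliverCount; redeliveries++) {
            attempts++;
            if (handler.test(payload)) {
                return attempts; // handler succeeded: acknowledged
            }
        }
        deadLetterTopic.add(payload); // budget spent: route to the DLT
        return attempts;
    }
}
```

Under these assumptions, `new DeadLetterSketch(5).deliver("shipment-42", p -> false)` invokes the handler six times and then places the payload on the dead letter list, while a handler that succeeds on the first attempt leaves the list untouched.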
Return:
@SpringBootTest
@Testcontainers
class ShipmentFlowTest {

    static CountDownLatch deliveries = new CountDownLatch(1);
    static AtomicReference<ShipmentEvent> received = new AtomicReference<>();

    @Container
    static PulsarContainer pulsar = new PulsarContainer("apachepulsar/pulsar:3.3.2");

    @DynamicPropertySource
    static void pulsarProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.pulsar.client.service-url", pulsar::getPulsarBrokerUrl);
        registry.add("spring.pulsar.admin.service-url", pulsar::getHttpServiceUrl);
    }

    @Autowired
    PulsarTemplate<ShipmentEvent> pulsarTemplate;

    @Test
    void sendsAndReceivesJsonPayload() throws Exception {
        pulsarTemplate.send("shipments", new ShipmentEvent("shipment-42"));
        assertAll(
            () -> assertThat(deliveries.await(10, TimeUnit.SECONDS)).isTrue(),
            () -> assertThat(received.get().shipmentId()).isEqualTo("shipment-42")
        );
    }

    @Component
    static class TestListener {

        @PulsarListener(topics = "shipments", subscriptionName = "warehouse-test", schemaType = SchemaType.JSON)
        void handle(ShipmentEvent event) {
            received.set(event);
            deliveries.countDown();
        }
    }
}
Pin a concrete broker image only when the test must prove compatibility with a chosen Pulsar line. Otherwise, keep the image aligned with one of the supported Pulsar lines for Spring Pulsar 2.0.x.
spring.pulsar.* properties are not enough or builder customizers are required.