Architect, build, and debug Kafka Streams apps (JVM-embedded stream processing). Use when user mentions KStream, KTable, topology, TopologyTestDriver, StreamsBuilder, interactive queries, GlobalKTable, joins/windows/aggregations, or debugging issues (rebalancing, state stores, lag, deserialization errors). Do NOT trigger for Flink, connectors, CDC, or plain producer/consumer.
Install: `npx claudepluginhub confluentinc/agent-skills --plugin streaming-skills-plugin`
Slash command: `/streaming-skills-plugin:kafka-streams-programming`
JVM-embedded stream processing library with no separate cluster.
Bundled files: `evals/evals.json`, `references/architecture.md`, `references/build-templates.md`, `references/cli-commands.md`, `references/config-baseline.md`, `references/debugging.md`, `references/docker-compose.md`, `references/production-hardening.md`, `references/schema-patterns.md`, `references/topology-patterns.md`, `references/verification.md`, `scripts/create-topics.sh`, `scripts/produce-test-data.sh`, `scripts/teardown.sh`, `scripts/verify-output.sh`.
Do NOT read all reference files upfront. Read ONLY what you need, when you need it.
- `references/topology-patterns.md` § Joins Decision Tree only
- `references/build-templates.md` when writing build files, not before
- `references/debugging.md` for that symptom
- Never read multiple files preemptively "just in case"
Before answering in any mode (Architect, Build, Debug), confirm the target environment if the user hasn't stated it: Apache Kafka | Confluent Platform | Confluent Cloud. Versions/auth shape every recommendation — KIP-1071 support, SASL config, ACL model, transactional-id expiry, CLI tool names all branch on this. Skip the question only if the user already named the environment.
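To make "auth shapes every recommendation" concrete, the Java sketch below shows only the connection properties that change with the environment. `EnvConnectionProps` is a hypothetical class. It assumes an unsecured local AK/CP broker (as from the generated docker-compose) and Confluent Cloud API keys sent over `SASL_SSL` with the `PLAIN` mechanism; a secured self-managed CP cluster needs its own SASL/TLS settings, which this sketch does not cover. All keys and secrets are placeholders supplied by the caller.

```java
import java.util.Properties;

final class EnvConnectionProps {
    enum TargetEnv { APACHE_KAFKA, CONFLUENT_PLATFORM, CONFLUENT_CLOUD }

    /** Builds connection/auth properties for the chosen environment (sketch). */
    static Properties forEnv(TargetEnv env, String bootstrap, String srUrl,
                             String apiKey, String apiSecret,
                             String srKey, String srSecret) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrap);
        p.setProperty("schema.registry.url", srUrl);
        if (env == TargetEnv.CONFLUENT_CLOUD) {
            // Cluster API key/secret over TLS with SASL/PLAIN.
            p.setProperty("security.protocol", "SASL_SSL");
            p.setProperty("sasl.mechanism", "PLAIN");
            p.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=\"" + apiKey + "\" password=\"" + apiSecret + "\";");
            // Schema Registry has its own API key, sent as HTTP basic auth.
            p.setProperty("basic.auth.credentials.source", "USER_INFO");
            p.setProperty("basic.auth.user.info", srKey + ":" + srSecret);
        }
        // Assumption: self-managed AK/CP here is a local, unsecured broker,
        // so no security settings are added (client default is PLAINTEXT).
        return p;
    }

    public static void main(String[] args) {
        Properties local = forEnv(TargetEnv.APACHE_KAFKA, "localhost:9092",
            "http://localhost:8081", null, null, null, null);
        System.out.println("local security.protocol = " + local.getProperty("security.protocol"));
    }
}
```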
Determine the user's intent and enter the appropriate mode:
| User intent | Mode | What to do |
|---|---|---|
| "I need to process events from topic X..." / "Build me a KS app..." / "I want to aggregate/filter/join..." | Build | Go to Build Mode |
| "How should I design my topology?" / "Should I use a KTable or GlobalKTable?" / "What join type do I need?" / "How do I handle late events?" | Architect | Go to Architect Mode |
| "My Streams app is stuck/slow/crashing..." / "Why am I getting rebalancing loops?" / "How do I interpret this metric?" | Debug | Go to Debug Mode |
If unclear, default to Architect — understand the problem before generating code.
If the user asks for generic stream processing on CC without mentioning KS, briefly offer Flink as an alternative. Don't lecture.
Architect Mode: design the right topology by translating the user's data problem into KS primitives.
Confirm target environment first (see preamble). Then ask (skip if answered): What data (topics)? What output? Relationship between inputs (combine/enrich/group)?
Match problem to pattern (read references/topology-patterns.md only for the specific pattern needed). Present: why it fits, data flow in plain English, KS primitives involved, tradeoffs/alternatives.
When needed, read only the relevant section:
- `references/topology-patterns.md` § Joins Decision Tree
- `references/topology-patterns.md` § Windowing Decision Tree
- `references/topology-patterns.md` § Enrichment Patterns
- `references/topology-patterns.md` § Exactly-Once (walk through it before recommending; at-least-once is simpler if downstream can dedupe)

After the user confirms, go to Build Mode.
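The Exactly-Once walk-through ultimately lands in one config key, `processing.guarantee`, whose real values include `at_least_once` and `exactly_once_v2`. The minimal sketch below encodes the rule stated above (prefer at-least-once when downstream can dedupe); the class and method names are hypothetical.

```java
import java.util.Properties;

final class ProcessingGuarantee {
    // Real Kafka Streams values for "processing.guarantee".
    static final String AT_LEAST_ONCE = "at_least_once";
    static final String EXACTLY_ONCE_V2 = "exactly_once_v2"; // needs brokers 2.5+

    /** At-least-once is simpler when downstream can dedupe; otherwise use EOS. */
    static String choose(boolean downstreamCanDedupe) {
        return downstreamCanDedupe ? AT_LEAST_ONCE : EXACTLY_ONCE_V2;
    }

    /** Writes the chosen guarantee into the app's config and returns it. */
    static Properties apply(Properties props, boolean downstreamCanDedupe) {
        props.setProperty("processing.guarantee", choose(downstreamCanDedupe));
        return props;
    }

    public static void main(String[] args) {
        Properties props = apply(new Properties(), false);
        System.out.println("processing.guarantee = " + props.getProperty("processing.guarantee"));
    }
}
```

If you do pick exactly-once, its guarantee covers Kafka-to-Kafka processing only: consumers of the output need `isolation.level=read_committed` to skip aborted writes, and side effects outside Kafka (for example, HTTP calls from a processor) are not covered.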
Build Mode: generate a complete, runnable Kafka Streams project.
Ask (skip if already answered):
- … (`references/config-baseline.md` when generating config)
- … (`references/cli-commands.md` if needed)
- … (`references/architecture.md` or `references/production-hardening.md` § Deployment Sizing if needed)

Present the plan: topics to create (source/output/DLQ) and schemas to register. Changelog and repartition topics are auto-created by KS. If the user says the input topics already exist, omit them from `create-topics.sh`; the script should only create new topics (typically output + DLQ).
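The topic-planning rule can be sketched as a small function: collect every source, output, and DLQ topic, then drop the ones the user says already exist. Internal changelog/repartition topics are intentionally absent because KS creates them. `TopicPlan` and the topic names in `main` are hypothetical; in a generated project, a list like this would drive what `create-topics.sh` creates.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class TopicPlan {
    /** Topics the create script should create: sources, outputs, and DLQs minus existing ones. */
    static List<String> topicsToCreate(List<String> sources, List<String> outputs,
                                       List<String> dlqs, Set<String> existing) {
        Set<String> wanted = new LinkedHashSet<>(); // keeps declaration order, drops duplicates
        wanted.addAll(sources);
        wanted.addAll(outputs);
        wanted.addAll(dlqs);
        List<String> toCreate = new ArrayList<>();
        for (String topic : wanted) {
            if (!existing.contains(topic)) {
                toCreate.add(topic);
            }
        }
        return toCreate;
    }

    public static void main(String[] args) {
        // Hypothetical topics: "orders" already exists, so only output + DLQ remain.
        System.out.println(topicsToCreate(List.of("orders"), List.of("orders-enriched"),
            List.of("orders-dlq"), Set.of("orders")));
    }
}
```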
Generate: project structure, schemas, App.java, TopologyBuilder.java, config, simplelogger.properties, docker-compose (if local), scripts, TopologyTest.java, .env.example, monitoring comments.
Read references only as needed:
- `references/topology-patterns.md` for the specific pattern
- `references/build-templates.md`
- `references/schema-patterns.md`
- `references/config-baseline.md` for env-specific blocks
- `scripts/create-topics.sh`, `scripts/teardown.sh`
- `references/docker-compose.md`

Gradle: run `gradle wrapper --gradle-version 8.12` after creating build files.
If the user wants sample data, generate SampleDataProducer.java and a produce task.
Trigger: User says "production"/"prod"/"deploy" or specifies K8s/ECS/Docker Swarm or requests multiple instances.
Add production components (read references/production-hardening.md for details if needed): Logback JSON logging, logback.xml, health check endpoint, Dockerfile with JVM tuning, KIP-1034 DLQ handler, K8s YAML (if K8s), shadow/fat jar plugin.
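One way to provide the health check endpoint, sketched with only the JDK's built-in `com.sun.net.httpserver` server (an assumption; the generated project may use a different HTTP stack). The state comes from a `Supplier<String>`; in a real app it would be `() -> streams.state().name()` via `KafkaStreams.state()`. Treating `RUNNING` and `REBALANCING` as healthy and every other state (`CREATED`, `PENDING_SHUTDOWN`, `NOT_RUNNING`, `PENDING_ERROR`, `ERROR`) as 503 is also an assumption; you may want separate liveness and readiness rules.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.function.Supplier;

final class HealthEndpoint {
    // Assumed healthy states; everything else reports 503.
    private static final Set<String> HEALTHY = Set.of("RUNNING", "REBALANCING");

    static int statusFor(String state) {
        return HEALTHY.contains(state) ? 200 : 503;
    }

    /** Serves GET /health with the current state name as the body. */
    static HttpServer start(int port, Supplier<String> stateName) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/health", exchange -> {
            String state = stateName.get();
            byte[] body = state.getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(statusFor(state), body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body); // closing the stream completes the exchange
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws IOException {
        HttpServer server = start(0, () -> "RUNNING"); // port 0 picks a free port
        System.out.println("health check on port " + server.getAddress().getPort());
        server.stop(0);
    }
}
```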
Explain the topology, config choices, how to run it, and what to monitor. Mention that `group.protocol=streams` (KIP-1071) provides 50-80% faster rebalancing (requires AK 4.2+/CP 8.2+).
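The config choices worth explaining can be shown as a baseline `Properties` sketch drawn from this skill's non-negotiable defaults. It uses raw config keys instead of `StreamsConfig` constants so it compiles without Kafka on the classpath, and it assumes String keys and Avro values (`SpecificAvroSerde`). `withoutStreamsProtocol` applies the "remove `group.protocol` on `UnsupportedVersionException`" fallback, and `forTests` sets `statestore.cache.max.bytes=0` for `TopologyTestDriver` runs. The class and method names are hypothetical.

```java
import java.util.Properties;

final class StreamsBaselineConfig {
    static Properties baseline(String appId, String bootstrap, String srUrl) {
        Properties p = new Properties();
        p.setProperty("application.id", appId);
        p.setProperty("bootstrap.servers", bootstrap);
        // KIP-1071 streams rebalance protocol (AK 4.2+/CP 8.2+ brokers).
        p.setProperty("group.protocol", "streams");
        // KIP-1111: reject topologies that rely on auto-generated internal names.
        p.setProperty("ensure.explicit.internal.resource.naming", "true");
        p.setProperty("metrics.recording.level", "INFO");
        // Assumed serdes: String keys, Avro values via Schema Registry.
        p.setProperty("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        p.setProperty("default.value.serde", "io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde");
        p.setProperty("schema.registry.url", srUrl);
        return p;
    }

    /** Fallback for brokers that reject the new protocol: drop the key, keep the rest. */
    static Properties withoutStreamsProtocol(Properties base) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.remove("group.protocol"); // Streams then uses its default (classic) protocol
        return copy;
    }

    /** Test config: no record cache, so every update reaches the output deterministically. */
    static Properties forTests(Properties base) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty("statestore.cache.max.bytes", "0");
        return copy;
    }

    public static void main(String[] args) {
        baseline("orders-app", "localhost:9092", "http://localhost:8081").list(System.out);
    }
}
```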
You must actually start the app against a real broker and observe it reach RUNNING before declaring the task done. Generated code that compiles and passes TopologyTestDriver tests can still fail at startup — version-mismatch NoClassDefFoundErrors, silent logger fallbacks, missing runtime deps, and import-path errors all slip past compile + test and only surface against a real broker / Schema Registry. A green build is not a working app.
Branch on the target environment chosen in Step 1:
Local (Apache Kafka or Confluent Platform via the generated docker-compose.yml):
1. `docker compose up -d` and wait for Kafka + SR to be healthy (`docker compose ps`, or curl SR `/subjects`)
2. `./create-topics.sh`
3. … (`./gradlew run` or `mvn exec:java`) so you can read its logs while it runs
4. … `State transition from REBALANCING to RUNNING` within ~30s. If you don't see it, read the actual stack trace, diagnose via `references/debugging.md` § Startup Failures, fix, restart, and re-verify.
5. `docker compose down` (or leave it running if the user wants to keep iterating; ask)

Confluent Cloud: you usually cannot run end-to-end yourself because the cluster and SR API keys are the user's. Do as much as you can without them, then hand off the rest:
- `.env` has real CC creds (the user has set up a real `.env`): run the app locally pointed at CC (`./gradlew run` auto-loads `.env`) and follow steps 3–5 above. Don't skip this just because it's CC; if you have creds, run it.
- … `./gradlew build` (compile + unit tests) and report the result.
- … (`./create-topics.sh --cloud`, `./gradlew run`, the consume command from `references/verification.md` § Confluent Cloud) and what success looks like (`State transition from REBALANCING to RUNNING`, records on the output topic)

In the handoff, state plainly which of the above you did. If you ran it and saw RUNNING, say so. If you only compiled, say only that. Don't imply a runtime verification you didn't perform.
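The RUNNING check (local step 4, and the CC run when credentials exist) amounts to scanning the app's log for the client-level transition line. A minimal sketch, assuming the log is available as text; `RunningWatcher` is a hypothetical class, and the deadline is only checked between lines, so a reader blocked on a silent stream can overshoot it.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;

final class RunningWatcher {
    // Kafka Streams logs client state changes as "State transition from X to Y".
    static final String RUNNING_MARKER = "State transition from REBALANCING to RUNNING";

    /** True if the marker appears before the deadline (checked between lines only). */
    static boolean sawRunning(BufferedReader log, Duration timeout) {
        Instant deadline = Instant.now().plus(timeout);
        Iterator<String> lines = log.lines().iterator();
        while (Instant.now().isBefore(deadline) && lines.hasNext()) {
            if (lines.next().contains(RUNNING_MARKER)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        BufferedReader stdin = new BufferedReader(
            new InputStreamReader(System.in, StandardCharsets.UTF_8));
        boolean ok = sawRunning(stdin, Duration.ofSeconds(30));
        System.out.println(ok ? "RUNNING reached" : "RUNNING not seen: read the stack trace");
        System.exit(ok ? 0 : 1);
    }
}
```

For example, `java RunningWatcher.java < app.log` (JDK 11+ single-file launch) against a log file the app writes. A match proves only that the app reached RUNNING; still confirm records arrive on the output topic.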
For CC consume commands, schema-aware producers, and reset procedures, read references/verification.md.
| Symptom | Category | Go to |
|---|---|---|
| App crashes on startup | Startup failure | references/debugging.md § Startup Failures |
| App runs but no output / stops processing | Processing stall | references/debugging.md § Processing Stalls |
| Rebalancing loops / constant rebalancing | Rebalancing | references/debugging.md § Rebalancing Issues |
| High lag / slow processing | Performance | references/debugging.md § Performance |
| Deserialization errors / poison pills | Data quality | references/debugging.md § Deserialization Errors |
| State store issues (corruption, growth, recovery) | State | references/debugging.md § State Store Issues |
| Thread failures / StreamsUncaughtExceptionHandler | Thread health | references/debugging.md § Thread Failures |
| Memory issues (OOM, high heap, RocksDB) | Memory | references/debugging.md § Memory Issues |
Confirm target environment first (see preamble) — most debug paths branch on it. Then ask for: error message, config, KS/Java versions, new app or regression?
Read the relevant section in references/debugging.md for the identified category. Provide fix with explanation.
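For the thread-failure category, the MaxFailures pattern named in the defaults can be sketched as a small policy: replace the dead stream thread until a failure budget is exhausted within a time window, then shut down. This is a JDK-only sketch. `Response` is a local stand-in for `StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse`, `MaxFailuresPolicy` is a hypothetical name, and returning `SHUTDOWN_APPLICATION` (rather than `SHUTDOWN_CLIENT`) at the limit is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

final class MaxFailuresPolicy {
    // Stand-in for StreamThreadExceptionResponse so this compiles without kafka-streams.
    enum Response { REPLACE_THREAD, SHUTDOWN_APPLICATION }

    private final int maxFailures;
    private final Duration window;
    private final Supplier<Instant> clock;
    private Instant windowStart; // time of the first failure in the current window
    private int failures;        // failures counted in the current window

    MaxFailuresPolicy(int maxFailures, Duration window, Supplier<Instant> clock) {
        this.maxFailures = maxFailures;
        this.window = window;
        this.clock = clock;
    }

    /** Replaces the thread unless this is the maxFailures-th failure within one window. */
    synchronized Response onUncaught(Throwable error) {
        Instant now = clock.get();
        if (windowStart == null || Duration.between(windowStart, now).compareTo(window) > 0) {
            windowStart = now; // first failure, or the previous window expired
            failures = 0;
        }
        failures++;
        return failures >= maxFailures ? Response.SHUTDOWN_APPLICATION : Response.REPLACE_THREAD;
    }

    public static void main(String[] args) {
        MaxFailuresPolicy policy = new MaxFailuresPolicy(3, Duration.ofMinutes(1), Instant::now);
        for (int i = 0; i < 3; i++) {
            System.out.println(policy.onUncaught(new IllegalStateException("boom")));
        }
    }
}
```

In an app, wrap the policy in a handler passed to `KafkaStreams.setUncaughtExceptionHandler(...)`, mapping each `Response` to the matching `StreamThreadExceptionResponse` constant. Injecting the clock keeps the window logic testable without sleeping.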
Non-negotiable defaults. Apply all. Read reference files only if you need implementation details.
- … (`SpecificAvroSerde`, `KafkaProtobufSerde`, `KafkaJsonSchemaSerde`). Set `schema.registry.url`, `default.key.serde`, `default.value.serde`. JSON Schema: set `json.value.type`. Protobuf: set `specific.protobuf.value.type` (`references/config-baseline.md`)
- `group.protocol=streams` (default). Remove it if you get `UnsupportedVersionException`. Unsupported with this protocol: static membership, regex topics, standby replicas, warm-up replicas (`references/topology-patterns.md` § Assignment Strategy)
- `DeserializationExceptionHandler`, `ProcessingExceptionHandler` (KIP-1034), `ProductionExceptionHandler`, `StreamsUncaughtExceptionHandler`. Use the MaxFailures pattern for the uncaught handler (`references/production-hardening.md` § Error Handling)
- `ensure.explicit.internal.resource.naming=true`
- `streams.close(Duration.ofSeconds(30))` on SIGTERM/SIGINT
- `metrics.recording.level=INFO` (`references/config-baseline.md`)
- `simplelogger.properties` (`references/build-templates.md`)
- `statestore.cache.max.bytes=0` in tests to avoid non-deterministic assertions.
- … `java.time.Instant` for timestamp-millis/timestamp-micros, `LocalDate` for date, `BigDecimal` for decimal, etc. Never use raw long/int literals with generated setter methods; use `Instant.EPOCH`, `Instant.now()`, `Instant.ofEpochMilli(...)`. Use `Instant.isAfter()`/`isBefore()` instead of `Math.max()`/`Math.min()` for timestamp comparisons. This applies to topology code, aggregation initializers, producers, AND test helpers (`references/schema-patterns.md` § Java type mapping).
- `scripts/`: `create-topics.sh` (pre-create topics, `--cloud`), `teardown.sh` (delete topics/state, `--cloud`), `produce-test-data.sh` (generate if requested).
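To illustrate the logical-type rule, here is a JDK-only sketch of a "latest timestamp" aggregate. `Agg` is a hypothetical stand-in for an Avro-generated class whose `timestamp-millis` field is exposed as `java.time.Instant`. The initializer uses `Instant.EPOCH` rather than `0L`, and the aggregator compares with `isAfter()` rather than `Math.max()` on longs.

```java
import java.time.Instant;

final class LatestTimestamp {
    // Hypothetical stand-in for a generated class with a timestamp-millis field.
    static final class Agg {
        private Instant lastSeen;
        Instant getLastSeen() { return lastSeen; }
        void setLastSeen(Instant value) { lastSeen = value; }
    }

    /** Aggregation initializer: Instant.EPOCH, never a raw 0L literal. */
    static Agg init() {
        Agg agg = new Agg();
        agg.setLastSeen(Instant.EPOCH);
        return agg;
    }

    /** Aggregator step: keep the later timestamp using Instant comparisons. */
    static Agg update(Agg agg, Instant eventTime) {
        if (eventTime.isAfter(agg.getLastSeen())) {
            agg.setLastSeen(eventTime);
        }
        return agg;
    }

    public static void main(String[] args) {
        Agg agg = init();
        update(agg, Instant.ofEpochMilli(1_700_000_000_000L));
        update(agg, Instant.ofEpochMilli(1_600_000_000_000L)); // older event is ignored
        System.out.println(agg.getLastSeen().toEpochMilli());
    }
}
```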

Reference map:
- `references/topology-patterns.md`: design, joins, windows, aggregations
- `references/architecture.md`: internals, sizing
- `references/debugging.md`: troubleshooting
- `references/config-baseline.md`: config
- `references/build-templates.md`: project structure
- `references/schema-patterns.md`: Avro/Protobuf/JSON
- `references/production-hardening.md`: prod setup
- `references/cli-commands.md`: CLI
- `references/docker-compose.md`: local dev
- `references/verification.md`: checklists