From kafka-skills
Scaffolds a production-ready Python Kafka producer/consumer with confluent-kafka-python, Schema Registry, graceful shutdown, idempotent producer, and tests. Discovers topic and schema from any attached Kafka MCP server.
npx claudepluginhub lensesio/agentic-engineering-for-apache-kafka --plugin kafka-skills
How this skill is triggered: by the user, by Claude, or both
Slash command
/kafka-skills:kafka-python-client [optional: target environment name] [optional: keyword from the topic name]
Use when the user wants to build a Python Kafka producer or consumer, add Schema Registry to existing Python code, migrate from raw JSON to schema-backed serialization, or scaffold a confluent-kafka-python project for Confluent Cloud or local Docker.
Generates a production-ready Python project that produces to and consumes from a Kafka topic using confluent-kafka-python, with Schema Registry (JSON Schema), graceful shutdown, idempotent producer, header-based schema identification, and tests. The agent should discover everything about the target topic (name, partitions, registered schema) from the live cluster via whichever Kafka MCP server is attached — Lenses MCP, Confluent's, Aiven's, or any other — before asking the user. Only fall back to questions if no Kafka MCP is attached or discovery returns nothing.
Target environment and keyword: $ARGUMENTS
Open your first reply with: "Running the kafka-python-client skill to scaffold this project."
Copy this checklist and track your progress:
Scaffold Progress:
- [ ] Step 1: Discover topic + schema via the attached Kafka MCP
- [ ] Step 2: Hard gate - confirm with user before generating
- [ ] Step 3: Generate the project files
- [ ] Step 4: Run pytest against the generated tests
- [ ] Step 5: Run validation gate (kafka-topic-audit + kafka-perf-review)
- [ ] Step 6: Hand back with run instructions
- Step 1: see references/mcp-discovery.md
- Step 3: templates live in references/
- Step 4: run tests/ and fix any failures (fix the code, not the tests)
- Step 5: run kafka-topic-audit against the target topic and kafka-perf-review against the generated producer.py and consumer.py
Read references/mcp-discovery.md for the full probing procedure, vendor-specific tool-name hints and fallbacks. The high-level shape:
- Detect the attached Kafka MCP: mcp__Lenses__* (Lenses MCP, the reference implementation), mcp__Confluent__*, mcp__Aiven__*, custom servers tagged for Kafka.
- Pick the environment (Lenses: list_environments; Confluent: list_clusters; others vary).
- Find the topic (Lenses: list_datasets(search=...); raw Kafka admin MCPs: list_topics then filter).
- Pull the value schema (Lenses: get_dataset; Confluent: get_schema(subject=<topic>-value); bare Schema Registry MCPs: HTTP GET against /subjects/<topic>-value/versions/latest).
- Read topic metadata (Lenses: get_topic_metadata; most others expose a describe_topic or get_topic equivalent).
- List existing consumer groups to choose a non-colliding GROUP_ID (Lenses: list_consumer_groups_by_topic; most others: list_consumer_groups).
If multiple candidate topics match the keyword, ask the user which one; don't guess. List them with their partition counts and field names so the choice is obvious.
If no Kafka MCP is attached, or discovery returns nothing, fall back to the hard gate in Step 2 and ask the user for the missing pieces directly. Be explicit about what was lost: "Without a Kafka MCP I can't verify the schema matches what's registered against the topic."
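For the bare Schema Registry case in the discovery steps, the lookup reduces to one HTTP GET and one JSON unwrap. The sketch below only builds the URL and parses a response body; the function names are illustrative and not part of the skill's templates, and the actual HTTP call is left to whatever client the project already uses.

```python
import json
from urllib.parse import quote


def latest_value_schema_url(registry_url: str, topic: str) -> str:
    """Build the URL for the latest schema under the <topic>-value subject."""
    subject = quote(f"{topic}-value", safe="")
    return f"{registry_url.rstrip('/')}/subjects/{subject}/versions/latest"


def extract_schema(response_body: str) -> dict:
    """Unwrap a registry response.

    The registry returns the schema document as a JSON-encoded string
    inside the "schema" field, so it has to be decoded twice.
    """
    envelope = json.loads(response_body)
    return json.loads(envelope["schema"])
```

Writing the decoded document to schemas/value.schema.json unchanged keeps the file identical to what is registered, which is the point of pulling it rather than inferring it.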
Expected output of Step 1: a recap like
Discovered via <MCP server name> (Lenses MCP / Confluent MCP / ...):
- Cluster: staging
- Topic: orders.payment.completed
- Partitions: 12
- RF: 1 (single-broker cluster — kafka-topic-audit will flag this)
- Value schema: JSON Schema, 6 required fields + 1 optional (pulled from registry)
- Suggested group: payments-service-<8-char-random>
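The suggested group in the recap follows a `<service>-<8-char-random>` pattern. One possible way to produce such a name while avoiding groups that discovery already found is sketched here; `suggest_group_id` and the choice of alphabet are assumptions for illustration, not something the skill prescribes.

```python
import secrets
import string


def suggest_group_id(service: str, taken: set) -> str:
    """Return '<service>-<8 random chars>' that is not in the taken set."""
    alphabet = string.ascii_lowercase + string.digits
    while True:
        suffix = "".join(secrets.choice(alphabet) for _ in range(8))
        candidate = f"{service}-{suffix}"
        if candidate not in taken:
            return candidate
```

Here `taken` would be the group names returned by the MCP's consumer-group listing call.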
Mandatory. Do not write any file before sending one message that recaps what Step 1 discovered and asks the user to confirm it.
Then STOP and wait for the user's reply. The recap exists so the user can spot a misread before any files are written, and it is mandatory even on prompts that look fully specified.
The only way to skip the gate is if the user has already confirmed the recap earlier in this conversation.
Defaults to apply once confirmed:
- Producer style: Producer from confluent_kafka unless the user signals an async runtime, in which case offer AIOProducer. Async signals include: explicit mention of asyncio; any async-first web framework (FastAPI, Starlette, Quart, Litestar, BlackSheep, Robyn, aiohttp, Sanic, Tornado); ASGI applications generally; or async task systems (ARQ, Dramatiq, Celery with async tasks). If the user mentions Django, default to sync Producer unless they explicitly say async ASGI. If unsure, ask.
- Consumer style: AIOConsumer. The poll loop typically runs for hours or days and should never block the event loop.
- Project directory: ./payments-service/ (or derive from the user's wording).
Create this layout in the chosen project directory:
<project-dir>/
├── producer.py
├── consumer.py
├── common.py
├── schemas/
│ └── value.schema.json
├── tests/
│ └── test_project.py
├── .env.example
├── pyproject.toml
└── README.md
The project uses uv to manage the Python environment and dependencies — not pip, venv, pipenv, poetry or conda. Dependencies are declared in pyproject.toml; the environment is provisioned with uv sync; commands run via uv run. Generated documentation must reflect this throughout.
Use the templates in references/:
- producer-template.py for producer.py
- consumer-template.py for consumer.py
- common-template.py for common.py
- tests-template.py for tests/test_project.py
Write schemas/value.schema.json directly from the schema pulled in Step 1; do not regenerate or paraphrase it. The whole point of MCP discovery is that the schema is the real one.
.env.example
Populate with values from MCP discovery where possible. For a local Lenses CE setup:
KAFKA_ENV=local
BOOTSTRAP_SERVER=localhost:9092
TOPIC=<topic from discovery>
SCHEMA_REGISTRY_URL=http://localhost:8081
CLIENT_ID=payments-service
GROUP_ID=<suggested group from discovery>
For a remote managed cluster, replace BOOTSTRAP_SERVER and SCHEMA_REGISTRY_URL with whatever the MCP's dataset/cluster-metadata call returned, and add SASL credentials placeholders.
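As a rough illustration of how these variables could feed the client configuration, the sketch below maps them onto librdkafka property names. The helper names are hypothetical, `auto.offset.reset` is an assumed default that the source does not specify, and loading the .env file itself (for example with python-dotenv) is left out so the snippet needs only the standard library.

```python
import os


def producer_config(env=os.environ) -> dict:
    """Build an idempotent-producer config from the .env variables."""
    return {
        "bootstrap.servers": env["BOOTSTRAP_SERVER"],
        "client.id": env.get("CLIENT_ID", "payments-service"),
        "enable.idempotence": True,
        "acks": "all",
    }


def consumer_config(env=os.environ) -> dict:
    """Build a consumer config from the same variables."""
    return {
        "bootstrap.servers": env["BOOTSTRAP_SERVER"],
        "client.id": env.get("CLIENT_ID", "payments-service"),
        "group.id": env["GROUP_ID"],
        # Assumption: start from the beginning when the group has no offsets.
        "auto.offset.reset": "earliest",
    }
```

Required variables are read with `env[...]` so a missing BOOTSTRAP_SERVER or GROUP_ID fails immediately with a KeyError instead of surfacing later as a connection problem.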
pyproject.toml
Declare dependencies in a PEP 621 [project] table so uv can resolve and lock them. Do not also emit a requirements.txt: pyproject.toml is the single source of truth and uv sync produces a uv.lock next to it.
[project]
name = "<project-name>"
version = "0.1.0"
description = "Kafka producer and consumer scaffolded by kafka-python-client"
requires-python = ">=3.10"
dependencies = [
"confluent-kafka[json,schema_registry]>=2.13.2",
"jsonschema",
"python-dotenv",
"requests>=2.25.0",
"httpx",
"authlib",
"cachetools",
"attrs",
"typing_extensions",
]
[dependency-groups]
dev = [
"pytest",
"pytest-asyncio",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
Every third-party package imported by producer.py, consumer.py or common.py must appear in [project].dependencies. Test-only packages (pytest, pytest-asyncio) belong in the dev dependency group, installed automatically by uv sync and excluded from production installs via uv sync --no-dev. Include pytest-asyncio because the consumer is always async. The async Schema Registry client imports httpx and authlib at runtime even though they aren't declared as confluent-kafka dependencies — list them explicitly.
Either edit pyproject.toml by hand or generate it incrementally with uv init --bare <project-dir> followed by uv add calls, e.g.:
uv init --bare <project-dir>
cd <project-dir>
uv add "confluent-kafka[json,schema_registry]>=2.13.2" jsonschema python-dotenv "requests>=2.25.0" httpx authlib cachetools attrs typing_extensions
uv add --dev pytest pytest-asyncio
README.md
A short README with: prerequisites (uv installed; Python 3.10+, which uv will fetch as a managed interpreter automatically if none is present; and Docker if running Kafka locally), setup commands (uv sync to create .venv and install everything), how to register the schema if it isn't already (link to schemas/value.schema.json), how to run the producer and consumer with uv run, and how to run the tests with uv run pytest. The README must not reference pip, python -m venv, source .venv/bin/activate or requirements.txt; uv handles all of that.
Read references/common-mistakes.md for the full lookup table. The short version:
- Create the producer once in main() and hand it to produce() as an argument. Never build a new one per call.
- Set auto.register.schemas=False and use.latest.version=True, and publish the schema from a small register_schema() helper that calls sr_client.register_schema() directly and lets any exception bubble up.
- Construct serializers with keyword arguments. JSONSerializer, AvroSerializer, and ProtobufSerializer do not share the same positional order in confluent-kafka-python. Passing schema_str and schema_registry_client by name on every call keeps the call site identical across formats and avoids the TypeError: ... got multiple values for argument 'schema_registry_client' surprise.
- Idempotent producer: enable.idempotence=true, acks=all.
- Graceful shutdown. Sync producer: flush() in finally. Async consumer: unsubscribe() then close(), both awaited.
- Message keys: key=<entity_id>.encode("utf-8") for any per-entity event stream so partition ordering is preserved.
- Connectivity checks: AdminClient.list_topics() for the broker, an HTTP GET on /subjects for the schema registry.
Use uv for the environment and uv run to execute commands inside it. uv sync creates .venv/ next to pyproject.toml, resolves the dependencies declared there, writes a uv.lock, and installs everything in one step, with no manual python -m venv or pip install needed.
cd <project-dir>
uv sync
uv run pytest tests/
If uv is missing on the user's machine, point them at the official installer (curl -LsSf https://astral.sh/uv/install.sh | sh) before continuing — do not fall back to pip.
If any test fails, fix the generated code (not the tests) until they pass.
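When a failure traces back to one of the common mistakes above, the fix often has the shape sketched here: the producer is created once by the caller and passed in, every message is keyed by its entity ID, and flush() sits in a finally block. This is a simplified stand-in, not the contents of producer-template.py: it takes any object with the produce/poll/flush methods of a confluent_kafka Producer, uses json.dumps in place of the Schema Registry serializer, and assumes a hypothetical `order_id` field as the entity ID.

```python
import json


def delivery_report(err, msg):
    """Delivery callback, invoked from poll() or flush()."""
    if err is not None:
        print(f"delivery failed: {err}")


def produce(producer, topic: str, entity_id: str, event: dict) -> None:
    """Send one event, keyed by entity so per-entity ordering is preserved."""
    producer.produce(
        topic,
        key=entity_id.encode("utf-8"),
        value=json.dumps(event).encode("utf-8"),  # stand-in for JSONSerializer
        on_delivery=delivery_report,
    )
    producer.poll(0)  # serve delivery callbacks without blocking


def run(producer, topic: str, events) -> None:
    """Produce all events with a producer that the caller built once."""
    try:
        for event in events:
            produce(producer, topic, event["order_id"], event)
    finally:
        # Runs on success, on error, and on KeyboardInterrupt alike.
        producer.flush()
```

Because the helpers only depend on the producer's method names, the same functions can be exercised in tests with a small fake instead of a live broker.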
This is what differentiates this skill from a one-shot scaffolder. After Step 4, automatically run two existing skills as a quality gate.
- kafka-perf-review: producer.py and consumer.py should pass all of enable.idempotence, acks=all, linger.ms > 0, compression.type set, max.poll.interval.ms sensible, graceful shutdown present. If the audit finds anything, fix it in the generated code and re-run.
Final message to the user must include:
- Confirmation that schemas/value.schema.json came from the live cluster, not from inference
See references/test-cases.md.
- pytest tests/ passes on first run > 80% of the time, after at most one self-fix
User says: "build me a Python consumer for the payment events on our staging Kafka"
Actions:
- list_environments → pick staging. list_datasets(search="payment") returns three topics. Three are presented to the user with their partition counts and message rates; the user picks orders.payment.completed. get_dataset pulls the registered JSON Schema. get_topic_metadata returns 12 partitions, RF=1. (With Confluent's MCP the equivalent calls would be list_clusters → list_topics → get_schema → describe_topic: same shape, different names. See references/mcp-discovery.md.)
- Generates producer.py, consumer.py, common.py, schemas/value.schema.json, tests/test_project.py, .env.example, pyproject.toml, README.md.
- kafka-topic-audit flags RF=1 (expected on single-broker CE; passed through with a note). kafka-perf-review confirms idempotent producer, graceful shutdown, sensible max.poll.interval.ms.
Result: A production-shaped project the user can run end-to-end in under five minutes, with a schema that exactly matches what's registered against the topic.
User says: "add a Kafka producer to my Quart service for orders.payment.refunded" — the same flow applies to any async runtime: FastAPI / Starlette / Litestar / BlackSheep / Robyn / aiohttp / Sanic / Tornado / plain asyncio workers / ASGI-based services.
Actions:
- Selects AIOProducer as the producer style for Step 2.
- Hard gate: names the framework's lifecycle hooks (Quart: @app.before_serving / @app.after_serving; FastAPI: lifespan handler; Starlette: lifespan handler; aiohttp: on_startup / on_cleanup; Sanic: @app.before_server_start / @app.before_server_stop; Litestar: on_startup / on_shutdown; plain asyncio script: instantiate in main(), wrap in try/finally), and asks the user for the app's root file. User confirms.
- Create common.py if missing, instantiate AIOProducer once using the framework's startup hook, add a produce() helper that takes the producer as a parameter, ensure await producer.flush() then await producer.close() are wired into the framework's shutdown hook. Use AsyncSchemaRegistryClient (not the sync one) so registry calls don't block the event loop.
- kafka-perf-review on the new code confirms async best practices (async SR client, AsyncJSONSerializer, kwargs-only construction, graceful shutdown wired to the framework's native lifecycle).
Result: Existing service gains a Kafka producer using its native startup/shutdown lifecycle, without losing its existing structure.
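The startup/shutdown wiring in this example can be sketched framework-neutrally as an async context manager, which most of the lifespan-style hooks listed above can wrap. The code below is an assumption-laden outline: `producer_lifespan` and `make_producer` are hypothetical names, and the producer is duck-typed, so the only thing assumed about AIOProducer is that produce(), flush() and close() are awaitable, as the steps above describe.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def producer_lifespan(make_producer):
    """Create the producer once at startup; flush then close at shutdown."""
    producer = make_producer()  # e.g. a callable that builds an AIOProducer
    try:
        yield producer
    finally:
        await producer.flush()
        await producer.close()


async def produce(producer, topic: str, entity_id: str, value: bytes) -> None:
    """Helper that receives the shared producer as a parameter."""
    await producer.produce(topic, key=entity_id.encode("utf-8"), value=value)


async def demo(make_producer) -> None:
    """Plain-asyncio usage: the same shape a lifespan handler would have."""
    async with producer_lifespan(make_producer) as producer:
        await produce(producer, "orders.payment.refunded", "o-1", b"{}")
```

How delivery results are surfaced after `produce()` is deliberately left out here, since that depends on the async client's return value.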
User says: "scaffold a Python Kafka consumer for orders.payment.completed" but no Kafka MCP server is connected (or only non-Kafka MCPs like Atlassian/GitHub are attached).
Actions:
Result: Scaffold still works, but the agent explicitly tells the user what was lost without MCP — exact phrasing: "Without a Kafka MCP I can't verify the schema is what's actually registered against the topic. Run kafka-schema-review after you've registered this schema to confirm it matches."
Cause: Topic doesn't exist in this cluster, or the MCP hasn't indexed it yet, or the search keyword is too narrow.
Solution: Retry with a broader keyword. If still empty, use the MCP's full-list call (Lenses: list_topics; Confluent: list_topics; etc.) and let the user pick from the full list. If the topic genuinely doesn't exist, ask the user whether to create it via the MCP's topic-creation tool or abort.
Cause: Topic exists but no schema is registered against <topic>-value.
Solution: Ask the user to either (a) provide a JSON Schema to register, or (b) confirm they want to scaffold a schema-less consumer (in which case fall back to JSONDeserializer without Schema Registry — but warn this is brittle and recommend registering a schema later).
AIOProducer / AIOConsumer mocks
Cause: pytest-asyncio isn't installed, or the test fixture is sync.
Solution: Confirm pytest-asyncio is in pyproject.toml under [dependency-groups].dev, re-run uv sync, and ensure the test functions are decorated with @pytest.mark.asyncio.
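For orientation, an async consumer loop and a matching fake might look like the sketch below. It is not the content of tests-template.py: `consume`, `FakeConsumer` and `FakeMessage` are hypothetical, and the loop assumes an AIOConsumer-like object whose subscribe(), poll(), unsubscribe() and close() are all awaitable. Under pytest-asyncio the final coroutine would be a test function decorated with @pytest.mark.asyncio; here it is driven with asyncio.run so the snippet has no third-party imports.

```python
import asyncio


async def consume(consumer, topic, handle, stop: asyncio.Event) -> None:
    """Poll until stop is set, then unsubscribe and close, both awaited."""
    await consumer.subscribe([topic])
    try:
        while not stop.is_set():
            msg = await consumer.poll(timeout=1.0)
            if msg is None or msg.error():
                continue  # real code would log the error
            await handle(msg)
    finally:
        await consumer.unsubscribe()
        await consumer.close()


class FakeMessage:
    def __init__(self, value):
        self._value = value

    def error(self):
        return None

    def value(self):
        return self._value


class FakeConsumer:
    """Async fake: serves queued messages, then requests shutdown."""

    def __init__(self, messages, stop):
        self._messages = list(messages)
        self._stop = stop
        self.calls = []

    async def subscribe(self, topics):
        self.calls.append("subscribe")

    async def poll(self, timeout=None):
        if not self._messages:
            self._stop.set()
            return None
        return self._messages.pop(0)

    async def unsubscribe(self):
        self.calls.append("unsubscribe")

    async def close(self):
        self.calls.append("close")


async def check_consume_shuts_down_cleanly():
    stop = asyncio.Event()
    consumer = FakeConsumer([FakeMessage(b"a"), None, FakeMessage(b"b")], stop)
    seen = []

    async def handle(msg):
        seen.append(msg.value())

    await consume(consumer, "orders.payment.completed", handle, stop)
    return seen, consumer.calls
```

The fake's methods are `async def` on purpose: a sync fixture handed to code that awaits its methods is one way to hit the failure described in this entry.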
TypeError: ... got multiple values for argument 'schema_registry_client'
Cause: Serializer/deserializer constructed with positional arguments. JSON, Avro and Protobuf classes have different positional signatures.
Solution: See references/common-mistakes.md row 8. Always pass schema_str and schema_registry_client as keyword arguments.
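The mechanism behind this error can be reproduced without Kafka at all. The two functions below are stand-ins, not the library's classes: they only mimic the fact that one serializer takes the schema string first and another takes the registry client first. Any resemblance beyond that ordering is an assumption.

```python
def json_style(schema_str, schema_registry_client=None):
    """Stand-in for a serializer whose first positional is the schema."""
    return {"schema_str": schema_str, "client": schema_registry_client}


def avro_style(schema_registry_client, schema_str=None):
    """Stand-in for a serializer whose first positional is the client."""
    return {"schema_str": schema_str, "client": schema_registry_client}


SCHEMA = '{"type": "object"}'
CLIENT = object()  # placeholder for a Schema Registry client


def build_with_keywords(factory):
    """Keyword-only call site: identical for both orderings."""
    return factory(schema_str=SCHEMA, schema_registry_client=CLIENT)


def positional_mixup():
    """Schema passed positionally into a client-first signature."""
    try:
        avro_style(SCHEMA, schema_registry_client=CLIENT)
    except TypeError as exc:
        return str(exc)
    return None
```

In the mix-up case the schema string binds to the first parameter, the keyword then targets the same parameter, and Python raises the "got multiple values" TypeError. With keywords on every argument the ordering stops mattering.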
Cause: The skill version drifted from the audit thresholds in kafka-perf-review.
Solution: Fix the generated code to satisfy the audit (the audit is the source of truth for "what good looks like" in this repo) and re-run the validation gate. If the audit threshold itself seems wrong, raise a PR against kafka-perf-review/references/.
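Before re-running the gate, the producer-side settings this document says the audit looks for (enable.idempotence, acks=all, linger.ms > 0, compression.type set) can be pre-checked locally. The function below is a hypothetical helper, not part of kafka-perf-review, and its wording and thresholds are assumptions; the audit remains the source of truth.

```python
def precheck_producer_config(conf: dict) -> list:
    """Return human-readable findings for a producer config dict."""
    findings = []

    if str(conf.get("enable.idempotence", "")).lower() != "true":
        findings.append("enable.idempotence is not true")

    # librdkafka accepts "all" and -1 as equivalent values for acks.
    if str(conf.get("acks", "")) not in ("all", "-1"):
        findings.append("acks is not all")

    try:
        linger = float(conf.get("linger.ms", 0))
    except (TypeError, ValueError):
        linger = 0.0
    if linger <= 0:
        findings.append("linger.ms is not greater than 0")

    if str(conf.get("compression.type", "none")).lower() in ("", "none"):
        findings.append("compression.type is not set")

    return findings
```

An empty list means the config carries the four producer settings named above; consumer-side items such as max.poll.interval.ms and graceful shutdown are not covered by this sketch.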
When handing back in Step 6, use this structure:
## Project scaffolded
Generated at: <project-dir>/
### Files written
- producer.py
- consumer.py
- common.py
- schemas/value.schema.json (pulled from registry — matches live schema)
- tests/test_project.py
- .env.example
- pyproject.toml (uv-managed; uv.lock generated by `uv sync`)
- README.md
### Run it
# One-off setup (creates .venv and installs everything via uv)
cd <project-dir>
uv sync
# Terminal 1 — producer
cd <project-dir>
uv run python producer.py
# Terminal 2 — consumer
cd <project-dir>
uv run python consumer.py
### Validation gate
kafka-topic-audit findings:
- [topic-name] <severity> <finding>
kafka-perf-review findings:
- [file:line] <severity> <finding>
### Notes
- Schema in `schemas/value.schema.json` was pulled from `<topic>-value` in the registry, not inferred.
- Consumer group ID: `<group-id>` (unique, no collision).
- Idempotent producer enabled, graceful shutdown implemented.