Help us improve
Share bugs, ideas, or general feedback.
Use when the user wants to build a Python Kafka producer or consumer, add Schema Registry to existing Python code, migrate from raw JSON to schema-backed serialization, or scaffold a confluent-kafka-python project for Confluent Cloud or local Docker.
npx claudepluginhub confluentinc/agent-skills --plugin streaming-skills-pluginHow this skill is triggered — by the user, by Claude, or both
Slash command
/streaming-skills-plugin:developing-kafka-python-clientThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
<HARD-GATE>
evals/evals.jsonreferences/common.pyreferences/consumer.pyreferences/docker-compose.ymlreferences/multi-event-guide.mdreferences/order_events.schema.jsonreferences/producer.pyreferences/producer_multi_event.pyreferences/producer_multi_event_sync.pyreferences/producer_sync.pyreferences/readme-template.mdreferences/schema-generation-rules.mdreferences/test_project.pyScaffolds a production-ready Python Kafka producer/consumer with confluent-kafka-python, Schema Registry, graceful shutdown, idempotent producer, and tests. Discovers topic and schema from any attached Kafka MCP server.
Architect, build, and debug Kafka Streams apps (JVM-embedded stream processing). Use when user mentions KStream, KTable, topology, TopologyTestDriver, StreamsBuilder, interactive queries, GlobalKTable, joins/windows/aggregations, or debugging issues (rebalancing, state stores, lag, deserialization errors). Do NOT trigger for Flink, connectors, CDC, or plain producer/consumer.
Guides Kafka topic design (partitions, replication), KafkaJS idempotent producers/consumers, consumer lag monitoring, exactly-once semantics, schema registry, compacted topics, and DLQ patterns. Use for reliable streaming implementations.
Share bugs, ideas, or general feedback.
Begin by announcing: "Using the Confluent Kafka Python Client skill to guide this project."
Generate a production-ready Python project for producing to and/or consuming from Kafka using confluent-kafka-python. Supports two target environments: Confluent Cloud (managed) and Local Docker (open-source Kafka), and two producer styles: AsyncIO (non-blocking) and Synchronous (blocking). The generated code follows Confluent's best practices.
Before generating any code, work through the questions below. Skip any question the user has already answered explicitly in their prompt — do not re-ask just for form's sake. For example, "build a producer and consumer on Confluent Cloud with an async producer" already answers #2, #3, and #4; only #1, #5, #6, #7, and #8 remain.
Mandatory confirmation gate — do not skip, even if the user answered every question. Before writing any file, you MUST send one message that:
Then STOP and wait for the user's reply. Do not generate files in the same turn as the recap, and do not proceed on the assumption that a fully-specified prompt implies consent to generate immediately — the recap catches misinterpretations of the prompt and is required even when questions #1–#8 are all pre-answered. The only way to skip the gate is if the user has already confirmed the recap earlier in this conversation.
Do not assume defaults for #1, #2, or #3 — if any of these are not answered by the prompt, you must ask.
main.py, uses Flask/FastAPI/Django, etc.), do not scaffold a new project. Instead: (a) identify their existing producer or data-sending code, (b) ask whether they already have schemas registered in Schema Registry, (c) add Schema Registry integration to their existing code following the patterns in the reference files. Generate only the files they are missing (e.g., common.py, schemas/value.schema.json) and modify their existing code inline.producer.produce() calls with serializer-backed calls. Do not discard their existing code.AIOProducer): Use when code runs under an event loop — FastAPI/Starlette, aiohttp, Sanic, asyncio workers — and must not block.Producer): Use for scripts, batch jobs, and highest-throughput pipelines where the user controls threads/processes and can call poll()/flush() directly.
If the user mentions an async framework (FastAPI, aiohttp, Sanic) or uses asyncio, default to AsyncIO. If they mention scripts, batch, ETL, or don't have a preference, default to Synchronous.schemas/value.schema.json instead of generating one. If no, proceed to ask about their data fields.demo-topic)python-consumer-group)Don't ask about Schema Registry — always include it.
| Thought | Reality |
|---|---|
| "The user mentioned FastAPI, so I know it's async — skip the questions" | Still confirm. They might want a sync background worker alongside FastAPI. |
| "I'll use Avro since it's more widely used" | This skill uses JSON Schema exclusively. Explain why if asked, but don't switch. |
| "I'll skip Schema Registry to keep it simple" | Schema Registry is non-negotiable. Every project includes it. |
"I'll use auto.register.schemas=True for convenience" | Always False. Explicit registration is a core principle. |
"I'll create a producer in produce() — it's cleaner" | One producer instance, created in main(), passed as a parameter. Always. |
| "The user wants sync, so the consumer should be sync too" | Consumer is always async (AIOConsumer). This is a deliberate design decision. |
"I'll add headers= to the AIOProducer for schema ID" | AIOProducer.produce() raises NotImplementedError on headers. Only sync producers use headers. |
After gathering all answers, present a confirmation summary before generating any code:
Before I generate the project, let me confirm:
- Project type: [Greenfield scaffold / Migration of existing code]
- Environment: [Confluent Cloud (SASL_SSL) / Local Docker (PLAINTEXT)]
- Components: [Producer only / Consumer only / Both]
- Producer style: [AsyncIO (AIOProducer) / Synchronous (Producer)] (if applicable)
- Schema: [brief description of user's data fields]
- Topic: [topic name]
- Consumer group: [group ID] (if consumer)
Does this look right?
Wait for user confirmation before proceeding to Step 2. If the user corrects anything, update your understanding and re-confirm.
digraph decisions {
"Q1: Existing app?" -> "Migration path:\nmodify existing code" [label="yes"];
"Q1: Existing app?" -> "Q2: Environment?" [label="no / greenfield"];
"Q2: Environment?" -> "Cloud config\n(SASL_SSL)" [label="Confluent Cloud"];
"Q2: Environment?" -> "Local Docker config\n(PLAINTEXT) + docker-compose.yml" [label="local / docker / OSS"];
"Cloud config\n(SASL_SSL)" -> "Q3: Components?";
"Local Docker config\n(PLAINTEXT) + docker-compose.yml" -> "Q3: Components?";
"Q3: Components?" -> "Q4: Async or sync?" [label="producer requested"];
"Q3: Components?" -> "Generate consumer\n(always async AIOConsumer)" [label="consumer only"];
"Q4: Async or sync?" -> "AIOProducer path\nAsyncJSONSerializer\n(no headers support)" [label="async / event-loop"];
"Q4: Async or sync?" -> "Producer path\nJSONSerializer\n(header-based schema ID)" [label="sync / batch / ETL"];
}
Create this file structure in the user's chosen directory:
<project-dir>/
├── producer.py # (if requested)
├── consumer.py # (if requested)
├── common.py # shared config loading + verification helpers
├── schemas/
│ └── value.schema.json # JSON Schema for the message value
├── tests/
│ └── test_project.py # unit tests (always generated)
├── .env.example # template for credentials
├── requirements.txt
├── docker-compose.yml # (local Docker path only)
NEVER read, open, or display .env files. They contain API keys and secrets. Only generate .env.example with placeholder values. If the user asks you to debug a connection issue, ask them to verify their .env values themselves — do not read the file.
These principles matter because they prevent the most common production issues with Kafka Python clients:
Reuse the producer instance. Creating a new producer per message is expensive — each one opens new TCP connections, does SASL handshakes, and fetches metadata. Create one producer and reuse it for all messages. The produce function should accept the producer as a parameter, not instantiate one.
Always use Schema Registry with JSON Schema. Schema Registry enforces a contract between producers and consumers. Without it, schema changes silently break downstream consumers. This skill uses JSON Schema exclusively. Schema Registry supports Avro, Protobuf, and JSON Schema — JSON Schema is chosen because: (1) Python has first-class JSON support with no code generation step, (2) confluent-kafka-python provides JSONSerializer/JSONDeserializer out of the box, (3) it is the most approachable format for Python developers already working with JSON/dict data. If the user specifically requests Avro or Protobuf, explain this rationale and note they can switch using AvroSerializer/ProtobufSerializer from confluent_kafka.schema_registry — do not generate Avro or Protobuf code.
Register schemas as a separate explicit step before creating the serializer. Use a dedicated register_schema() function that calls sr_client.register_schema() and lets errors (auth failures, network errors, permission denials) propagate immediately — never wrap registration in a bare try/except. Then configure the serializer with auto.register.schemas=False and use.latest.version=True. This ensures the serializer never silently auto-registers and aligns with production practice where CI/CD registers schemas, not application startup.
Use the appropriate serializer for the chosen producer style: AsyncJSONSerializer / AsyncJSONDeserializer from confluent_kafka.schema_registry._async.json_schema for async, or JSONSerializer / JSONDeserializer from confluent_kafka.schema_registry.json_schema for synchronous.
Choose the right producer style. The confluent-kafka-python library offers two producer APIs:
AIOProducer from confluent_kafka.aio): Non-blocking, integrates with asyncio event loops. Use with AsyncJSONSerializer from confluent_kafka.schema_registry._async.json_schema and AsyncSchemaRegistryClient. Best for applications already running an event loop (FastAPI, aiohttp, Sanic, asyncio workers).Producer from confluent_kafka): Blocking calls with delivery callbacks. Use with JSONSerializer from confluent_kafka.schema_registry.json_schema and SchemaRegistryClient. Best for scripts, batch jobs, and highest-throughput pipelines where the user controls threads/processes and can call poll()/flush() directly.
Always ask the user which style fits their use case. The consumer always uses AIOConsumer (async) — long-running poll loops benefit from non-blocking I/O, and mixing sync/async consumer styles adds complexity with little benefit.Graceful shutdown. Async producers must flush() and close() (both awaited) before exiting. Synchronous producers must call flush() before exiting — otherwise buffered messages are lost. Consumers must unsubscribe() then close() to leave the consumer group cleanly (avoiding unnecessary rebalances). Use try/finally blocks and handle KeyboardInterrupt / signals.
Support both Confluent Cloud and local Docker. When targeting Confluent Cloud, configure SASL_SSL with PLAIN mechanism and load API keys from .env. When targeting local Docker, use PLAINTEXT with no authentication. The KAFKA_ENV environment variable (cloud or local) controls which path is used. Load all settings from environment variables via .env.
Verify connectivity before running. Use AdminClient.list_topics() to verify the broker is reachable and the topic exists before producing or consuming. Verify Schema Registry connectivity with an HTTP health check.
Always set a message key for domain events. Pass key=<entity_id>.encode("utf-8") to producer.produce() for any message that represents an entity or event stream (order events, user actions, device telemetry, transactions). Kafka partitions by key, so messages with the same key land on the same partition and preserve ordering — critical for event streams like OrderCreated → OrderUpdated → OrderCancelled where consumers must see events in order. The produce() helper in every reference file accepts a key_field parameter naming the field to use as the key (e.g., key_field="order_id", key_field="transaction_id"). Ask the user which field identifies the entity and pass it to produce(). Only leave key_field=None if the user explicitly states ordering does not matter (e.g., stateless metrics where any partition is fine).
This module handles configuration loading and connectivity verification. Use references/common.py as the template.
When the user chooses the AsyncIO producer, use references/producer.py as the template.
Key points:
produce() takes a producer instance as a parameter — it never creates onemain() and can be passed to multiple produce() callsAsyncJSONSerializer) must be awaited when calling it on a messageAIOProducer.produce() is async and returns an asyncio.Future. You must await the method to get the Future, then await the Future to get the delivered Message: future = await producer.produce(...); result = await futureAIOProducer.flush() and close() are coroutines — they must be awaited in the finally blockregister_schema() explicitly registers the schema and returns the schema ID — errors propagate immediately. create_json_serializer() creates the serializer with conf={'auto.register.schemas': False, 'use.latest.version': True}. The serializer's constructor signature is AsyncJSONSerializer(schema_str, schema_registry_client=sr_client, conf=conf) — the schema string is the first positional argument, the client and conf are keyword argumentsAIOProducer batch mode. Do not pass headers= to AIOProducer.produce() — it will raise NotImplementedError. Schema identification is handled automatically by the JSON Schema serializer's wire format prefix. See "Schema ID in Headers vs Wire Format" below for detailsWhen the user chooses the synchronous producer, use references/producer_sync.py as the template.
Key points:
produce() takes a producer instance as a parameter — it never creates onemain() and can be passed to multiple produce() callsJSONSerializer (synchronous) from confluent_kafka.schema_registry.json_schema and SchemaRegistryClient from confluent_kafka.schema_registryProducer.produce() is non-blocking — it enqueues the message. Call producer.poll(0) after each produce to serve delivery callbacks and keep the internal queue from filling upproducer.flush() after a batch to block until all in-flight messages are delivereddelivery_callback(err, msg) function to handle per-message delivery reportsflush() in the finally block ensures no buffered messages are lostregister_schema() explicitly registers the schema. create_json_serializer() creates the serializer with conf={'auto.register.schemas': False, 'use.latest.version': True}. Both return the schema IDconfluent.value.schemaId) on every produced message — this is the header-based schema identification pattern. It keeps the JSON payload clean and readable by non-Confluent consumers. See "Schema ID in Headers vs Wire Format" below for detailsUse references/consumer.py as the template.
Key points in the consumer:
unsubscribe() then close() to leave the consumer group cleanlyAsyncJSONDeserializer (no fallback to raw JSON parsing — Schema Registry is required)Synchronous producer: The reference code passes the Schema ID as a Kafka record header (confluent.value.schemaId) on every message. This keeps the JSON payload clean (no magic byte prefix), making it readable by non-Confluent consumers and debuggable with tools like kcat. This is the recommended approach for synchronous producers.
Async producer (AIOProducer): The AIOProducer does not support custom headers in batch mode (produce() raises NotImplementedError if headers= is passed). Schema identification relies on the JSON Schema serializer's wire format prefix (magic byte + schema ID prepended to the payload). This is a known limitation — do not attempt to add headers to async-produced messages.
When to use which: If downstream consumers are all Confluent-aware (using Schema Registry deserializers), both approaches work transparently. If downstream consumers are non-Confluent (plain JSON consumers), the sync producer with header-based schema ID is preferable because the message value remains clean JSON. Document this tradeoff in the generated README when producing for mixed consumer ecosystems.
Generate a JSON Schema file matching the user's data domain. The file should be placed at schemas/value.schema.json.
For example, if the user is producing financial transactions:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Transaction",
"description": "A financial transaction event produced to Kafka.",
"type": "object",
"properties": {
"transaction_id": {
"type": "string",
"description": "Unique identifier for this transaction."
},
"amount": {
"type": "number",
"description": "Transaction amount in the specified currency.",
"default": 0
},
"currency": {
"type": "string",
"description": "ISO 4217 currency code.",
"default": ""
},
"timestamp": {
"type": "string",
"format": "date-time",
"description": "Time the transaction occurred, in ISO 8601 format."
},
"status": {
"description": "Current state of the transaction.",
"enum": ["pending", "completed", "failed", "refunded"],
"default": "pending"
},
"metadata": {
"oneOf": [{"type": "null"}, {"type": "object"}],
"description": "Optional metadata associated with the transaction.",
"default": null
}
},
"required": ["transaction_id", "amount", "currency", "timestamp", "status"]
}
Follow the rules in references/schema-generation-rules.md strictly when generating or adapting schemas to the user's domain.
When the user describes multiple event types on a single topic, follow references/multi-event-guide.md. Only suggest multi-event union schemas when the user explicitly describes multiple event types on one topic.
When the user chooses local Docker, you MUST generate a docker-compose.yml using references/docker-compose.yml as the template. This starts a single-node Kafka broker (using confluentinc/confluent-local) and a Confluent Schema Registry. The user just runs docker compose up -d to get a working Kafka environment.
IMPORTANT: The confluentinc/confluent-local image uses KRaft mode and has built-in listener names: PLAINTEXT (internal, port 29092), PLAINTEXT_HOST (external, port 9092), and CONTROLLER (port 29093). Do NOT invent custom listener names — this will conflict with the image's internal configuration and cause boot loops. Only override KAFKA_ADVERTISED_LISTENERS and KAFKA_LISTENERS using these exact listener names. The internal PLAINTEXT listener must advertise the kafka hostname (not localhost) so Schema Registry can reach the broker from within the Docker network.
Generate the appropriate .env.example based on the target environment:
Confluent Cloud:
KAFKA_ENV=cloud
BOOTSTRAP_SERVER=pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
API_KEY=your-api-key
API_SECRET=your-api-secret
TOPIC=demo-topic
SCHEMA_REGISTRY_URL=https://psrc-xxxxx.us-east-2.aws.confluent.cloud
SR_API_KEY=your-sr-api-key
SR_API_SECRET=your-sr-api-secret
CLIENT_ID=python-client
GROUP_ID=python-consumer-group
Local Docker:
KAFKA_ENV=local
BOOTSTRAP_SERVER=localhost:9092
TOPIC=demo-topic
SCHEMA_REGISTRY_URL=http://localhost:8081
CLIENT_ID=python-client
GROUP_ID=python-consumer-group
confluent-kafka[json,schema_registry]>=2.13.2
jsonschema
python-dotenv
requests>=2.25.0
httpx
authlib
cachetools
attrs
typing_extensions
pytest
pytest-asyncio
Every third-party package imported anywhere in the generated code (producer.py, consumer.py, common.py) must have a corresponding entry in requirements.txt. If the code does from confluent_kafka import ..., then confluent-kafka must be in requirements.txt. If it does from dotenv import load_dotenv, then python-dotenv must be listed. This includes transitive dependencies that aren't automatically installed — for example, the async Schema Registry client imports httpx and authlib at runtime, so both must be explicitly listed even though they aren't declared as dependencies of confluent-kafka. The user should be able to pip install -r requirements.txt and run the code with zero ModuleNotFoundErrors.
Always include pytest. Include pytest-asyncio if the project uses the async producer or consumer. Only include Faker if the producer generates sample data with it.
Generate a README following references/readme-template.md. Adapt to match what was actually generated — omit producer sections if only a consumer was requested, omit Docker sections for Confluent Cloud projects.
Always generate unit tests. Use references/test_project.py as the template. The tests must run without a live Kafka cluster or Schema Registry — mock all external dependencies so tests pass in CI and eval environments.
The tests should verify these properties of the generated code:
common.py: load_config() returns all required keys and uses correct defaults. get_kafka_config() produces a config with SASL_SSL and PLAIN when KAFKA_ENV=cloud, or PLAINTEXT with no SASL when KAFKA_ENV=local. verify_kafka_setup() and verify_schema_registry() return the right booleans when mocked to succeed or fail.
producer.py (if generated): produce() accepts a producer instance and a schema_id as parameters (never creates a producer). The producer class (AIOProducer for async, Producer for sync) is instantiated exactly once in the module. Messages are passed through the serializer before producing. The schema ID is included as a confluent.value.schemaId Kafka record header on every produced message. For synchronous producers, verify flush() is called after producing.
consumer.py (if generated): Uses JSONDeserializer or AsyncJSONDeserializer (no raw JSON parsing fallback). Calls unsubscribe() before close() for graceful shutdown.
schemas/value.schema.json: Valid JSON Schema with type: object, a title, and properties with at least one property. Each property has a type.
Project structure: requirements.txt exists and contains confluent-kafka, python-dotenv, and requests. .env.example exists.
Adapt the tests to the user's specific schema and data domain — if they have fields like device_id and temperature, the schema tests can check for those specific field names.
After generating all files, run pytest tests/ to verify the tests pass. If any test fails, fix the generated code (not the tests) until they pass.
After generating the files, give the user instructions based on their target environment. Adapt these to match what was actually generated: omit producer steps if only a consumer was requested, omit consumer steps if only a producer was requested.
Confluent Cloud:
.env.example to .env and fill in their Confluent Cloud credentialspip install -r requirements.txtregister_schema() function (auto.register.schemas is set to False). If only a consumer was generated: register the schema manually by pasting the contents of schemas/value.schema.json into the Confluent Cloud Console under Schema Registry > Schemas for the topic's value subject.python producer.pypython consumer.pyRemind them that they can find their bootstrap server, API keys, and Schema Registry URL in the Confluent Cloud Console under their cluster and environment settings.
Local Docker:
docker compose up -d.env.example to .env (defaults are pre-filled for local Docker, no edits needed)pip install -r requirements.txtdocker compose exec kafka kafka-topics --create --topic demo-topic --bootstrap-server localhost:29092python producer.pypython consumer.pydocker compose down (add -v to also remove stored data)