Set up end-to-end Change Data Capture (CDC) pipelines on Confluent Cloud using Debezium source connectors, Flink for transformation, and Tableflow for data lake integration. Supports JSON_SR, Avro, and Protobuf formats. Handles schemaless topics (plain JSON without SR) and multi-event topics. This skill handles the complete workflow from database to Iceberg/Delta tables. Use this skill when users want to capture database changes and materialize them into Iceberg or Delta Lake tables via Confluent Cloud Tableflow. Trigger phrases include "CDC to Tableflow", "database to Iceberg", "database to Delta Lake", "stream database changes to data lake", "set up Tableflow pipeline", "schemaless topic to Tableflow", or "multi-event topic to Iceberg". Do NOT trigger for general CDC, Debezium, or database replication requests that do not involve Tableflow or Iceberg/Delta Lake as the destination.
npx claudepluginhub confluentinc/agent-skills --plugin streaming-skills-plugin
Slash command: /streaming-skills-plugin:confluent-cloud-cdc-tableflow
Build production-ready Change Data Capture pipelines that stream database changes through Confluent Cloud to Iceberg or Delta Lake tables using Debezium, Flink, and Tableflow.
This skill automates the setup of a complete CDC pipeline:
Database → Debezium CDC Connector → Kafka + Schema Registry → Flink (decode & transform) → Tableflow → Iceberg/Delta Tables
1. NEVER enable Tableflow directly on CDC source topics.
Always use the Flink decode pattern: CDC Source Topic → Flink INSERT → Target Topic (changelog.mode = 'upsert') → Tableflow.
CDC connectors with tombstones.on.delete=true produce null-value Kafka records (tombstones) on DELETE operations. If Tableflow is enabled directly on the CDC source topic, it will use APPEND mode by default and immediately suspend when it encounters a tombstone: "Tableflow will be suspended because we detected a Kafka record with a null value."
The Flink decode layer solves this by interpreting Debezium CDC semantics natively — it translates DELETEs into proper retract/tombstone messages that upsert-mode Tableflow handles correctly.
Do NOT use after.state.only=true as a shortcut to bypass the Flink decode step. While it strips the Debezium envelope, tombstone records from DELETEs still break APPEND-mode Tableflow. Additionally, OracleXStreamSource does not support the after.state.only configuration option at all.
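The toy Python model below makes the failure mode concrete. It only illustrates the two materialization strategies and is not how Tableflow is implemented. Records are (key, value) pairs, and a value of None stands in for a Debezium tombstone. All function names are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

# (key, value) pair; a value of None models the tombstone emitted on DELETE
Record = Tuple[str, Optional[dict]]


class TombstoneError(Exception):
    """Raised when an append-only sink sees a null-value record."""


def materialize_append(records: List[Record]) -> List[dict]:
    # Append mode: every record becomes a new row, so a null value has
    # nowhere to go and the sink must stop (Tableflow suspends instead).
    rows: List[dict] = []
    for key, value in records:
        if value is None:
            raise TombstoneError(f"null value for key {key!r}")
        rows.append(value)
    return rows


def materialize_upsert(records: List[Record]) -> Dict[str, dict]:
    # Upsert mode: the key identifies a row; a new value replaces it,
    # and a tombstone deletes it.
    table: Dict[str, dict] = {}
    for key, value in records:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table


events: List[Record] = [
    ("1", {"id": 1, "name": "Alice"}),
    ("1", {"id": 1, "name": "Alicia"}),  # UPDATE
    ("2", {"id": 2, "name": "Bob"}),
    ("2", None),                         # DELETE -> tombstone
]
print(materialize_upsert(events))  # {'1': {'id': 1, 'name': 'Alicia'}}
```

Calling `materialize_append(events)` raises `TombstoneError` on the fourth record. This mirrors why an APPEND-mode table cannot absorb DELETEs, while an upsert-keyed target can.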
2. Tableflow changelog mode is IMMUTABLE after first materialization.
Tableflow caches the changelog mode (APPEND or UPSERT) when it first materializes data. Once set, it cannot be changed — even by altering the Kafka topic's changelog.mode property or by deleting and recreating the Tableflow topic. The S3 table_path is keyed by Kafka topic name, so recreating a Tableflow topic reuses the same S3 path and cached state.
Attempting to change the mode causes: "The changelog mode for this topic has been modified since table materialization began." Flip-flopping the mode further corrupts state with: "Incompatible schema evolution detected."
To change changelog mode, you must delete the Tableflow topic, delete the underlying Kafka topic, and recreate both from scratch. This is why it's critical to create target topics with 'changelog.mode' = 'upsert' from the start.
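Because the mode cannot be changed later, it can help to lint the target DDL before running it. The following is a regex-based sketch that does not truly parse SQL, and the helper names are hypothetical. It checks that a CREATE TABLE statement declares `'changelog.mode' = 'upsert'` together with the primary key that upsert mode relies on.

```python
import re
from typing import Optional

# Matches e.g. 'changelog.mode' = 'upsert' anywhere in the statement.
_CHANGELOG_MODE = re.compile(r"'changelog\.mode'\s*=\s*'([A-Za-z_-]+)'", re.IGNORECASE)
_PRIMARY_KEY = re.compile(r"\bPRIMARY\s+KEY\b", re.IGNORECASE)


def declared_changelog_mode(ddl: str) -> Optional[str]:
    """Return the changelog.mode declared in the DDL (lowercased), or None."""
    match = _CHANGELOG_MODE.search(ddl)
    return match.group(1).lower() if match else None


def check_target_ddl(ddl: str) -> None:
    """Raise ValueError unless the target table is declared upsert with a key."""
    mode = declared_changelog_mode(ddl)
    if mode != "upsert":
        raise ValueError(f"changelog.mode is {mode!r}; create the table with 'upsert'")
    if not _PRIMARY_KEY.search(ddl):
        raise ValueError("upsert mode needs a PRIMARY KEY (...) NOT ENFORCED clause")


check_target_ddl(
    "CREATE TABLE `target_customers` (`id` INT NOT NULL, "
    "PRIMARY KEY (`id`) NOT ENFORCED) WITH ('changelog.mode' = 'upsert');"
)
```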
3. Pipeline cleanup order matters.
When resetting a CDC-to-Tableflow pipeline, delete resources in this order:
-key and -value subjects)
Never delete CDC source Kafka topics while the connector is still running. The connector cannot recover or re-snapshot and must be fully recreated.
references/connector-configs.md "Handling Topics Without Schema Registry".
JSON_SR, AVRO, and PROTOBUF all support Flink auto-discovery and Tableflow. Choose based on throughput needs vs. debuggability.
output.data.format, not key.converter/value.converter classes.
Default: Use Confluent MCP Server. The MCP server is the preferred method for all Confluent Cloud operations. Only fall back to the Confluent CLI (confluent command) and REST APIs if the MCP server is not installed or unavailable.
Check for mcp__confluent__* tools (list-environments, create-connector, create-flink-statement, create-tableflow-topic, list-schemas, list-topics, consume-messages, search-topics-by-name).
If MCP is not available, fall back to the Confluent CLI (confluent command) and REST APIs for all operations. The CLI fallback should mirror the same workflow phases but use CLI commands instead of MCP tool calls.
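The MCP, then CLI, then REST precedence can be expressed as a small decision function. This is a minimal sketch: `choose_backend` is a hypothetical helper, and the rule "MCP only if every listed tool is present" is an assumed design choice rather than something the skill mandates.

```python
import shutil
from typing import Iterable, Optional

# Tool names taken from the list above; the MCP client exposes them with this prefix.
REQUIRED_MCP_TOOLS = [
    "list-environments", "create-connector", "create-flink-statement",
    "create-tableflow-topic", "list-schemas", "list-topics",
    "consume-messages", "search-topics-by-name",
]


def choose_backend(available_tools: Iterable[str], cli_path: Optional[str]) -> str:
    """Return "mcp", "cli", or "rest" following the preference order above."""
    names = set(available_tools)
    if all(f"mcp__confluent__{tool}" in names for tool in REQUIRED_MCP_TOOLS):
        return "mcp"
    if cli_path:  # e.g. the result of shutil.which("confluent")
        return "cli"
    return "rest"


backend = choose_backend(available_tools=[], cli_path=shutil.which("confluent"))
```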
CLI Fallback Examples:
# Environment & cluster discovery
confluent environment list
confluent kafka cluster list --environment <env-id>
# Connector operations
confluent connect cluster create --config-file connector-config.json --cluster <cluster-id> --environment <env-id>
confluent connect cluster describe <connector-id> --cluster <cluster-id> --environment <env-id>
confluent connect cluster list --cluster <cluster-id> --environment <env-id>
# Flink operations
confluent flink compute-pool create <pool-name> --cloud <cloud> --region <region> --environment <env-id>
confluent flink statement create <statement-name> --sql "<SQL>" --compute-pool <pool-id> --environment <env-id>
confluent flink statement describe <statement-name> --environment <env-id>
confluent flink statement delete <statement-name> --environment <env-id>
# Topic & schema operations
confluent kafka topic list --cluster <cluster-id> --environment <env-id>
confluent schema-registry subject list --environment <env-id>
# Tableflow operations
confluent tableflow topic enable <topic-name> --cluster <cluster-id> --environment <env-id> --storage-type MANAGED --table-formats ICEBERG
confluent tableflow topic list --cluster <cluster-id> --environment <env-id>
confluent tableflow topic describe <topic-name> --cluster <cluster-id> --environment <env-id>
confluent tableflow topic disable <topic-name> --cluster <cluster-id> --environment <env-id>
REST API Fallback: If neither MCP nor CLI is available, use the Confluent Cloud REST APIs directly. All calls use HTTP Basic Auth with a Cloud API Key (not a Kafka API key). See references/rest-api.md for endpoint patterns and examples.
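Here is a minimal sketch of what "HTTP Basic Auth with a Cloud API Key" means in practice, using only the standard library. The key and secret are placeholders. The list-environments path (`/org/v2/environments`) is used only as an example endpoint; see references/rest-api.md for the endpoints this skill actually relies on. The request is built but not sent.

```python
import base64
import urllib.request

# Placeholders: a Cloud API key/secret, NOT a cluster-scoped Kafka API key.
CLOUD_API_KEY = "<cloud-api-key>"
CLOUD_API_SECRET = "<cloud-api-secret>"


def basic_auth_header(key: str, secret: str) -> str:
    """Standard HTTP Basic credentials: base64("key:secret")."""
    token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def build_request(path: str) -> urllib.request.Request:
    return urllib.request.Request(
        url=f"https://api.confluent.cloud{path}",
        headers={
            "Authorization": basic_auth_header(CLOUD_API_KEY, CLOUD_API_SECRET),
            "Accept": "application/json",
        },
        method="GET",
    )


req = build_request("/org/v2/environments")
# Sending it requires real credentials: urllib.request.urlopen(req)
```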
Ask the user to provide the following Confluent Cloud details:
| Detail | Example | Used For |
|---|---|---|
| Environment ID | env-0ypxv6 | environmentId in all MCP calls |
| Kafka Cluster ID | lkc-qo5k36 | clusterId in all MCP calls |
| Flink Compute Pool ID | lfcp-3v39xw | computePoolId in Flink statements |
| Flink Catalog Name | my_environment | catalogName in Flink statements |
| Flink Database Name | cluster_0 | databaseName in Flink statements |
Credentials: Generate a cdc-credentials.properties file with placeholders for: Kafka API Key/Secret (cluster-scoped), Database Username/Password. Have the user populate it in their editor and add it to .gitignore. If the user prefers Claude not read the file, fall back to CLI: generate connector-config.json with placeholders, user fills it in, then confluent connect cluster create --config-file connector-config.json.
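The credentials step can be scripted so that it never overwrites values the user has already filled in, and so that it adds the file to .gitignore only once. This is a sketch. The property names below are assumptions modeled on common connector keys; align them with the template in references/connector-configs.md.

```python
from pathlib import Path

# Hypothetical property names with placeholder values for the user to fill in.
PLACEHOLDERS = {
    "kafka.api.key": "<KAFKA_API_KEY>",
    "kafka.api.secret": "<KAFKA_API_SECRET>",
    "database.user": "<DB_USERNAME>",
    "database.password": "<DB_PASSWORD>",
}


def write_credentials_template(project_dir: Path,
                               filename: str = "cdc-credentials.properties") -> Path:
    target = project_dir / filename
    if not target.exists():  # keep any values the user already entered
        lines = [f"{key}={value}" for key, value in PLACEHOLDERS.items()]
        target.write_text("\n".join(lines) + "\n", encoding="utf-8")

    # Append the filename to .gitignore once, respecting a missing trailing newline.
    gitignore = project_dir / ".gitignore"
    text = gitignore.read_text(encoding="utf-8") if gitignore.exists() else ""
    if filename not in text.splitlines():
        separator = "" if text == "" or text.endswith("\n") else "\n"
        gitignore.write_text(text + separator + filename + "\n", encoding="utf-8")
    return target
```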
Quick verification:
mcp__confluent__list-topics to confirm the MCP server is connected to the expected cluster
mcp__confluent__list-schemas to verify Schema Registry is accessible
Schema Registry is shared at the environment level across all clusters.
Use MCP to check what already exists:
mcp__confluent__list-connectors(environmentId, clusterId) → Existing CDC connectors
mcp__confluent__list-flink-statements(environmentId, computePoolId) → Existing Flink jobs
mcp__confluent__list-tableflow-topics(environmentId, clusterId) → Existing Tableflow topics
Ask the user:
Before proceeding, validate that the chosen topic.prefix won't collide with existing topics:
mcp__confluent__search-topics-by-name(topicName: "<proposed-prefix>", environmentId, clusterId)
Or via CLI:
confluent kafka topic list --cluster <cluster-id> --environment <env-id> | grep -F "<proposed-prefix>"  # -F: match literally, since "." in a prefix is a regex wildcard
If any existing topics share the proposed prefix, warn the user and recommend a unique prefix. A prefix collision silently merges CDC data with unrelated topics, which can corrupt both pipelines.
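The same check can be applied to an already-fetched topic list (from list-topics, search-topics-by-name, or the CLI). This is a minimal sketch; `prefix_collisions` is a hypothetical helper.

```python
from typing import Iterable, List


def prefix_collisions(existing_topics: Iterable[str], proposed_prefix: str) -> List[str]:
    """Existing topic names that start with the proposed topic.prefix."""
    # A plain startswith check (like the grep above) also flags near-misses such
    # as "postgres-cdc-old.*" for prefix "postgres-cdc", which is usually what you
    # want when choosing a prefix nothing else can be confused with.
    return sorted(t for t in existing_topics if t.startswith(proposed_prefix))


topics = ["postgres-cdc.public.customers", "postgres-cdc-old.public.orders", "target_customers"]
print(prefix_collisions(topics, "postgres-cdc"))
```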
Default BACKWARD compatibility can halt CDC connectors when database columns are dropped. Set FULL_TRANSITIVE for CDC subjects after the connector creates them:
confluent schema-registry config update --subject "<topic-prefix>.<schema>.<table>-value" --compatibility FULL_TRANSITIVE --environment <env-id>
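With many tables it is easier to generate one invocation of the CLI command shown above per subject. In this sketch, subject names follow the default TopicNameStrategy (`<topic>-key`, `<topic>-value`), and the helper names are hypothetical. The example updates only the -value subjects, matching the command above. Whether to include -key subjects is left as a judgment call.

```python
import shlex
from typing import Iterable, List, Tuple


def cdc_subjects(topic_prefix: str, tables: Iterable[Tuple[str, str]]) -> List[str]:
    """Key and value subject names for {prefix}.{schema}.{table} topics."""
    subjects: List[str] = []
    for schema, table in tables:
        topic = f"{topic_prefix}.{schema}.{table}"
        subjects += [f"{topic}-key", f"{topic}-value"]
    return subjects


def compatibility_commands(subjects: Iterable[str], env_id: str) -> List[str]:
    # Same CLI command as above, shell-quoted, one per subject.
    return [
        shlex.join([
            "confluent", "schema-registry", "config", "update",
            "--subject", subject,
            "--compatibility", "FULL_TRANSITIVE",
            "--environment", env_id,
        ])
        for subject in subjects
    ]


subjects = cdc_subjects("postgres-cdc", [("public", "customers"), ("public", "orders")])
for cmd in compatibility_commands([s for s in subjects if s.endswith("-value")], "<env-id>"):
    print(cmd)
```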
Database Configuration:
schema.table)
Schema Format: Ask the user: JSON_SR (default, human-readable), AVRO (smaller payloads, high-throughput), or PROTOBUF (strongly typed). All work identically with Flink auto-discovery and Tableflow. Never use plain JSON, because it breaks both. See references/connector-configs.md for a detailed comparison.
Existing Topics Without SR: See references/connector-configs.md "Handling Topics Without Schema Registry" for options (register JSON schema, schema inference, or Flink raw BYTES).
Tableflow Destination:
Naming Convention:
cdc-pipeline-skill-{database-type}-{YYYYMMDD}
Example: cdc-pipeline-skill-postgres-20260323
Each database requires specific CDC setup. Read references/database-prerequisites.md for details:
enable_goldengate_replication=TRUE), ARCHIVELOG mode, supplemental logging, XStream admin user with DBMS_XSTREAM_AUTH privileges, XStream outbound server created via DBMS_XSTREAM_ADM.CREATE_OUTBOUND, connector user with XStream connect privilege. Full prereqs: https://docs.confluent.io/cloud/current/connectors/cc-oracle-xstream-cdc-source/prereqs-validation.html
If the database isn't properly configured, guide the user through setup before proceeding.
Oracle XStream important limitations:
after.state.only is NOT supported by OracleXStreamSource
Generate the complete configuration plan and present it to the user for approval.
Based on the database type, generate the connector configuration using the appropriate template from references/connector-configs.md. The templates include all required fields (name, connector.class, topic.prefix, kafka.api.key, output.data.format, decimal.handling.mode, etc.) and database-specific settings.
Set the schema format based on user preference (default JSON_SR):
{
"output.data.format": "JSON_SR",
"output.key.format": "JSON_SR"
}
Replace JSON_SR with AVRO or PROTOBUF if the user requested a different format. Both key and value formats should match. All other connector settings remain the same regardless of format choice.
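A minimal sketch of the format choice as a config fragment builder. `format_settings` is a hypothetical helper. It rejects plain JSON and keeps the key and value formats identical, as recommended above.

```python
from typing import Dict

SUPPORTED_FORMATS = ("JSON_SR", "AVRO", "PROTOBUF")


def format_settings(fmt: str = "JSON_SR") -> Dict[str, str]:
    fmt = fmt.upper()
    if fmt not in SUPPORTED_FORMATS:
        # Plain JSON (no Schema Registry) breaks Flink auto-discovery and Tableflow.
        raise ValueError(f"unsupported format {fmt!r}; choose one of {SUPPORTED_FORMATS}")
    # Key and value formats always match.
    return {"output.data.format": fmt, "output.key.format": fmt}


# Merge into the rest of the connector template (other fields omitted here).
connector_config = {"name": "<connector-name>", **format_settings("AVRO")}
```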
Topic Naming Pattern:
{topic.prefix}.{schema}.{table}
Example with topic.prefix = "postgres-cdc": postgres-cdc.public.customers
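The naming pattern is easy to build and reverse programmatically, for example when mapping topics to Flink table names. Those names need backticks because they contain dots and hyphens, as in the SQL examples in this skill. The sketch assumes that schema and table names contain no dots, and `CdcTopic` is a hypothetical helper.

```python
from typing import NamedTuple


class CdcTopic(NamedTuple):
    prefix: str
    schema: str
    table: str

    @property
    def name(self) -> str:
        return f"{self.prefix}.{self.schema}.{self.table}"

    @property
    def flink_identifier(self) -> str:
        # Dots and hyphens require backtick quoting in Flink SQL.
        return f"`{self.name}`"


def parse_cdc_topic(topic: str) -> CdcTopic:
    # Split from the right so a prefix that itself contains dots still parses.
    parts = topic.rsplit(".", 2)
    if len(parts) != 3:
        raise ValueError(f"{topic!r} does not look like prefix.schema.table")
    return CdcTopic(*parts)


print(parse_cdc_topic("postgres-cdc.public.customers").flink_identifier)
```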
In Confluent Cloud Flink, the CDC source table is auto-discovered from the Kafka topic. You only need to:
Note: The examples below use a customers table with illustrative column names. Substitute the user's actual table name, columns, and types based on the schema discovered from their CDC topic.
CREATE TABLE `target_customers` (
`id` INT NOT NULL,
`name` STRING,
`email` STRING,
`created_at` TIMESTAMP_LTZ(3),
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'changelog.mode' = 'upsert'
);
INSERT INTO `target_customers`
SELECT
`id`,
`name`,
`email`,
TO_TIMESTAMP_LTZ(`created_at` / 1000, 3)
FROM `postgres-cdc.public.customers`;
IMPORTANT Cloud Flink differences:
Do not specify 'connector', 'value.format', 'properties.bootstrap.servers', or Schema Registry URLs in CREATE TABLE. Cloud Flink handles all of this automatically.
Do not manually extract after.* fields or filter by op. Flink interprets CDC changelog semantics natively.
Use TIMESTAMP_LTZ(3) for Debezium timestamps, not TIMESTAMP(3).
DynamoDB CDC is different from SQL CDC in Flink. The auto-discovered table has columns id (key) and val (a complex ROW type containing the CDC envelope with op, before.document, after.document). Flink does NOT auto-decode this envelope as it does for SQL Debezium connectors, so you must extract fields manually:
CREATE TABLE `target_dynamodb` (
`id` STRING NOT NULL,
`document` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('changelog.mode' = 'upsert');
INSERT INTO `target_dynamodb`
SELECT `id`, `val`.`after`.`document`
FROM `dynamodb-cdc-source-topic`;
The document field is a JSON string of the DynamoDB item in DynamoDB's native type format (e.g., {"name":{"S":"Alice"},"age":{"N":"30"}}).
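Downstream consumers of the target table usually need plain values rather than DynamoDB's type-tagged form. The following is a minimal standard-library decoder for the `document` string, offered as a sketch. It covers the common type tags (S, N, BOOL, NULL, M, L, SS, NS) and leaves binary values base64-encoded. If boto3 is available, `boto3.dynamodb.types.TypeDeserializer` is an established alternative.

```python
import json
from decimal import Decimal
from typing import Any, Dict, Union


def _number(text: str) -> Union[int, Decimal]:
    # DynamoDB sends numbers as strings; keep integers as int, others as exact Decimal.
    try:
        return int(text)
    except ValueError:
        return Decimal(text)


def _from_attr(attr: Dict[str, Any]) -> Any:
    # Every attribute value is a one-entry dict: {type_tag: payload}.
    if len(attr) != 1:
        raise ValueError(f"expected one type tag, got {list(attr)}")
    tag, payload = next(iter(attr.items()))
    if tag == "S":
        return payload
    if tag == "N":
        return _number(payload)
    if tag == "BOOL":
        return bool(payload)
    if tag == "NULL":
        return None
    if tag == "M":
        return {k: _from_attr(v) for k, v in payload.items()}
    if tag == "L":
        return [_from_attr(v) for v in payload]
    if tag == "SS":
        return set(payload)
    if tag == "NS":
        return {_number(n) for n in payload}
    if tag in ("B", "BS"):
        return payload  # binary values stay as base64 strings in this sketch
    raise ValueError(f"unknown DynamoDB type tag {tag!r}")


def decode_document(document: str) -> Dict[str, Any]:
    """Turn the `document` column (a JSON string) into plain Python values."""
    return {name: _from_attr(attr) for name, attr in json.loads(document).items()}


print(decode_document('{"name":{"S":"Alice"},"age":{"N":"30"}}'))
# {'name': 'Alice', 'age': 30}
```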
Debezium Type Conversions: See references/flink-sql-patterns.md for the full type mapping table. Key conversions: use TO_TIMESTAMP_LTZ(col / 1000, 3) for MicroTimestamp, TO_TIMESTAMP_LTZ(col, 3) for Timestamp, and ensure decimal.handling.mode=string is set on the connector (BYTES default is unusable in Flink).
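The arithmetic behind those two conversions is shown below. Debezium's MicroTimestamp counts microseconds since the Unix epoch, so dividing by 1000 gives the milliseconds that `TO_TIMESTAMP_LTZ(x, 3)` expects. Debezium's Timestamp is already in milliseconds. This Python sketch only illustrates the units. It assumes non-negative (post-1970) values, because Python's `//` floors negative numbers, and SQL integer division may behave differently.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micro_timestamp_to_millis(micros: int) -> int:
    # io.debezium.time.MicroTimestamp: microseconds -> milliseconds (col / 1000).
    return micros // 1000


def millis_to_datetime(millis: int) -> datetime:
    # io.debezium.time.Timestamp is already milliseconds since the epoch.
    return EPOCH + timedelta(milliseconds=millis)


millis = micro_timestamp_to_millis(1_700_000_000_123_456)
print(millis)                      # 1700000000123
print(millis_to_datetime(millis))  # 2023-11-14 22:13:20.123000+00:00
```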
Tableflow is a native topic-level feature, not a connector. It is enabled per-topic.
Storage Options:
Table Formats: Iceberg (recommended) or Delta Lake
Show the user:
Wait for explicit confirmation before proceeding.
Execute step-by-step using MCP tools, checking status after each component.
Build the connector configuration using the template for the user's database type from references/connector-configs.md. Each template includes all required fields, including the name field.
Using MCP:
mcp__confluent__create-connector(
connectorName: "<connector-name>",
environmentId: "<env-id>",
clusterId: "<cluster-id>",
connectorConfig: { <config from references/connector-configs.md> }
)
Verify: Managed connectors take 2-5 minutes to provision. Poll mcp__confluent__read-connector — tasks: [] means still provisioning; tasks: [{...}] means ready. Then verify schemas with mcp__confluent__list-schemas(subjectPrefix: "postgres-cdc"). If no schemas after 5 min with tasks assigned, check database connectivity. Use Confluent Cloud UI for connector error logs (MCP doesn't expose them).
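The polling step can be sketched as a loop over a hypothetical `read_connector` callable. That callable is assumed to wrap mcp__confluent__read-connector and return its response as a dict with a `tasks` list. The 5-minute timeout follows the guidance above, while the 15-second interval is an arbitrary choice. Sleep and clock are injectable so that the loop can be tested without waiting.

```python
import time
from typing import Any, Callable, Dict


def wait_for_tasks(read_connector: Callable[[], Dict[str, Any]],
                   timeout_s: float = 300.0,
                   interval_s: float = 15.0,
                   sleep: Callable[[float], None] = time.sleep,
                   clock: Callable[[], float] = time.monotonic) -> bool:
    """Poll until the connector reports at least one task; False on timeout."""
    deadline = clock() + timeout_s
    while True:
        response = read_connector()
        if response.get("tasks"):  # [] -> still provisioning; [{...}] -> ready
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)
```

A False result after 5 minutes is the point at which the connector logs in the Confluent Cloud UI are worth checking.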
If the user doesn't have an existing Flink compute pool, create one before executing SQL:
confluent flink compute-pool create <pool-name> --cloud <cloud-provider> --region <region> --environment <env-id>
Use the same cloud provider and region as the Kafka cluster. Wait for the pool status to be RUNNING before proceeding.
Step 1: Verify CDC table is auto-discovered:
mcp__confluent__create-flink-statement(
statementName: "show-tables-check",
statement: "SHOW TABLES;",
environmentId: "<env-id>",
computePoolId: "<pool-id>",
catalogName: "<environment-display-name>",
databaseName: "<cluster-display-name>"
)
Then read results:
mcp__confluent__read-flink-statement(statementName: "show-tables-check", environmentId: "<env-id>")
Look for the CDC topic table (e.g., postgres-cdc.public.customers). If not present, the connector hasn't produced data yet — wait and retry.
Step 2: Create target table:
mcp__confluent__create-flink-statement(
statementName: "cdc-create-target-customers",
statement: "CREATE TABLE `target_customers` (...) WITH ('changelog.mode' = 'upsert');",
environmentId, computePoolId, catalogName, databaseName
)
Step 3: Create INSERT job:
mcp__confluent__create-flink-statement(
statementName: "cdc-decode-customers",
statement: "INSERT INTO `target_customers` SELECT ... FROM `postgres-cdc.public.customers`;",
environmentId, computePoolId, catalogName, databaseName
)
The INSERT creates a continuous Flink job. Verify it transitions to RUNNING (not FAILED):
mcp__confluent__read-flink-statement(statementName: "cdc-decode-customers", environmentId)
Common INSERT failures:
Advisory warnings (can be ignored):
Using MCP:
mcp__confluent__create-tableflow-topic(
tableflowTopicConfig: {
"display_name": "target_customers",
"storage": { "kind": "Managed", "bucket_name": "managed", "provider_integration_id": "managed" },
"table_formats": ["ICEBERG"],
"config": { "record_failure_strategy": "SUSPEND", "retention_ms": "6048000000" }
}
)
KNOWN LIMITATION: The MCP create-tableflow-topic tool does NOT accept environmentId or clusterId parameters. It defaults to the cluster configured in the MCP server. If the MCP server points to a different cluster than where the target topic exists, this will fail with "topic not found". Use the CLI or UI as a workaround.
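`retention_ms` in the Tableflow config is a millisecond count passed as a string. By the arithmetic below, the value in the example above ("6048000000") is 70 days, whereas 7 days would be "604800000". Check which one you intend before copying the config. The helpers are hypothetical.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # 86,400,000


def days_to_retention_ms(days: int) -> str:
    # The Tableflow config above passes retention_ms as a string.
    return str(days * MS_PER_DAY)


def retention_ms_to_days(retention_ms: str) -> float:
    return int(retention_ms) / MS_PER_DAY


print(days_to_retention_ms(7))             # 604800000
print(retention_ms_to_days("6048000000"))  # 70.0
```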
Using CLI:
# Managed storage (Confluent manages S3)
confluent tableflow topic enable target_customers \
--cluster <cluster-id> \
--environment <env-id> \
--storage-type MANAGED \
--table-formats ICEBERG
# BYOB / BYOS (user's own S3 bucket)
confluent tableflow topic enable target_customers \
--cluster <cluster-id> \
--environment <env-id> \
--storage-type BYOS \
--provider-integration <provider-integration-id> \
--bucket-name <bucket-name> \
--table-formats ICEBERG
# Azure Data Lake Storage Gen2
confluent tableflow topic enable target_customers \
--cluster <cluster-id> \
--environment <env-id> \
--storage-type AzureDataLakeStorageGen2 \
--provider-integration <provider-integration-id> \
--storage-account-name <account-name> \
--container-name <container-name> \
--table-formats DELTA
Use --table-formats DELTA for Delta Lake instead of Iceberg.
Verify Tableflow is enabled:
mcp__confluent__list-tableflow-topics(environmentId, clusterId)
Or via CLI:
confluent tableflow topic describe target_customers --cluster <cluster-id> --environment <env-id>
confluent tableflow topic list --cluster <cluster-id> --environment <env-id>
Status will transition from PENDING → ACTIVE.
Large Table Snapshots: If the connector was created with snapshot.mode: initial on a large table, verification may take hours. To distinguish a running snapshot from a broken pipeline:
read-connector → tasks is non-empty)
list-schemas → key/value schemas exist): this means the snapshot has started producing
If you used snapshot.mode: schema_only for initial validation, insert a test row in the source database to trigger a CDC event and verify the full pipeline. See references/troubleshooting.md for detailed snapshot troubleshooting.
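The triage above can be written as a small decision function. It is a rough sketch: the state codes are hypothetical, and treating empty tasks beyond the usual 2-5 minutes as a cue to check the Confluent Cloud UI is an assumed rule of thumb.

```python
ADVICE = {
    "PROVISIONING": "connector still provisioning; poll read-connector again",
    "PROVISIONING_SLOW": "tasks still empty past the usual 2-5 minutes; check the Confluent Cloud UI",
    "STARTING": "tasks assigned, no schemas yet; wait a little longer",
    "SNAPSHOT_RUNNING": "snapshot is producing; large tables can take hours",
    "CHECK_DATABASE": "no schemas after 5 minutes with tasks assigned; check DB connectivity and CDC config",
}


def diagnose_snapshot(tasks_assigned: bool, schemas_registered: bool,
                      minutes_since_create: float) -> str:
    """Return a state code (a key of ADVICE) for a pipeline during initial snapshot."""
    if not tasks_assigned:
        return "PROVISIONING" if minutes_since_create < 5 else "PROVISIONING_SLOW"
    if schemas_registered:
        return "SNAPSHOT_RUNNING"
    return "STARTING" if minutes_since_create < 5 else "CHECK_DATABASE"


state = diagnose_snapshot(tasks_assigned=True, schemas_registered=True, minutes_since_create=90)
print(state, "->", ADVICE[state])
```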
Check each component:
| Check | MCP Tool | What to Look For |
|---|---|---|
| Connector running | read-connector | tasks array is non-empty |
| Schemas registered | list-schemas(subjectPrefix) | Key and value schemas for CDC topic |
| CDC table in Flink | create-flink-statement("SHOW TABLES") | CDC topic appears as table |
| Flink job running | read-flink-statement | Status is RUNNING (not FAILED), with no error in the response |
| Target topic has data | consume-messages(topicNames) | Messages appear (note: consumer starts at latest offset) |
| Tableflow enabled | list-tableflow-topics | Status is PENDING or ACTIVE |
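Because each component feeds the next, the most upstream failing check is usually the one to investigate first. This is a minimal sketch with hypothetical check names that mirror the table rows. Keep in mind that "target topic has data" can read as false simply because the consumer starts at the latest offset.

```python
from typing import Dict, Optional

# Upstream-to-downstream order of the checks in the table above.
CHECK_ORDER = [
    "connector_running",
    "schemas_registered",
    "cdc_table_in_flink",
    "flink_job_running",
    "target_topic_has_data",
    "tableflow_enabled",
]


def first_failing_check(results: Dict[str, bool]) -> Optional[str]:
    """Most upstream check that failed or was not run; None if all passed."""
    for check in CHECK_ORDER:
        if not results.get(check, False):
            return check
    return None
```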
Consume from target topic to verify decoded data:
mcp__confluent__consume-messages(
topicNames: ["target_customers"],
value: { "useSchemaRegistry": true },
key: { "useSchemaRegistry": true },
maxMessages: 5,
timeoutMs: 15000
)
Note: The consumer starts at the latest offset. If the initial snapshot already completed, you may see 0 messages until a new database change occurs.
Test real-time CDC by inserting a row in the source database (adapt table name and columns to match the user's actual schema):
INSERT INTO public.customers (name, email, created_at)
VALUES ('Test User', 'test@example.com', NOW());
For detailed troubleshooting, see references/troubleshooting.md.
Quick reference — pipeline-blocking issues:
| Symptom | Likely Cause | Fix |
|---|---|---|
| Connector tasks stay empty | Still provisioning | Wait 2-5 minutes, retry |
| No schemas after 5 min | DB connectivity or credentials | Check host, port, user, password; verify DB CDC config |
| SHOW TABLES missing CDC table | Connector not producing yet | Verify schemas exist first, then wait |
| INSERT: "Incompatible types" | Debezium type mismatch | Use TIMESTAMP_LTZ(3) + TO_TIMESTAMP_LTZ; see references/flink-sql-patterns.md |
| Tableflow: "topic not found" | MCP cluster mismatch | Use CLI: confluent tableflow topic enable or Confluent Cloud UI |
| Topic exists but not in SHOW TABLES | No schema in SR | Register a JSON schema in SR or use schema inference; see references/connector-configs.md |
After successful setup, provide the user with:
references/database-prerequisites.md
references/connector-configs.md
references/flink-sql-patterns.md
references/troubleshooting.md
references/rest-api.md