Data Mesh architecture patterns — domain ownership, data products with SLOs, self-serve platform design, Delta Lake vs Iceberg, federated Trino queries, data contracts, OpenLineage, and migration from centralized data warehouse.
Each business domain team owns, operates, and is accountable for their data products — end to end.
Traditional (centralized):
Business Domain → Events/DB → Central ETL Team → Data Warehouse → Consumers
Data Mesh (decentralized):
Business Domain → Events/DB → Domain Team builds & operates Data Product → Consumers
The domain team produces the data; consumers access it only through the product's well-defined interfaces (output ports).
Data products have the same rigor as software products: SLOs, documentation, versioning, and ownership.
# data-product-spec.yaml — Data Product "Customer Orders"
name: customer-orders
domain: orders
owner: orders-engineering@company.com
version: "2.1.0"
description: >
  All completed customer orders since 2020-01-01.
  Refreshed hourly from the orders database.

slo:
  freshness: 1h          # data is never older than 1 hour
  completeness: 99.9%    # < 0.1% missing order records
  accuracy: 99.99%       # financial amounts accurate to 2 decimal places

output_ports:
  - type: sql
    endpoint: trino://data-platform.internal/orders/customer_orders
    format: Delta Lake table
  - type: api
    endpoint: https://data-api.orders.internal/v2/customer-orders
    format: JSON (paginated)
  - type: stream
    endpoint: kafka://kafka.internal/orders.customer_orders.v2
    format: Avro (Schema Registry)

input_ports:
  - source: orders-db.public.orders
    refresh: streaming CDC (Debezium)
  - source: payments.payment_confirmed events
    refresh: Kafka stream

quality_checks:
  - tool: great_expectations
    suite: customer_orders_suite
    run: hourly
  - tool: soda
    checks: soda/orders/customer_orders.yml
    run: on_refresh

catalog:
  registered_in: DataHub
  tags: [pii, gdpr, financial]
  pii_columns: [customer_email, shipping_address, phone_number]
Central platform team provides primitives — domain teams consume via self-service.
Platform Team provides:
├── Compute: Spark clusters on demand (Databricks / EMR)
├── Storage: Delta Lake / Iceberg tables on S3/GCS
├── Catalog: DataHub / OpenMetadata registration API
├── Monitoring: Quality dashboard (Great Expectations Data Docs)
├── Lineage: OpenLineage collector (Marquez)
└── Access: Fine-grained access control (Apache Ranger / Databricks Unity Catalog)
Domain Teams self-serve:
├── Provision new Delta Lake table (via IaC template)
├── Register data product in catalog (via CI pipeline)
├── Configure quality checks (via Great Expectations / Soda YAML)
└── Set up CDC from operational DB (via Debezium template)
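As an illustration of the self-serve flow, here is a minimal sketch of a CI step that provisions a Delta Lake table from the data-product spec above. The spec fields and the Delta/Spark configuration come from this document; the script itself, its file path, and the column list are assumptions rather than an actual platform API.

# provision_data_product.py — hypothetical self-serve CI step (illustrative only).
import yaml
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("provision-data-product")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

with open("data-product-spec.yaml") as f:
    spec = yaml.safe_load(f)

# Illustrative schema; in practice the columns would come from the spec or a schema file.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {spec['domain']}")
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {spec['domain']}.{spec['name'].replace('-', '_')} (
        order_id      STRING NOT NULL,
        customer_id   STRING NOT NULL,
        total_amount  DECIMAL(18, 2),
        status        STRING,
        created_at    TIMESTAMP
    ) USING delta
""")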
Federated governance: central standards, domain-level implementation. A sketch of an automated enforcement check follows the two lists below.
Global standards (enforced by platform):
- All PII columns tagged and access-controlled
- Data retention policies enforced (7 years for financial, 1 year for behavioral)
- Schema changes are backward-compatible or versioned (v2/)
- All data products registered in catalog before consumers can access them
Domain autonomy:
- Technology stack within data domain (Delta Lake or Iceberg — domain's choice)
- Schema design within standards
- SLO definition (must meet minimums, can exceed)
- Refresh cadence (must meet freshness SLO)
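A minimal sketch of how the platform could enforce the global standards in CI by validating each data-product spec before catalog registration. The checks mirror the standards listed above; the script name, the platform minimum, and the field paths into the spec are assumptions.

# validate_spec.py — hypothetical platform-side CI check (names are illustrative).
import sys
import yaml

PLATFORM_MIN_COMPLETENESS = 99.0   # assumed platform minimum, in percent

def validate(spec_path: str) -> list[str]:
    spec = yaml.safe_load(open(spec_path))
    errors = []
    # PII columns must be tagged and listed so access control can be applied.
    if "pii" in spec["catalog"]["tags"] and not spec["catalog"].get("pii_columns"):
        errors.append("pii tag present but no pii_columns listed")
    # Data products must be registered in a catalog before consumers can access them.
    if not spec["catalog"].get("registered_in"):
        errors.append("data product is not registered in a catalog")
    # SLOs must meet the platform minimums (domains may exceed them).
    completeness = float(str(spec["slo"]["completeness"]).rstrip("%"))
    if completeness < PLATFORM_MIN_COMPLETENESS:
        errors.append(f"completeness SLO {completeness}% is below the platform minimum")
    return errors

if __name__ == "__main__":
    problems = validate("data-product-spec.yaml")
    if problems:
        print("\n".join(problems))
        sys.exit(1)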
# Domain: Orders
# Data Product: customer-orders
# Input port: CDC from orders database via Debezium
# Config in Debezium connector
{
  "name": "orders-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "orders-db.internal",
    "database.port": "5432",
    "database.user": "debezium",
    "database.dbname": "orders",
    "table.include.list": "public.orders,public.order_items",
    "topic.prefix": "orders.cdc",
    "slot.name": "debezium_slot"
  }
}
# Output port: Delta Lake table written by Spark streaming job
from pyspark.sql import SparkSession
from delta import DeltaTable
spark = SparkSession.builder \
    .appName("customer-orders-data-product") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# Read from CDC stream (Kafka)
orders_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka.internal:9092") \
    .option("subscribe", "orders.cdc.public.orders") \
    .option("startingOffsets", "latest") \
    .load()
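# Note: this sketch appends the raw Kafka records (binary key/value plus topic
# metadata). A production job would first deserialize the Debezium change-event
# envelope (e.g. with from_json over the value column, or an Avro deserializer
# backed by the Schema Registry) and project the order fields before writing.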
# Write to Delta Lake (output port)
orders_stream \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://data-platform/checkpoints/customer-orders") \
    .trigger(processingTime="5 minutes") \
    .toTable("orders.customer_orders")
# great_expectations/expectations/customer_orders_suite.py
from datetime import datetime, timedelta, timezone

import great_expectations as gx

context = gx.get_context()
suite = context.suites.add(gx.ExpectationSuite(name="customer_orders_suite"))

suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeUnique(column="order_id"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
    column="total_amount", min_value=0.01, max_value=1_000_000))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeInSet(
    column="status",
    value_set=["pending", "confirmed", "shipped", "delivered", "cancelled"]))
# Freshness guard: newest record must be within the last 2 hours
# (threshold is fixed when the suite is built; the Soda check below is dynamic).
suite.add_expectation(gx.expectations.ExpectColumnMaxToBeBetween(
    column="created_at",
    min_value=datetime.now(timezone.utc) - timedelta(hours=2)))
# soda/orders/customer_orders.yml
checks for customer_orders:
  # Freshness SLO
  - freshness(created_at) < 1h
  # Completeness
  - missing_count(order_id) = 0
  - missing_count(customer_id) = 0
  - missing_count(total_amount) = 0
  # Validity
  - invalid_count(status) = 0:
      valid values: [pending, confirmed, shipped, delivered, cancelled]
  # Accuracy
  - min(total_amount) >= 0.01
  - max(total_amount) <= 1000000
  # Volume anomaly detection
  - anomaly score for row_count < default
  # Schema change detection
  - schema:
      fail:
        when required column missing: [order_id, customer_id, total_amount, status, created_at]
        when wrong column type:
          total_amount: decimal
# Run Soda scan
soda scan \
  -d orders_delta_lake \
  -c soda/configuration.yml \
  soda/orders/customer_orders.yml
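The same scan can be run programmatically, which makes it easy to gate a data-product refresh on the quality checks. A minimal sketch using Soda Core's Python API; the datasource name and file paths match the CLI example above, and exact method availability may vary between Soda versions.

# quality_gate.py — hypothetical CI/refresh gate built on Soda Core's Python API.
from soda.scan import Scan

scan = Scan()
scan.set_data_source_name("orders_delta_lake")
scan.add_configuration_yaml_file("soda/configuration.yml")
scan.add_sodacl_yaml_file("soda/orders/customer_orders.yml")

exit_code = scan.execute()        # 0 = all checks passed
print(scan.get_scan_results())    # full results, e.g. for pushing to a quality dashboard
scan.assert_no_checks_fail()      # raises if any check failed, failing the pipeline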
# Delta Lake: ACID transactions + time travel on S3/ADLS/GCS
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Write with ACID guarantees (df: a DataFrame of completed orders prepared upstream)
df.write.format("delta").mode("overwrite").saveAsTable("orders.customer_orders")
# Merge (upsert) — safe for CDC patterns; `updates` is the latest CDC micro-batch
delta_table = DeltaTable.forName(spark, "orders.customer_orders")
delta_table.alias("target").merge(
    updates.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()
# Time travel — query historical data
spark.read \
    .format("delta") \
    .option("timestampAsOf", "2024-01-01") \
    .table("orders.customer_orders")
# Schema evolution — add a new column without rewriting
spark.sql("ALTER TABLE orders.customer_orders ADD COLUMN loyalty_points INT")
# Vacuum old versions (keep 7 days for time travel)
delta_table.vacuum(168) # 168 hours = 7 days
# Iceberg: multi-engine table format (Spark + Flink + Trino + DuckDB)
# Iceberg excels at: partition evolution, hidden partitioning, multi-engine access
# Spark with Iceberg catalog
spark = SparkSession.builder \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.orders", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.orders.type", "rest") \
    .config("spark.sql.catalog.orders.uri", "http://iceberg-rest.internal") \
    .config("spark.sql.catalog.orders.warehouse", "s3://data-platform/iceberg") \
    .getOrCreate()
# Create table with hidden partitioning (no partition column visible to consumers)
spark.sql("""
CREATE TABLE orders.customer_orders (
order_id STRING NOT NULL,
customer_id STRING NOT NULL,
total_amount DECIMAL(18, 2),
status STRING,
created_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(created_at)) -- hidden partition, transparent to queries
""")
# Partition evolution — change partitioning without rewriting data
spark.sql("""
ALTER TABLE orders.customer_orders
REPLACE PARTITION FIELD days(created_at)
WITH months(created_at)
""")
# Time travel (Iceberg's Spark read option takes a millisecond epoch timestamp)
spark.read \
    .option("as-of-timestamp", "1704067200000") \
    .table("orders.customer_orders")  # 2024-01-01T00:00:00Z
| Feature | Delta Lake | Apache Iceberg |
|---|---|---|
| Primary engine | Spark / Databricks | Multi-engine (Spark, Flink, Trino, DuckDB) |
| Partition evolution | ⚠️ Full rewrite | ✅ In-place evolution |
| Hidden partitioning | ❌ | ✅ |
| Merge (upsert) | ✅ | ✅ |
| Time travel | ✅ | ✅ |
| Schema evolution | ✅ | ✅ |
| Databricks integration | ✅ Native | ⚠️ Via connector |
| Trino/Presto queries | ⚠️ Via connector | ✅ Native |
| Best for | Databricks-centric | Multi-engine / Trino |
Trino enables cross-domain queries without ETL — query Delta/Iceberg tables from any domain.
-- Query across domains without copying data
-- Orders domain table + Customers domain table
SELECT
o.order_id,
o.total_amount,
c.country,
c.customer_segment
FROM
delta.orders.customer_orders o -- Orders domain (Delta Lake)
JOIN iceberg.customers.customer_profiles c -- Customers domain (Iceberg)
ON o.customer_id = c.customer_id
WHERE
o.created_at >= CURRENT_DATE - INTERVAL '30' DAY
AND o.status = 'delivered';
# etc/catalog/orders-delta.properties
connector.name=delta_lake
hive.metastore.uri=thrift://hive-metastore.orders.internal:9083
delta.native-reader.enabled=true
# etc/catalog/customers-iceberg.properties
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://iceberg-catalog.customers.internal
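From a consumer's point of view, the SQL output port is just a Trino endpoint. A minimal sketch using the trino Python client; the host, port, and user are assumptions for this environment.

# consumer_query.py — hypothetical consumer of the SQL output port via Trino.
import trino

conn = trino.dbapi.connect(
    host="trino.internal",     # assumed coordinator host
    port=8080,                 # assumed port
    user="finance-analytics",
    catalog="delta",           # Orders domain catalog (Delta Lake connector)
    schema="orders",
)
cur = conn.cursor()
cur.execute("""
    SELECT status, count(*) AS orders, sum(total_amount) AS revenue
    FROM customer_orders
    WHERE created_at >= CURRENT_DATE - INTERVAL '30' DAY
    GROUP BY status
""")
for row in cur.fetchall():
    print(row)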
Configure in spark-defaults.conf:
spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener
spark.openlineage.transport.type=http
spark.openlineage.transport.url=http://marquez.internal:5000
spark.openlineage.namespace=orders
Lineage is emitted automatically for all Spark jobs: input datasets (tables, Kafka topics), output datasets, job metadata, and column-level lineage. Visualize in the Marquez UI at http://marquez.internal:3000.
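For jobs that do not run on Spark (for example an ad-hoc backfill script), lineage events can be emitted directly with the openlineage-python client. A minimal sketch; the job and dataset names are illustrative, and the client API differs slightly across OpenLineage versions.

# emit_lineage.py — hypothetical manual lineage emission for a non-Spark job.
from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState

client = OpenLineageClient(url="http://marquez.internal:5000")

client.emit(RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=str(uuid4())),
    job=Job(namespace="orders", name="customer_orders_backfill"),
    producer="https://github.com/company/orders-data-product",  # assumed producer URI
    inputs=[Dataset(namespace="orders", name="orders.cdc.public.orders")],
    outputs=[Dataset(namespace="orders", name="orders.customer_orders")],
))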
# Register a data product in DataHub via Python SDK
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetPropertiesClass,
    DatasetSnapshotClass,
    MetadataChangeEventClass,
    OwnerClass,
    OwnershipClass,
    OwnershipTypeClass,
)
emitter = DatahubRestEmitter(gms_server="http://datahub.internal:8080")
dataset_urn = make_dataset_urn(platform="delta-lake", name="orders.customer_orders", env="PROD")
# Emit ownership + properties in one MCE
emitter.emit_mce(MetadataChangeEventClass(
    proposedSnapshot=DatasetSnapshotClass(
        urn=dataset_urn,
        aspects=[
            OwnershipClass(owners=[OwnerClass(owner="urn:li:corpuser:orders-team", type=OwnershipTypeClass.DATAOWNER)]),
            DatasetPropertiesClass(
                description="All completed customer orders since 2020. Refreshed hourly.",
                customProperties={"domain": "orders", "slo_freshness": "1h", "contains_pii": "true"},
            ),
        ],
    )
))
A data contract is a formal, version-controlled agreement between a data product producer and its consumers.
# contracts/orders-to-finance/customer_orders_contract_v2.yaml
apiVersion: datacontract.com/v0.9
kind: DataContract
id: customer-orders-contract-v2
info:
  title: Customer Orders — Finance Consumer Contract
  version: 2.1.0
  owner: orders-engineering@company.com
  consumer: finance-analytics@company.com
servers:
  production:
    type: trino
    host: trino.internal
    catalog: delta
    schema: orders
models:
  customer_orders:
    description: Completed customer orders
    fields:
      order_id:
        type: string
        required: true
        unique: true
        pii: false
      total_amount:
        type: decimal(18, 2)
        required: true
        minimum: 0.01
      currency:
        type: string
        enum: [USD, EUR, GBP]
      created_at:
        type: timestamp
        required: true
quality:
  type: great-expectations
  specification: expectations/customer_orders_suite.json
terms:
  usage: Finance team may use this data for revenue reporting and forecasting
  limitations: Do not use customer_id to join with PII data from other domains without DPA approval
  billing_plan: internal
Backward compatible (no version bump): adding a nullable column, adding enum values, adding an output port.
Breaking change (requires v2/ new version): removing or renaming a column, changing a column type, removing enum values, tightening an SLO (e.g., freshness 1h → 30min).
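These rules can be enforced mechanically in CI by diffing the proposed contract against the published one. A minimal sketch; the file paths are illustrative and the check only covers removed fields, type changes, and removed enum values.

# contract_compat_check.py — hypothetical CI check for breaking contract changes.
import sys
import yaml

def breaking_changes(old_path: str, new_path: str) -> list[str]:
    old = yaml.safe_load(open(old_path))
    new = yaml.safe_load(open(new_path))
    problems = []
    for model, spec in old["models"].items():
        new_fields = new["models"].get(model, {}).get("fields", {})
        for field, old_def in spec["fields"].items():
            new_def = new_fields.get(field)
            if new_def is None:
                problems.append(f"{model}.{field}: column removed or renamed")
                continue
            if new_def.get("type") != old_def.get("type"):
                problems.append(f"{model}.{field}: type changed {old_def.get('type')} -> {new_def.get('type')}")
            removed_enum = set(old_def.get("enum", [])) - set(new_def.get("enum", old_def.get("enum", [])))
            if removed_enum:
                problems.append(f"{model}.{field}: enum values removed: {sorted(removed_enum)}")
    return problems

if __name__ == "__main__":
    issues = breaking_changes(
        "contracts/orders-to-finance/customer_orders_contract_v2.yaml",  # published contract
        "customer_orders_contract_proposed.yaml",                        # proposed contract (illustrative path)
    )
    if issues:
        print("\n".join(issues))
        sys.exit(1)  # breaking change: publish a new contract version instead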
Use when: 5+ domains each producing significant data; central data team has 2+ week backlog; data quality issues blamed across team boundaries; moving toward domain-driven microservices.
Do NOT use when: small org (< 5 domains); data team can scale to meet demand; no clear domain ownership (mesh becomes ungoverned chaos); no budget for platform engineering.
Related skills:
- data-engineering — ETL/ELT pipelines, dbt, Spark — foundational data engineering
- duckdb-patterns — DuckDB for local/analytical queries within a domain's data product
- event-driven-patterns — Kafka for streaming input ports of data products
- kubernetes-patterns — deploying Spark, Trino, and data platform services on Kubernetes
- observability — monitoring data pipeline SLOs (freshness, completeness)