Safely ingest data from S3 into the Bauplan lakehouse by isolating changes on a temporary branch, running quality checks, and only merging to main after validation succeeds.
This pattern is formally known as Write-Audit-Publish (WAP) in the Iceberg ecosystem.
Implement this as a Python script using the bauplan SDK. Do NOT use CLI commands for the ingestion itself.
Before writing the script, check whether the project uses uv (look for `pyproject.toml` or `uv.lock`). If so, use `uv run python` to execute scripts and `uv add` to install packages. Otherwise, use the system `python` and `pip install`.
Ensure the required packages are installed:

- `bauplan` (the Bauplan Python SDK — required)
- `polars` (if validation logic needs DataFrame operations — zero-copy Arrow interop)

Do not use pandas. Bauplan's `client.query()` returns a PyArrow Table directly — you can access columns with `result.column("name")[0].as_py()` or convert to Polars with `pl.from_arrow(result)`. No `.to_arrow()` call is needed. Pandas requires a full data copy and is slower.
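A minimal sketch of that interop, assuming a configured client and a table `bauplan.orders` (the table name is just an example):

```python
import bauplan
import polars as pl

client = bauplan.Client()

# client.query() returns a PyArrow Table; no .to_arrow() call is needed.
result = client.query(
    "SELECT COUNT(*) AS n FROM bauplan.orders",  # example table name
    ref="main",
)

row_count = result.column("n")[0].as_py()  # pull a scalar straight from Arrow
df = pl.from_arrow(result)                 # zero-copy conversion to Polars
print(row_count, df.shape)
```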
The three phases:
1. Write: import the data onto a temporary branch (isolated from `main`)
2. Audit: run quality checks on the branch
3. Publish: merge to `main` only after validation passes

Branch safety: All operations happen on a temporary branch, NEVER on `main`. By default, branches are kept open for inspection after success or failure.
Atomic multi-table operations: merge_branch is atomic. You can create or modify multiple tables on a branch, and when you merge, either all changes apply to main or none do. This enables safe multi-table ingestion workflows.
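A sketch of what that atomicity buys you, assuming a configured client and an already-created branch; the branch name, table names, and S3 paths below are all hypothetical:

```python
import bauplan

client = bauplan.Client()
branch_name = "alice.import_multi_1704067200"  # hypothetical open branch

# Stage two tables on the same temporary branch.
for table, uri in [
    ("orders", "s3://bucket/orders/*.parquet"),        # hypothetical path
    ("customers", "s3://bucket/customers/*.parquet"),  # hypothetical path
]:
    client.create_table(table=table, search_uri=uri, branch=branch_name, replace=True)
    client.import_data(table=table, search_uri=uri, branch=branch_name)

# One atomic merge: both tables land on main together, or neither does.
client.merge_branch(source_ref=branch_name, into_branch="main")
```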
Before writing the script, you MUST gather:
- The target table name
- The S3 source URI (e.g., `s3://bucket/path/*.parquet`)
- What to do on success:
  - `inspect` (default): keep the branch open for user inspection before merging
  - `merge`: automatically merge to `main` and delete the branch
- What to do on failure:
  - `inspect` (default): leave the branch open for inspection/debugging
  - `delete`: delete the failed branch

Ask the user which situation they're in. This determines what validation code goes into the script.
Path A: The user is importing data they haven't explored. They can't state expectations because they don't know the shape yet.
Action: Generate the script with a minimal check — table is non-empty (row count > 0). No further questions needed.
Be honest about what this means:
on_success="merge", bad data (wrong types, nulls in key columns, duplicates, stale records) will land on main with no way to catch it before downstream consumers read it.Minimal validation code:
```python
def validate_import(client, table_name, branch, namespace="bauplan"):
    fq_table = f"{namespace}.{table_name}"
    result = client.query(f"SELECT COUNT(*) as n FROM {fq_table}", ref=branch)
    row_count = result.column("n")[0].as_py()
    assert row_count > 0, f"{fq_table} has 0 rows after import"
    print(f"  Row count: {row_count}")
```
Path B: The user can state expectations directly — specific columns, properties, and severities. Illustrative examples: `user_id` is never null (fail), `price` is non-negative (warn).
Action: Gather the user's check specifications, then invoke the `bauplan-data-quality-checks` skill to generate a `validate_import()` function, passing it the table name, the branch, and those specifications.
The bauplan-data-quality-checks skill will translate the specifications into validation code. Embed the resulting validate_import() function in the script.
Do not ask about downstream pipelines or consumers. The user has already decided what to check. If a specification is incomplete (e.g., "check user_id" without saying what property), ask about the specific check, not the pipeline's purpose.
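For illustration only, a Path B `validate_import()` might look like the sketch below, assuming two hypothetical specs (`user_id` is never null at fail severity, `price` is non-negative at warn severity). The real function should come from the `bauplan-data-quality-checks` skill:

```python
import polars as pl


def validate_import(client, table_name, branch, namespace="bauplan"):
    """Hypothetical Path B checks: user_id non-null (FAIL), price >= 0 (WARN)."""
    fq_table = f"{namespace}.{table_name}"
    result = client.query(f"SELECT user_id, price FROM {fq_table}", ref=branch)
    df = pl.from_arrow(result)  # zero-copy Arrow -> Polars

    # FAIL severity: raise, which aborts the merge phase.
    null_ids = df["user_id"].null_count()
    assert null_ids == 0, f"{fq_table}: {null_ids} null user_id values"

    # WARN severity: report, but do not block the merge.
    negative = int((df["price"] < 0).sum())
    if negative:
        print(f"  WARN: {negative} rows with negative price in {fq_table}")
```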
Path C: The user wants to import the data now and build the pipeline first. Once the pipeline exists, the pipeline code will tell the agent exactly what to check.
Action: Generate the script with minimal validation (same as Path A). After a successful import, tell the user:
"The data is imported on branch
<branch_name>. When your pipeline is ready, you can come back and add quality checks — thebauplan-data-quality-checksskill can read yourmodels.pyand derive checks from how the pipeline actually uses this table."
This is not a failure or a shortcut. It's the right order when the user doesn't yet know what the data's consumers need.
"""
Safe ingestion script for <TABLE_NAME>.
Write-Audit-Publish (WAP) pattern: import on an isolated branch, validate, then merge or inspect.
"""
import sys
import time
import bauplan
TABLE_NAME = "<table_name>"
S3_PATH = "<s3_uri>"
NAMESPACE = "bauplan"
def validate_import(client, table_name, branch, namespace="bauplan"):
"""Run quality checks on the imported data. Raises on FAIL, prints on WARN."""
fq_table = f"{namespace}.{table_name}"
# --- Minimal check: table must be non-empty ---
result = client.query(f"SELECT COUNT(*) as n FROM {fq_table}", ref=branch)
row_count = result.column("n")[0].as_py()
assert row_count > 0, f"{fq_table} has 0 rows after import"
print(f" Row count: {row_count}")
# For Path B: replace the above with checks from the bauplan-data-quality-checks skill.
# For Path A/C: the above is sufficient.
def main():
client = bauplan.Client()
info = client.info()
username = info.user.username
timestamp = int(time.time())
branch_name = f"{username}.import_{TABLE_NAME}_{timestamp}"
print(f"Creating branch: {branch_name}")
client.create_branch(branch=branch_name, from_ref="main")
try:
# === IMPORT PHASE ===
print(f"\nPhase 1: Creating table '{TABLE_NAME}' from S3...")
client.create_table(
table=TABLE_NAME,
search_uri=S3_PATH,
branch=branch_name,
namespace=NAMESPACE,
replace=True,
)
print(f" Table schema created.")
print(f" Importing data...")
import_state = client.import_data(
table=TABLE_NAME,
search_uri=S3_PATH,
branch=branch_name,
namespace=NAMESPACE,
)
if import_state.error:
raise RuntimeError(f"import_data failed: {import_state.error}")
print(f" Data imported.")
# === VALIDATION PHASE ===
print(f"\nPhase 2: Running quality checks...")
validate_import(client, TABLE_NAME, branch_name, NAMESPACE)
# === MERGE PHASE ===
# on_success="inspect" (default): keep branch open
print(f"\nImport complete. Branch ready for inspection: '{branch_name}'")
print(f"To query: bauplan query \"SELECT * FROM {NAMESPACE}.{TABLE_NAME} LIMIT 10\" --ref {branch_name}")
print(f"To merge: bauplan branch merge {branch_name} --into main")
print(f"To delete: bauplan branch delete {branch_name}")
# on_success="merge": uncomment below, remove above
# client.merge_branch(source_ref=branch_name, into_branch="main")
# print(f"Successfully published {TABLE_NAME} to main")
# client.delete_branch(branch_name)
except Exception as exc:
print(f"\nImport FAILED: {exc}")
print(f"Branch preserved for debugging: '{branch_name}'")
sys.exit(1)
if __name__ == "__main__":
main()
| Method | Description |
|---|---|
| `bauplan.Client()` | Initialize the Bauplan client |
| `client.info()` | Get client info; access the username via `.user.username` |
| `client.create_branch(branch, from_ref="main")` | Create a new branch from the specified ref |
| `client.has_branch(branch)` | Check if a branch exists |
| `client.delete_branch(branch)` | Delete a branch |
| `client.create_table(table, search_uri, ...)` | Create a table with the schema inferred from S3 |
| `client.import_data(table, search_uri, ...)` | Import data from S3 into the table |
| `client.query(query, ref=...)` | Run a SQL query; returns a PyArrow Table directly |
| `client.merge_branch(source_ref, into_branch)` | Merge a branch into the target |
| `client.has_table(table, ref, namespace)` | Check if a table exists on a branch |
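A small sketch of how the branch helpers compose for an idempotent setup (branch name hypothetical):

```python
import bauplan

client = bauplan.Client()
branch_name = f"{client.info().user.username}.import_demo"  # hypothetical name

# Recreate the branch so reruns always start from a clean copy of main.
if client.has_branch(branch_name):
    client.delete_branch(branch_name)
client.create_branch(branch=branch_name, from_ref="main")
```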
For Path B, invoke the `bauplan-data-quality-checks` skill and embed the result in the script. Then run it: `python <script_name>.py` (or `uv run python <script_name>.py` in uv projects).

Successful run (`on_success="inspect"`):

```
Imported 15234 rows
Import complete. Branch ready for inspection: 'alice.import_orders_1704067200'.
To merge manually: bauplan checkout main && bauplan branch merge alice.import_orders_1704067200
```

Successful run (`on_success="merge"`):

```
Imported 15234 rows
Successfully published orders to main
Cleaned up branch: alice.import_orders_1704067200
```

Failed run (`on_failure="inspect"`):

```
Import failed: No data was imported
Branch preserved for inspection/debugging: 'alice.import_orders_1704067200'
```
If the user chose Path A or C and wants to add checks to an existing script:
- Path A: invoke the `bauplan-data-quality-checks` skill with context: ingestion.
- Path C: point it at `models.py` (they built the pipeline and want checks derived from it).

The `bauplan-data-quality-checks` skill reads the existing script, finds the validate phase, and replaces the minimal check with proper validation logic.

Note that checks added after an import do not gate any previous run. If the data is already on `main`, it's there without quality validation.
To append data to a table that already exists on `main`, skip `create_table` and only call `import_data`:
```python
# Table already exists on main — just import new data
client.import_data(
    table=table_name,
    search_uri=s3_path,
    namespace=namespace,
    branch=branch_name,
)
```
The validate and merge phases remain the same. New rows are sandboxed on the branch until merged.
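If the script should handle both first loads and appends, one way to branch on it is `has_table` from the method table above; a sketch using the template's constants:

```python
# Sketch: pick create-vs-append based on whether the table already exists on main.
if client.has_table(table=TABLE_NAME, ref="main", namespace=NAMESPACE):
    # Append mode: only import new rows onto the branch.
    client.import_data(
        table=TABLE_NAME, search_uri=S3_PATH, branch=branch_name, namespace=NAMESPACE
    )
else:
    # First load: create the table, then import.
    client.create_table(
        table=TABLE_NAME, search_uri=S3_PATH, branch=branch_name,
        namespace=NAMESPACE, replace=True,
    )
    client.import_data(
        table=TABLE_NAME, search_uri=S3_PATH, branch=branch_name, namespace=NAMESPACE
    )
```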
When `on_success="inspect"` (default), the branch is left open for review. To merge after inspecting:
```bash
bauplan checkout main
bauplan branch merge <branch_name>
bauplan branch rm <branch_name>  # optional cleanup
```
The branch name is printed by the script upon completion.
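If you prefer to publish from Python instead of the CLI, the same steps via the SDK (the branch name shown is hypothetical; use the one printed by the script):

```python
import bauplan

client = bauplan.Client()
branch_name = "alice.import_orders_1704067200"  # hypothetical; use the printed name

client.merge_branch(source_ref=branch_name, into_branch="main")
client.delete_branch(branch_name)  # optional cleanup
```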
When unsure about a method signature, CLI flag, or concept, fetch the relevant doc page via WebFetch rather than guessing. Pages are markdown and LLM-friendly.
Python SDK: https://docs.bauplanlabs.com/reference/bauplan.md
Relevant guides and concept pages:
- https://docs.bauplanlabs.com/tutorial/03-import.md
- https://docs.bauplanlabs.com/concepts/schema-conflicts.md
- https://docs.bauplanlabs.com/concepts/git-for-data/data-branches.md
- https://docs.bauplanlabs.com/concepts/tables.md
- https://docs.bauplanlabs.com/concepts/namespaces.md

Full doc index: https://docs.bauplanlabs.com/llms.txt
CLI: The bauplan CLI is self-documenting:
- `bauplan --help` — lists all available commands
- `bauplan <command> --help` — shows arguments and options for a specific command (e.g., `bauplan branch --help`, `bauplan import-data --help`)

Validating generated Python: After writing or updating the ingestion script, run `ruff check` and `ruff format` to catch syntax errors and style issues, and `ty` to catch type errors — these verify the code compiles and the SDK calls are well-formed without executing it. Only run these if they are installed (check with `which ruff` / `which ty`).