Help us improve
Share bugs, ideas, or general feedback.
From databricks-pack
Guides Databricks migrations from Hadoop, Snowflake, Redshift, Synapse, or legacy warehouses. Includes table assessment scripts, data transfer patterns, ETL conversion, and cutover plans.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin databricks-packHow this skill is triggered — by the user, by Claude, or both
Slash command
/databricks-pack:databricks-migration-deep-diveThis skill is limited to the following tools:
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
Comprehensive migration strategies for moving to Databricks from Hadoop, Snowflake, Redshift, Synapse, or legacy data warehouses. Covers discovery and assessment, schema conversion, data migration with batching and validation, ETL/pipeline conversion, and cutover planning with rollback procedures.
Guides Snowflake migrations from Redshift, BigQuery, or on-prem databases including schema conversion, SQL dialect mapping, and data transfer via S3 or other storage.
Upgrades Databricks Runtime versions and migrates to Unity Catalog, checking compatibility, removing deprecated configs, and handling table migrations via SYNC/CTAS.
Guides dbt project migration across data platforms (e.g., Snowflake to Databricks) using dbt Fusion compilation to identify and fix SQL dialect differences, with unit tests for validation.
Share bugs, ideas, or general feedback.
Comprehensive migration strategies for moving to Databricks from Hadoop, Snowflake, Redshift, Synapse, or legacy data warehouses. Covers discovery and assessment, schema conversion, data migration with batching and validation, ETL/pipeline conversion, and cutover planning with rollback procedures.
| Source | Pattern | Complexity | Timeline |
|---|---|---|---|
| Hive Metastore (same workspace) | SYNC / CTAS / DEEP CLONE | Low | Days |
| On-prem Hadoop/HDFS | Lift-and-shift to cloud storage + UC | High | 6-12 months |
| Snowflake | Parallel run + cutover | Medium | 3-6 months |
| AWS Redshift | Unload to S3 + Auto Loader | Medium | 3-6 months |
| Legacy DW (Oracle/Teradata) | Full rebuild with JDBC extraction | High | 12-18 months |
Inventory all source tables with metadata for migration planning.
from pyspark.sql import SparkSession
from dataclasses import dataclass
spark = SparkSession.builder.getOrCreate()
@dataclass
class TableInventory:
database: str
table: str
table_type: str
format: str
row_count: int
size_mb: float
columns: int
partitions: list[str]
def assess_hive_metastore() -> list[TableInventory]:
"""Inventory all Hive Metastore tables for migration planning."""
inventory = []
databases = [r.databaseName for r in spark.sql("SHOW DATABASES").collect()]
for db in databases:
tables = spark.sql(f"SHOW TABLES IN hive_metastore.{db}").collect()
for t in tables:
table_name = f"hive_metastore.{db}.{t.tableName}"
try:
detail = spark.sql(f"DESCRIBE DETAIL {table_name}").first()
schema = spark.table(table_name).schema
inventory.append(TableInventory(
database=db,
table=t.tableName,
table_type=detail.format or "unknown",
format=detail.format or "unknown",
row_count=spark.table(table_name).count(),
size_mb=detail.sizeInBytes / 1048576 if detail.sizeInBytes else 0,
columns=len(schema),
partitions=detail.partitionColumns or [],
))
except Exception as e:
print(f" Skipping {table_name}: {e}")
return inventory
# Generate migration plan
tables = assess_hive_metastore()
tables.sort(key=lambda t: t.size_mb, reverse=True)
print(f"\nTotal tables: {len(tables)}")
print(f"Total size: {sum(t.size_mb for t in tables):.0f} MB")
print(f"\nTop 10 by size:")
for t in tables[:10]:
print(f" {t.database}.{t.table}: {t.size_mb:.0f}MB, {t.row_count:,} rows, {t.format}")
# Schema conversion for common type mismatches
TYPE_MAP = {
# Hadoop/Hive types → Delta Lake/Spark types
"CHAR": "STRING",
"VARCHAR": "STRING",
"TINYINT": "INT",
"SMALLINT": "INT",
"BINARY": "BINARY",
# Snowflake types
"NUMBER": "DECIMAL",
"VARIANT": "STRING", # Store as JSON string, parse in Silver
"TIMESTAMP_NTZ": "TIMESTAMP",
"TIMESTAMP_TZ": "TIMESTAMP",
# Redshift types
"SUPER": "STRING",
"TIMETZ": "TIMESTAMP",
}
def generate_create_table(source_table: str, target_table: str) -> str:
"""Generate CREATE TABLE DDL with type conversions."""
schema = spark.table(source_table).schema
cols = []
for field in schema:
dtype = TYPE_MAP.get(str(field.dataType).upper(), str(field.dataType))
cols.append(f" {field.name} {dtype}")
return f"""CREATE TABLE IF NOT EXISTS {target_table} (
{',\n'.join(cols)}
) USING DELTA
TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
);"""
def migrate_table(
source_table: str,
target_table: str,
method: str = "ctas",
batch_size_mb: int = 500,
) -> dict:
"""Migrate a table with validation."""
result = {"source": source_table, "target": target_table, "method": method}
if method == "sync":
# In-place metadata migration (fastest, no data copy)
spark.sql(f"SYNC TABLE {target_table} FROM {source_table}")
elif method == "deep_clone":
# Delta-to-Delta with history preservation
spark.sql(f"CREATE TABLE {target_table} DEEP CLONE {source_table}")
elif method == "ctas":
# Full data copy (works with any source format)
source_size_mb = spark.sql(
f"DESCRIBE DETAIL {source_table}"
).first().sizeInBytes / 1048576
if source_size_mb > batch_size_mb:
# Batch large tables by partition or row number
spark.sql(f"""
CREATE TABLE {target_table}
USING DELTA
AS SELECT * FROM {source_table}
""")
else:
spark.sql(f"CREATE TABLE {target_table} AS SELECT * FROM {source_table}")
elif method == "jdbc":
# External database migration
df = (spark.read
.format("jdbc")
.option("url", f"jdbc:postgresql://host:5432/db")
.option("dbtable", source_table)
.option("fetchsize", "10000")
.load())
df.write.format("delta").saveAsTable(target_table)
# Validate
src_count = spark.table(source_table).count()
tgt_count = spark.table(target_table).count()
result["source_rows"] = src_count
result["target_rows"] = tgt_count
result["match"] = src_count == tgt_count
result["status"] = "OK" if result["match"] else "MISMATCH"
return result
# Migrate with validation
result = migrate_table(
"hive_metastore.legacy.customers",
"analytics.migrated.customers",
method="ctas",
)
print(f"{result['source']} -> {result['target']}: "
f"{result['source_rows']:,} rows [{result['status']}]")
# Snowflake: Use Lakehouse Federation or Unload + Auto Loader
# Option A: Lakehouse Federation (query in place, no copy)
spark.sql("""
CREATE FOREIGN CATALOG snowflake_catalog
USING CONNECTION snowflake_conn
OPTIONS (database 'PROD_DB')
""")
# Query directly: SELECT * FROM snowflake_catalog.schema.table
# Option B: Unload to S3 + ingest
# In Snowflake:
# COPY INTO @my_s3_stage/export/customers/
# FROM PROD_DB.PUBLIC.CUSTOMERS
# FILE_FORMAT = (TYPE = PARQUET);
# In Databricks:
df = spark.read.parquet("s3://migration-bucket/export/customers/")
df.write.format("delta").saveAsTable("analytics.migrated.customers")
# Redshift: Unload to S3 + Auto Loader
# In Redshift:
# UNLOAD ('SELECT * FROM prod.customers')
# TO 's3://migration-bucket/redshift/customers/'
# FORMAT PARQUET;
# In Databricks:
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaLocation", "/checkpoints/migration/schema")
.load("s3://migration-bucket/redshift/customers/")
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoints/migration/data")
.toTable("analytics.migrated.customers"))
# Convert Oozie/Airflow jobs to Databricks Asset Bundles
# Before (Oozie/spark-submit):
# spark-submit --class com.company.ETL --master yarn app.jar
# hive -e "INSERT OVERWRITE TABLE target SELECT * FROM staging"
# After (Asset Bundle):
# databricks.yml resources:
"""
resources:
jobs:
migrated_etl:
name: migrated-etl
tasks:
- task_key: extract
notebook_task:
notebook_path: src/extract.py
- task_key: transform
depends_on: [{task_key: extract}]
notebook_task:
notebook_path: src/transform.py
"""
# Convert HiveQL to Spark SQL
# Before: INSERT OVERWRITE TABLE target SELECT ...
# After: (Use MERGE for upserts or write.mode("overwrite").saveAsTable)
cutover_steps = [
{"step": 1, "action": "Final validation", "rollback": "No action needed"},
{"step": 2, "action": "Disable source pipelines", "rollback": "Re-enable source"},
{"step": 3, "action": "Final data sync", "rollback": "Data already in place"},
{"step": 4, "action": "Switch apps to Databricks endpoints", "rollback": "Revert app config"},
{"step": 5, "action": "Enable Databricks pipelines", "rollback": "Disable and restore source"},
{"step": 6, "action": "Monitor for 24 hours", "rollback": "Full rollback if issues"},
]
# Validation query to run at each step
validation_query = """
SELECT 'source' AS system, COUNT(*) AS rows FROM source_table
UNION ALL
SELECT 'target', COUNT(*) FROM target_table
"""
| Error | Cause | Solution |
|---|---|---|
| Schema incompatibility | Unsupported types (VARIANT, SUPER) | Convert to STRING, parse in Silver layer |
| Row count mismatch | Truncation or filter during migration | Check for NULLs, encoding issues, or WHERE clauses |
| JDBC timeout | Large table extraction | Use fetchsize, partition reads, or incremental export |
SYNC fails | External table storage inaccessible | Verify cloud storage credentials and network access |
| Pipeline dependency failure | Wrong migration order | Build dependency graph, migrate leaf tables first |
-- Compare source and target counts
SELECT 'hive_metastore' AS source, COUNT(*) AS rows
FROM hive_metastore.legacy.customers
UNION ALL
SELECT 'unity_catalog', COUNT(*)
FROM analytics.migrated.customers;
migration_plan = [
("hive_metastore.legacy.customers", "analytics.migrated.customers", "ctas"),
("hive_metastore.legacy.orders", "analytics.migrated.orders", "deep_clone"),
("hive_metastore.legacy.products", "analytics.migrated.products", "sync"),
]
results = []
for src, tgt, method in migration_plan:
print(f"Migrating {src} -> {tgt} ({method})...")
result = migrate_table(src, tgt, method)
results.append(result)
print(f" {result['status']}: {result['source_rows']:,} -> {result['target_rows']:,}")
failed = [r for r in results if r["status"] != "OK"]
print(f"\nCompleted: {len(results) - len(failed)}/{len(results)} OK")