From snowflake-pack
Loads data into Snowflake tables from S3, GCS, or Azure via stages and COPY INTO. Creates file formats, sets up Snowpipe for continuous ingestion, and handles bulk file loads.
Install:

```bash
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin snowflake-pack
```
Primary data loading workflow: stages, file formats, COPY INTO, and Snowpipe for continuous ingestion.
Prerequisites: snowflake-install auth setup, plus CREATE STAGE and USAGE on warehouse privileges.
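A minimal grants sketch for those prerequisites, assuming a hypothetical `loader_role`, warehouse `my_wh`, and the `my_db.my_schema` namespace used throughout this page (adjust names to your environment):

```sql
-- Hypothetical names: loader_role, my_wh, my_db.my_schema
GRANT USAGE ON WAREHOUSE my_wh TO ROLE loader_role;
GRANT USAGE ON DATABASE my_db TO ROLE loader_role;
GRANT USAGE, CREATE STAGE, CREATE FILE FORMAT ON SCHEMA my_db.my_schema TO ROLE loader_role;
GRANT INSERT ON ALL TABLES IN SCHEMA my_db.my_schema TO ROLE loader_role;
```

With privileges in place, define reusable file formats: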
```sql
-- CSV format
CREATE OR REPLACE FILE FORMAT my_csv_format
  TYPE = 'CSV'
  FIELD_DELIMITER = ','
  SKIP_HEADER = 1
  NULL_IF = ('NULL', 'null', '')
  EMPTY_FIELD_AS_NULL = TRUE
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE;

-- JSON format (for semi-structured data)
CREATE OR REPLACE FILE FORMAT my_json_format
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = TRUE
  IGNORE_UTF8_ERRORS = TRUE;

-- Parquet format
CREATE OR REPLACE FILE FORMAT my_parquet_format
  TYPE = 'PARQUET'
  SNAPPY_COMPRESSION = TRUE;
```
```sql
-- External stage (S3)
CREATE OR REPLACE STAGE my_s3_stage
  STORAGE_INTEGRATION = my_s3_integration
  URL = 's3://my-bucket/data/'
  FILE_FORMAT = my_csv_format;

-- External stage (GCS)
CREATE OR REPLACE STAGE my_gcs_stage
  STORAGE_INTEGRATION = my_gcs_integration
  URL = 'gcs://my-bucket/data/'
  FILE_FORMAT = my_csv_format;

-- Internal stage (Snowflake-managed storage)
CREATE OR REPLACE STAGE my_internal_stage
  FILE_FORMAT = my_csv_format;

-- List files in stage
LIST @my_s3_stage;
```
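You can also query staged files directly before loading, which is a quick sanity check on delimiters and column order (the same positional `$n` references used in the COPY transformations below):

```sql
-- Preview the first rows of staged CSV files
SELECT t.$1, t.$2, t.$3
FROM @my_s3_stage/users/ (FILE_FORMAT => 'my_csv_format') t
LIMIT 10;
```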
Upload local files with PUT (PUT works only against internal stages):

```bash
# Using SnowSQL
snowsql -c prod -q "PUT file:///tmp/data/*.csv @my_internal_stage AUTO_COMPRESS=TRUE"
```

```python
# Using the Python connector
cursor.execute("PUT file:///tmp/data/users.csv @my_internal_stage AUTO_COMPRESS=TRUE")
```
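For self-describing formats such as Parquet, INFER_SCHEMA can derive column definitions from staged files before you create the target table. A sketch, assuming Parquet files under a hypothetical `parquet/` prefix on the S3 stage above:

```sql
-- Derive column names and types from staged Parquet files
SELECT *
FROM TABLE(INFER_SCHEMA(
  LOCATION => '@my_s3_stage/parquet/',
  FILE_FORMAT => 'my_parquet_format'
));
```

Then load with COPY INTO: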
```sql
-- Basic COPY INTO from stage
COPY INTO my_db.my_schema.users
  FROM @my_s3_stage/users/
  FILE_FORMAT = my_csv_format
  ON_ERROR = 'CONTINUE'  -- Skip bad rows
  PURGE = TRUE;          -- Delete files after a successful load

-- COPY with column mapping and casts
COPY INTO my_db.my_schema.orders (order_id, customer_id, amount, order_date)
FROM (
  SELECT $1, $2, $3::FLOAT, $4::TIMESTAMP_NTZ
  FROM @my_s3_stage/orders/
)
FILE_FORMAT = my_csv_format;

-- Load JSON, matching top-level keys to table columns by name
COPY INTO my_db.my_schema.raw_events
  FROM @my_s3_stage/events/
  FILE_FORMAT = my_json_format
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

-- Check COPY history
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'USERS',
  START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())
));
```
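With ON_ERROR = 'CONTINUE', rejected rows are skipped silently; the VALIDATE table function replays a load job and returns the rows it rejected:

```sql
-- Inspect rows rejected by the most recent COPY into this table
SELECT * FROM TABLE(VALIDATE(my_db.my_schema.users, JOB_ID => '_last'));
```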
Programmatic loads via the Node.js connector, using this pack's pool and query helpers:

```typescript
import { pool } from './snowflake/pool';
import { query } from './snowflake/query';

async function loadDataFromStage(
  tableName: string,
  stagePath: string,
  fileFormat: string = 'my_csv_format'
) {
  return pool.withConnection(async (conn) => {
    // tableName/stagePath/fileFormat are interpolated as identifiers --
    // validate them against an allowlist rather than accepting arbitrary input
    const result = await query(conn, `
      COPY INTO ${tableName}
      FROM @${stagePath}
      FILE_FORMAT = ${fileFormat}
      ON_ERROR = 'CONTINUE'
      FORCE = FALSE
    `);

    // COPY INTO returns load status per file
    for (const row of result.rows) {
      console.log(`File: ${row.file}, Status: ${row.status}, Rows: ${row.rows_loaded}`);
      if (row.errors_seen > 0) {
        console.warn(`  Errors: ${row.errors_seen}, First error: ${row.first_error}`);
      }
    }
    return result.rows;
  });
}
```
```sql
-- Create pipe for auto-ingest from S3
CREATE OR REPLACE PIPE my_db.my_schema.user_pipe
  AUTO_INGEST = TRUE
AS
COPY INTO my_db.my_schema.users
  FROM @my_s3_stage/users/
  FILE_FORMAT = my_csv_format;

-- Get the SQS queue ARN for S3 event notifications:
-- use the notification_channel value to configure S3 bucket events
SHOW PIPES LIKE 'user_pipe';

-- Monitor pipe status
SELECT SYSTEM$PIPE_STATUS('my_db.my_schema.user_pipe');

-- Check Snowpipe load history
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'USERS',
  START_TIME => DATEADD(hours, -1, CURRENT_TIMESTAMP())
))
WHERE pipe_catalog_name IS NOT NULL;
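```

Two pipe-management commands worth knowing, sketched against the pipe above (note that REFRESH only considers files staged within the last 7 days):

```sql
-- Backfill: queue files that landed before the pipe or its notifications existed
ALTER PIPE my_db.my_schema.user_pipe REFRESH;

-- Pause and resume ingestion, e.g. during maintenance
ALTER PIPE my_db.my_schema.user_pipe SET PIPE_EXECUTION_PAUSED = TRUE;
ALTER PIPE my_db.my_schema.user_pipe SET PIPE_EXECUTION_PAUSED = FALSE;
```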
| Error | Cause | Solution |
|---|---|---|
| Insufficient privileges on stage | Role lacks USAGE | `GRANT USAGE ON STAGE x TO ROLE y` |
| File not found | Wrong stage path | Run `LIST @stage` to verify files |
| Number of columns in file does not match | Schema mismatch | Use `ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE` or fix the file |
| Files already loaded | COPY deduplication | Use `FORCE = TRUE` to reload (careful) |
| Pipe not receiving files | Missing S3 event notification | Configure SQS notification from `SHOW PIPES` output |
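A COPY_HISTORY query that surfaces files that did not load cleanly can speed up triage of the errors above (status literals may vary; compare against what your account returns):

```sql
-- Files that failed, were skipped, or only partially loaded in the last 24 hours
SELECT file_name, status, first_error_message
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'USERS',
  START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())
))
WHERE status != 'Loaded';
```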
For data transformation workflows, see snowflake-core-workflow-b.