From snowflake-pack
Execute Snowflake primary workflow: data loading via stages and COPY INTO. Use when loading data from S3/GCS/Azure into Snowflake tables, setting up Snowpipe for continuous ingestion, or bulk loading files. Trigger with phrases like "snowflake load data", "snowflake COPY INTO", "snowflake stage", "snowflake ingest", "snowflake S3 load", "snowpipe".
npx claudepluginhub flight505/skill-forge --plugin snowflake-pack
Primary data loading workflow: stages, file formats, COPY INTO, and Snowpipe for continuous ingestion.
Prerequisites: completed snowflake-install-auth setup, and a role with CREATE STAGE and USAGE on warehouse.
-- CSV format
CREATE OR REPLACE FILE FORMAT my_csv_format
TYPE = 'CSV'
FIELD_DELIMITER = ','
SKIP_HEADER = 1
NULL_IF = ('NULL', 'null', '')
EMPTY_FIELD_AS_NULL = TRUE
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE;
-- JSON format (for semi-structured data)
CREATE OR REPLACE FILE FORMAT my_json_format
TYPE = 'JSON'
STRIP_OUTER_ARRAY = TRUE
IGNORE_UTF8_ERRORS = TRUE;
-- Parquet format
CREATE OR REPLACE FILE FORMAT my_parquet_format
TYPE = 'PARQUET'
COMPRESSION = SNAPPY; -- SNAPPY_COMPRESSION applies to unloading only; AUTO also detects Snappy on load
-- External stage (S3)
CREATE OR REPLACE STAGE my_s3_stage
STORAGE_INTEGRATION = my_s3_integration
URL = 's3://my-bucket/data/'
FILE_FORMAT = my_csv_format;
-- External stage (GCS)
CREATE OR REPLACE STAGE my_gcs_stage
STORAGE_INTEGRATION = my_gcs_integration
URL = 'gcs://my-bucket/data/'
FILE_FORMAT = my_csv_format;
-- Internal stage (Snowflake-managed storage)
CREATE OR REPLACE STAGE my_internal_stage
FILE_FORMAT = my_csv_format;
-- List files in stage
LIST @my_s3_stage;
# Using SnowSQL PUT command
snowsql -c prod -q "PUT file:///tmp/data/*.csv @my_internal_stage AUTO_COMPRESS=TRUE"
# Using Python connector
cursor.execute("PUT file:///tmp/data/users.csv @my_internal_stage AUTO_COMPRESS=TRUE")
-- Basic COPY INTO from stage
COPY INTO my_db.my_schema.users
FROM @my_s3_stage/users/
FILE_FORMAT = my_csv_format
ON_ERROR = 'CONTINUE' -- Skip bad rows
PURGE = TRUE; -- Delete files after load
-- COPY with column mapping
COPY INTO my_db.my_schema.orders (order_id, customer_id, amount, order_date)
FROM (
SELECT $1, $2, $3::FLOAT, $4::TIMESTAMP_NTZ
FROM @my_s3_stage/orders/
)
FILE_FORMAT = my_csv_format;
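-- Load JSON, mapping top-level keys to same-named table columns
-- (omit MATCH_BY_COLUMN_NAME to load each record into a single VARIANT column)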
COPY INTO my_db.my_schema.raw_events
FROM @my_s3_stage/events/
FILE_FORMAT = my_json_format
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
-- Check COPY history
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'USERS',
START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())
));
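The COPY INTO statements above differ mainly in their table, stage path, and options, so they can be generated from one place. The following is a minimal sketch, assuming unquoted identifiers only; `buildCopyInto`, `assertQualifiedName`, and `assertStagePath` are hypothetical helpers for illustration, not part of any Snowflake SDK. It rejects names that are not plain identifiers before they are interpolated into SQL.

```typescript
// Hypothetical helpers for illustration; not part of any Snowflake SDK.
type OnError = 'CONTINUE' | 'SKIP_FILE' | 'ABORT_STATEMENT';

interface CopyOptions {
  table: string;      // e.g. my_db.my_schema.users
  stagePath: string;  // e.g. my_s3_stage/users/ (without the leading @)
  fileFormat: string; // named file format, e.g. my_csv_format
  onError?: OnError;
  purge?: boolean;
}

// Unquoted identifier: letter or underscore first, then letters, digits, _ or $.
const IDENT = /^[A-Za-z_][A-Za-z0-9_$]*$/;

function assertQualifiedName(name: string): string {
  const parts = name.split('.');
  if (parts.length > 3 || !parts.every((p) => IDENT.test(p))) {
    throw new Error(`Unsafe identifier: ${name}`);
  }
  return name;
}

function assertStagePath(path: string): string {
  const segments = path.split('/');
  // First segment is the stage name; the rest are path segments.
  assertQualifiedName(segments[0] ?? '');
  const safeSegment = /^[A-Za-z0-9_.=-]*$/;
  if (!segments.slice(1).every((s) => safeSegment.test(s))) {
    throw new Error(`Unsafe stage path: ${path}`);
  }
  return path;
}

function buildCopyInto(opts: CopyOptions): string {
  const lines = [
    `COPY INTO ${assertQualifiedName(opts.table)}`,
    `FROM @${assertStagePath(opts.stagePath)}`,
    `FILE_FORMAT = ${assertQualifiedName(opts.fileFormat)}`,
  ];
  if (opts.onError) lines.push(`ON_ERROR = '${opts.onError}'`);
  if (opts.purge !== undefined) lines.push(`PURGE = ${opts.purge ? 'TRUE' : 'FALSE'}`);
  return lines.join('\n');
}
```

Quoted identifiers (for example names containing spaces) are deliberately rejected here; supporting them would need proper quoting rather than a looser pattern.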
import { pool } from './snowflake/pool';
import { query } from './snowflake/query';
async function loadDataFromStage(
tableName: string,
stagePath: string,
fileFormat: string = 'my_csv_format'
) {
return pool.withConnection(async (conn) => {
const result = await query(conn, `
COPY INTO ${tableName}
FROM @${stagePath}
FILE_FORMAT = ${fileFormat}
ON_ERROR = 'CONTINUE'
FORCE = FALSE
`);
// COPY INTO returns load status per file
for (const row of result.rows) {
console.log(`File: ${row.file}, Status: ${row.status}, Rows: ${row.rows_loaded}`);
if (row.errors_seen > 0) {
console.warn(` Errors: ${row.errors_seen}, First error: ${row.first_error}`);
}
}
return result.rows;
});
}
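With ON_ERROR = 'CONTINUE', a load can succeed while silently skipping rows, so it helps to aggregate the per-file results instead of only logging them. The sketch below assumes the row shape used in loadDataFromStage (lowercase `file`, `rows_loaded`, `errors_seen`); `summarizeCopyResult` and the `LoadSummary` type are hypothetical names, and your query wrapper may return different column casing.

```typescript
// Row shape assumed from the fields read in loadDataFromStage.
interface CopyResultRow {
  file: string;
  status: string;
  rows_loaded: number;
  errors_seen: number;
  first_error?: string | null;
}

// Hypothetical summary type for illustration.
interface LoadSummary {
  files: number;
  rowsLoaded: number;
  errorsSeen: number;
  filesWithErrors: string[];
}

function summarizeCopyResult(rows: CopyResultRow[]): LoadSummary {
  const summary: LoadSummary = { files: 0, rowsLoaded: 0, errorsSeen: 0, filesWithErrors: [] };
  for (const row of rows) {
    summary.files += 1;
    // Number() guards against drivers that return numeric columns as strings.
    summary.rowsLoaded += Number(row.rows_loaded) || 0;
    const errors = Number(row.errors_seen) || 0;
    summary.errorsSeen += errors;
    if (errors > 0) summary.filesWithErrors.push(row.file);
  }
  return summary;
}
```

A caller could fail the job when `filesWithErrors` is non-empty, or compare `errorsSeen` against a tolerance before deciding whether to alert.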
-- Create pipe for auto-ingest from S3
CREATE OR REPLACE PIPE my_db.my_schema.user_pipe
AUTO_INGEST = TRUE
AS
COPY INTO my_db.my_schema.users
FROM @my_s3_stage/users/
FILE_FORMAT = my_csv_format;
-- Get the SQS queue ARN for S3 event notifications
SHOW PIPES LIKE 'user_pipe';
-- Use the notification_channel value to configure S3 bucket events
-- Monitor pipe status
SELECT SYSTEM$PIPE_STATUS('my_db.my_schema.user_pipe');
-- Check Snowpipe load history
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'USERS',
START_TIME => DATEADD(hours, -1, CURRENT_TIMESTAMP())
))
WHERE pipe_catalog_name IS NOT NULL;
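SYSTEM$PIPE_STATUS returns its result as a JSON string, so monitoring code has to parse it before acting on it. A minimal sketch, assuming the `executionState` and `pendingFileCount` fields of that JSON; `parsePipeStatus`, `pipeNeedsAttention`, and the default threshold of 100 pending files are hypothetical choices for illustration.

```typescript
interface PipeStatus {
  executionState: string;
  pendingFileCount: number;
}

// Parses the JSON string returned by SELECT SYSTEM$PIPE_STATUS('...').
function parsePipeStatus(raw: string): PipeStatus {
  const parsed: unknown = JSON.parse(raw);
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('Pipe status is not a JSON object');
  }
  const obj = parsed as Record<string, unknown>;
  const state = obj['executionState'];
  const pending = obj['pendingFileCount'];
  return {
    executionState: typeof state === 'string' ? state : 'UNKNOWN',
    pendingFileCount: typeof pending === 'number' ? pending : 0,
  };
}

// Flags a pipe that is not running or has a backlog above the threshold.
// The threshold is an arbitrary illustrative default, not a Snowflake limit.
function pipeNeedsAttention(status: PipeStatus, maxPending: number = 100): boolean {
  return status.executionState !== 'RUNNING' || status.pendingFileCount > maxPending;
}
```

Other fields in the status JSON are ignored here; extend `PipeStatus` if you need them.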
| Error | Cause | Solution |
|---|---|---|
| Insufficient privileges on stage | Role lacks USAGE | GRANT USAGE ON STAGE x TO ROLE y (external stages; internal stages use READ/WRITE) |
| File not found | Wrong stage path | Run LIST @stage to verify files |
| Number of columns in file does not match | Schema mismatch | Use ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE or fix file |
| Files already loaded | COPY deduplication | Use FORCE = TRUE to reload (careful: reloading can duplicate rows) |
| Pipe not receiving files | Missing S3 event notification | Configure SQS notification from SHOW PIPES output |
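The table can also be used programmatically, for example to attach a hint to a failed load in logs. This is a rough sketch: `suggestRemedy` is a hypothetical helper, and the match strings are taken from the table's wording, which may not equal the exact text Snowflake returns. Only the three rows that read like error messages are included.

```typescript
interface Remedy {
  cause: string;
  solution: string;
}

// Match strings are lowercase substrings copied from the table,
// not guaranteed to match Snowflake's exact error text.
const REMEDIES: Array<{ match: string; remedy: Remedy }> = [
  {
    match: 'insufficient privileges',
    remedy: { cause: 'Role lacks USAGE', solution: 'GRANT USAGE ON STAGE x TO ROLE y' },
  },
  {
    match: 'file not found',
    remedy: { cause: 'Wrong stage path', solution: 'Run LIST @stage to verify files' },
  },
  {
    match: 'number of columns in file',
    remedy: {
      cause: 'Schema mismatch',
      solution: 'Use ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE or fix file',
    },
  },
];

// Returns the first matching remedy, or undefined when nothing matches.
function suggestRemedy(message: string): Remedy | undefined {
  const lower = message.toLowerCase();
  return REMEDIES.find((r) => lower.includes(r.match))?.remedy;
}
```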
For data transformation workflows, see snowflake-core-workflow-b.