Auto-activate for sqlspec imports. Produces database adapter configurations, SQL query mappings, and framework extensions using SQLSpec. Use when working with database adapters, SQL execution, query building, Arrow integration, parameter handling, framework extensions, filters, pagination, event channels, SQL file loading, or storage integrations. Not for raw SQL or ORM-specific patterns (see sqlalchemy, advanced-alchemy).
From flownpx claudepluginhub cofin/flow --plugin flowThis skill uses the workspace's default tool permissions.
references/adapters.mdreferences/architecture.mdreferences/arrow.mdreferences/driver_api.mdreferences/events.mdreferences/extensions.mdreferences/filters.mdreferences/loader.mdreferences/observability.mdreferences/patterns.mdreferences/query_builder.mdreferences/sqlglot.mdreferences/standards.mdreferences/storage.mdSearches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Searches prompts.chat for AI prompt templates by keyword or category, retrieves by ID with variable handling, and improves prompts via AI. Use for discovering or enhancing prompts.
Guides agentic engineering workflows: eval-first loops, 15-min task decomposition, model routing (Haiku/Sonnet/Opus), AI code reviews, and cost tracking.
SQLSpec is a type-safe SQL query mapper for Python -- NOT an ORM. It provides flexible connectivity with consistent interfaces across 15+ database adapters. Write raw SQL, use the builder API, or load SQL from files. All statements pass through a sqlglot-powered AST pipeline for validation and dialect conversion.
from sqlspec.adapters.asyncpg import AsyncPGConfig, AsyncPGDriver
# Configure the adapter with connection details
config = AsyncPGConfig(
dsn="postgresql://user:pass@localhost:5432/mydb",
pool_min_size=2,
pool_max_size=10,
)
# Use the driver as a context manager for connection lifecycle
async with config.create_driver() as db:
users = await db.select_many(
"SELECT * FROM users WHERE active = $1",
[True],
schema_type=User,
)
from sqlspec import sql
# SELECT with filters
stmt = (
sql.select("id", "name", "email")
.from_("users")
.where_eq("status", "active")
.where("created_at > :since", since=cutoff_date)
.order_by("created_at", desc=True)
.limit(50)
.to_statement()
)
# INSERT
stmt = (
sql.insert_into("users")
.columns("name", "email")
.values(name="Alice", email="alice@example.com")
.to_statement()
)
# MERGE / upsert
stmt = (
sql.merge_into("inventory")
.using("updates", on="inventory.product_id = updates.product_id")
.when_matched().do_update(qty="updates.qty")
.when_not_matched().do_insert(product_id="updates.product_id", qty="updates.qty")
.to_statement()
)
| Method | Returns | Use Case |
|---|---|---|
select_value() | Single scalar | COUNT(*), MAX(), existence checks |
select_one() | One row (strict) | Get-by-ID, raises NotFoundError |
select_one_or_none() | One row or None | Optional lookup |
select_many() | List of rows | Filtered queries, listing |
select_to_arrow() | pyarrow.Table | Bulk data export, analytics |
execute() | Row count | INSERT/UPDATE/DELETE |
execute_many() | Row count | Batch operations |
# Zero-copy on DuckDB, ADBC adapters; conversion on others
arrow_table = await db.select_to_arrow(
"SELECT * FROM large_dataset WHERE region = $1", [region]
)
# Bulk load from Arrow
await db.copy_from_arrow(arrow_table, target_table="users")
<workflow>
| Need | Adapter | Key Feature |
|---|---|---|
| PostgreSQL async | asyncpg, psycopg | Async, NUMERIC/PYFORMAT params |
| PostgreSQL sync | psycopg | Sync+async, PYFORMAT params |
| SQLite | sqlite, aiosqlite | QMARK params, local dev |
| DuckDB analytics | duckdb | Arrow-native, zero-copy |
| MySQL async | asyncmy | PYFORMAT params |
| Oracle | oracledb | NAMED_COLON params, sync+async |
| BigQuery / Spanner | bigquery, spanner | NAMED_AT params |
| Raw SQL strings | Driver methods | select_many(), execute() |
| Dynamic queries | Query builder | sql.select()...to_statement() |
| SQL from files | SQLFileLoader | Metadata directives, caching |
create_driver() context manager for connection lifecycleschema_type parameter for typed results (Pydantic or msgspec models)LimitOffsetFilter, OrderByFilter, SearchFilterRun through the validation checkpoint below before considering the work complete.
</workflow> <guardrails>schema_type for query results -- get typed objects, not raw dictsasync with config.create_driver() as db:SQLFileLoader for static queries -- keeps SQL out of Python, enables caching$1 for asyncpg, %s for psycopg, ? for sqlite, :name for oracledbBefore delivering SQLSpec code, verify:
sqlspec.adapters.<name>)create_driver() context managerschema_type for type-safe mappingLimitOffsetFilter, etc.) not manual LIMIT/OFFSETTask: "Set up an asyncpg adapter, define a typed model, and execute a parameterized query with pagination."
from dataclasses import dataclass
from sqlspec.adapters.asyncpg import AsyncPGConfig
from sqlspec.core.filters import LimitOffsetFilter, OrderByFilter
# --- Typed model ---
@dataclass
class User:
id: int
name: str
email: str
active: bool
# --- Adapter setup ---
config = AsyncPGConfig(
dsn="postgresql://user:pass@localhost:5432/mydb",
pool_min_size=2,
pool_max_size=10,
)
# --- Query execution ---
async def list_active_users(page: int = 1, page_size: int = 25) -> list[User]:
filters = [
OrderByFilter(columns=[("name", "asc")]),
LimitOffsetFilter(limit=page_size, offset=(page - 1) * page_size),
]
async with config.create_driver() as db:
users = await db.select_many(
"SELECT id, name, email, active FROM users WHERE active = $1",
[True],
*filters,
schema_type=User,
)
return users
async def get_user_count() -> int:
async with config.create_driver() as db:
count = await db.select_value(
"SELECT COUNT(*) FROM users WHERE active = $1", [True]
)
return count
</example>
The sql factory provides a fluent builder API with full method chaining. All builders terminate with .to_statement() and pass through sqlglot for validation and dialect conversion.
| Builder | Entry Point | Key Methods |
|---|---|---|
| SELECT | sql.select(*cols) | .from_(), .where(), .where_eq(), .join(), .order_by(), .limit(), .offset() |
| INSERT | sql.insert_into(table) | .columns(), .values(), .returning() |
| UPDATE | sql.update(table) | .set_(), .where(), .returning() |
| DELETE | sql.delete_from(table) | .where(), .returning() |
| MERGE | sql.merge_into(target) | .using(), .when_matched(), .when_not_matched() |
| CREATE TABLE | sql.create_table(name) | .column(), .primary_key(), .if_not_exists() |
| DROP TABLE | sql.drop_table(name) | .if_exists(), .cascade() |
select_to_arrow() returns an Apache Arrow Table for bulk and analytical workloads:
copy_from_arrow(table, target_table) for bulk loads back into the databaseSQLSpec filter objects are passed directly to driver methods alongside the SQL string. They modify the statement before execution.
| Filter | Purpose | Example Use |
|---|---|---|
BeforeAfterFilter | Date range bounds (before, after) | Audit log queries, time-range pagination |
InCollectionFilter | SQL IN (...) clause | Filter by a set of IDs or enum values |
LimitOffsetFilter | Page-based pagination | limit=25, offset=50 |
OrderByFilter | Dynamic sort columns and direction | User-supplied sort fields |
SearchFilter | Text search (ILIKE / LIKE) | Full-text style search on string columns |
Filters are composable — pass multiple to a single select_many() call and they are applied in order.
| Framework | Integration | Key Feature |
|---|---|---|
| Litestar | SQLSpecPlugin | Dependency injection of typed driver; auto session lifecycle |
| FastAPI / Starlette | Middleware | Request-scoped connection; injects driver into route dependencies |
| Flask | Extension | init_app() pattern; driver available via g or current_app |
SQLSpecPlugin for Litestar registers the driver as a DI provider — inject it into route handlers via type annotation without manual context management.
For databases that support server-side pub/sub (e.g., PostgreSQL LISTEN/NOTIFY):
AsyncEventChannel to subscribe to named channelsNOTIFY channel, payload from SQL or from the publish() methodEventMessage objects with channel name, payload, and PIDSQL object holds all state for a given statementSQL object return new instancesFor detailed instructions, patterns, and API guides, refer to the following documents:
copy=False pattern.sql factory: select, insert, update, delete, merge.select_value(), select_one(), select_many(), select_to_arrow().LimitOffsetFilter, OrderByFilter, SearchFilter.select_to_arrow() zero-copy, copy_from_arrow() bulk loading.SQLFileLoader with search paths, metadata directives.AsyncEventChannel, subscribe/publish patterns.