From litestar-skills
Auto-activate for sqlspec imports. Produces database adapter configurations, SQL query mappings, and framework extensions using SQLSpec. Use when working with database adapters, SQL execution, query building, Arrow integration, parameter handling, framework extensions, filters, pagination, event channels, SQL file loading, or storage integrations. Not for raw SQL or ORM-specific patterns (see sqlalchemy, advanced-alchemy).
```shell
npx claudepluginhub litestar-org/litestar-skills --plugin litestar-skills
```

This skill uses the workspace's default tool permissions.
SQLSpec is a **type-safe SQL query mapper for Python** -- NOT an ORM. It provides flexible connectivity with consistent interfaces across 15+ database adapters. Write raw SQL, use the builder API, or load SQL from files. All statements pass through a sqlglot-powered AST pipeline for validation and dialect conversion.
Reference guides:

- references/adapters.md
- references/architecture.md
- references/arrow.md
- references/dishka-integration.md
- references/driver_api.md
- references/events.md
- references/extensions.md
- references/filters.md
- references/loader.md
- references/observability.md
- references/patterns.md
- references/query_builder.md
- references/service-patterns.md
- references/sqlglot.md
- references/standards.md
- references/storage.md
- references/vector-search.md
`from __future__ import annotations` rule: SQLSpec adapter config modules and driver definitions avoid `from __future__ import annotations` because configs are introspected at runtime. Consumer application modules (handlers, services, tests that use a configured driver) MAY and typically SHOULD use it -- canonical Litestar apps use it in 100+ files.

```python
from sqlspec.adapters.asyncpg import AsyncPGConfig, AsyncPGDriver

# Configure the adapter with connection details
config = AsyncPGConfig(
    dsn="postgresql://user:pass@localhost:5432/mydb",
    pool_min_size=2,
    pool_max_size=10,
)

# Use the driver as a context manager for connection lifecycle
async with config.create_driver() as db:
    users = await db.select_many(
        "SELECT * FROM users WHERE active = $1",
        [True],
        schema_type=User,
    )
```
```python
from sqlspec import sql

# SELECT with filters
stmt = (
    sql.select("id", "name", "email")
    .from_("users")
    .where_eq("status", "active")
    .where("created_at > :since", since=cutoff_date)
    .order_by("created_at", desc=True)
    .limit(50)
    .to_statement()
)

# INSERT
stmt = (
    sql.insert_into("users")
    .columns("name", "email")
    .values(name="Alice", email="alice@example.com")
    .to_statement()
)

# MERGE / upsert
stmt = (
    sql.merge_into("inventory")
    .using("updates", on="inventory.product_id = updates.product_id")
    .when_matched().do_update(qty="updates.qty")
    .when_not_matched().do_insert(product_id="updates.product_id", qty="updates.qty")
    .to_statement()
)
```
| Method | Returns | Use Case |
|---|---|---|
| `select_value()` | Single scalar | COUNT(*), MAX(), existence checks |
| `select_one()` | One row (strict) | Get-by-ID, raises `NotFoundError` |
| `select_one_or_none()` | One row or None | Optional lookup |
| `select_many()` | List of rows | Filtered queries, listing |
| `select_to_arrow()` | `pyarrow.Table` | Bulk data export, analytics |
| `execute()` | Row count | INSERT/UPDATE/DELETE |
| `execute_many()` | Row count | Batch operations |
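The return-shape semantics in the table can be illustrated with small stdlib `sqlite3` helpers. This is a sketch of the *semantics* only -- the helper names mirror the SQLSpec driver methods, but the implementations are not SQLSpec's (its real drivers add pooling, typed mapping via `schema_type`, and async support):

```python
import sqlite3

class NotFoundError(Exception):
    """Raised when a strict single-row lookup matches nothing."""

def select_value(conn, sql, params=()):
    # Single scalar: COUNT(*), MAX(), existence checks
    return conn.execute(sql, params).fetchone()[0]

def select_one_or_none(conn, sql, params=()):
    # One row or None: optional lookup
    return conn.execute(sql, params).fetchone()

def select_one(conn, sql, params=()):
    # One row, strict: get-by-ID, raises when absent
    row = conn.execute(sql, params).fetchone()
    if row is None:
        raise NotFoundError(sql)
    return row

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'Alice')")

count = select_value(conn, "SELECT COUNT(*) FROM users")                      # 1
alice = select_one(conn, "SELECT * FROM users WHERE id = ?", (1,))            # (1, 'Alice')
ghost = select_one_or_none(conn, "SELECT * FROM users WHERE id = ?", (99,))   # None
```

The strict/optional split matters in handlers: `select_one()` turns a missing row into an exception you can map to a 404, while `select_one_or_none()` leaves the branch to the caller.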
```python
# Zero-copy on DuckDB, ADBC adapters; conversion on others
arrow_table = await db.select_to_arrow(
    "SELECT * FROM large_dataset WHERE region = $1", [region]
)

# Bulk load from Arrow
await db.copy_from_arrow(arrow_table, target_table="users")
```
<workflow>
| Need | Adapter | Key Feature |
|---|---|---|
| PostgreSQL async | asyncpg, psycopg | Async, NUMERIC/PYFORMAT params |
| PostgreSQL sync | psycopg | Sync+async, PYFORMAT params |
| SQLite | sqlite, aiosqlite | QMARK params, local dev |
| DuckDB analytics | duckdb | Arrow-native, zero-copy |
| MySQL async | asyncmy | PYFORMAT params |
| Oracle | oracledb | NAMED_COLON params, sync+async |
| BigQuery / Spanner | bigquery, spanner | NAMED_AT params |

| Need | Approach | Key Feature |
|---|---|---|
| Raw SQL strings | Driver methods | `select_many()`, `execute()` |
| Dynamic queries | Query builder | `sql.select()...to_statement()` |
| SQL from files | SQLFileLoader | Metadata directives, caching |
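The SQLFileLoader row refers to `.sql` files annotated with metadata directives so each statement can be loaded by name. As an illustration of that directive format (aiosql-style `-- name:` markers), here is a minimal parser -- purely a sketch of the idea, not SQLSpec's SQLFileLoader, which additionally handles search paths and statement caching:

```python
# Minimal sketch: split a .sql file into named queries on "-- name:" directives.
def parse_sql_file(text: str) -> dict[str, str]:
    queries: dict[str, str] = {}
    name = None
    lines: list[str] = []
    for line in text.splitlines():
        if line.startswith("-- name:"):
            if name:  # flush the previous query
                queries[name] = "\n".join(lines).strip()
            name = line.split(":", 1)[1].strip()
            lines = []
        elif name:
            lines.append(line)
    if name:  # flush the final query
        queries[name] = "\n".join(lines).strip()
    return queries

source = """\
-- name: get_user_by_id
SELECT id, name, email FROM users WHERE id = :id;

-- name: list_active_users
SELECT id, name FROM users WHERE active = true;
"""

queries = parse_sql_file(source)
# queries["get_user_by_id"] -> "SELECT id, name, email FROM users WHERE id = :id;"
```

Keeping SQL in files this way gives you syntax highlighting, reviewable diffs, and a single place to cache parsed statements.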
Apply these patterns:

- `create_driver()` context manager for connection lifecycle
- `schema_type` parameter for typed results (Pydantic or msgspec models)
- Filter objects for pagination and sorting: `LimitOffsetFilter`, `OrderByFilter`, `SearchFilter`

Run through the validation checkpoint below before considering the work complete.
</workflow> <guardrails>

- Use `schema_type` for query results -- get typed objects, not raw dicts
- Manage connections with `async with config.create_driver() as db:`
- Use `SQLFileLoader` for static queries -- keeps SQL out of Python, enables caching
- Match the parameter style to the adapter: `$1` for asyncpg, `%s` for psycopg, `?` for sqlite, `:name` for oracledb
- Adapter config modules avoid `from __future__ import annotations`. Consumer app modules MAY use it.

Before delivering SQLSpec code, verify:
- Imports come from the correct adapter module (`sqlspec.adapters.<name>`)
- Connections use the `create_driver()` context manager
- Results use `schema_type` for type-safe mapping
- Pagination uses filter objects (`LimitOffsetFilter`, etc.), not manual LIMIT/OFFSET

</guardrails> <example>

Task: "Set up an asyncpg adapter, define a typed model, and execute a parameterized query with pagination."
```python
from dataclasses import dataclass

from sqlspec.adapters.asyncpg import AsyncPGConfig
from sqlspec.core.filters import LimitOffsetFilter, OrderByFilter


# --- Typed model ---
@dataclass
class User:
    id: int
    name: str
    email: str
    active: bool


# --- Adapter setup ---
config = AsyncPGConfig(
    dsn="postgresql://user:pass@localhost:5432/mydb",
    pool_min_size=2,
    pool_max_size=10,
)


# --- Query execution ---
async def list_active_users(page: int = 1, page_size: int = 25) -> list[User]:
    filters = [
        OrderByFilter(columns=[("name", "asc")]),
        LimitOffsetFilter(limit=page_size, offset=(page - 1) * page_size),
    ]
    async with config.create_driver() as db:
        users = await db.select_many(
            "SELECT id, name, email, active FROM users WHERE active = $1",
            [True],
            *filters,
            schema_type=User,
        )
    return users


async def get_user_count() -> int:
    async with config.create_driver() as db:
        count = await db.select_value(
            "SELECT COUNT(*) FROM users WHERE active = $1", [True]
        )
    return count
```
</example>
The sql factory provides a fluent builder API with full method chaining. All builders terminate with .to_statement() and pass through sqlglot for validation and dialect conversion.
| Builder | Entry Point | Key Methods |
|---|---|---|
| SELECT | `sql.select(*cols)` | `.from_()`, `.where()`, `.where_eq()`, `.join()`, `.order_by()`, `.limit()`, `.offset()` |
| INSERT | `sql.insert_into(table)` | `.columns()`, `.values()`, `.returning()` |
| UPDATE | `sql.update(table)` | `.set_()`, `.where()`, `.returning()` |
| DELETE | `sql.delete_from(table)` | `.where()`, `.returning()` |
| MERGE | `sql.merge_into(target)` | `.using()`, `.when_matched()`, `.when_not_matched()` |
| CREATE TABLE | `sql.create_table(name)` | `.column()`, `.primary_key()`, `.if_not_exists()` |
| DROP TABLE | `sql.drop_table(name)` | `.if_exists()`, `.cascade()` |
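The chain-then-terminate pattern in the table can be sketched with a small immutable builder. This is an illustration of the pattern only, not SQLSpec's implementation: its real builders route the result through sqlglot for validation and dialect conversion, while this sketch merely assembles a string.

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Select:
    cols: tuple[str, ...]
    table: str = ""
    conditions: tuple[str, ...] = ()
    limit_n: int = 0  # 0 means "no LIMIT clause" in this sketch

    def from_(self, table: str) -> "Select":
        return replace(self, table=table)

    def where(self, cond: str) -> "Select":
        # Each method returns a NEW instance, so partial builders stay reusable
        return replace(self, conditions=self.conditions + (cond,))

    def limit(self, n: int) -> "Select":
        return replace(self, limit_n=n)

    def to_statement(self) -> str:
        # Terminal method: render the accumulated state to SQL text
        sql = f"SELECT {', '.join(self.cols)} FROM {self.table}"
        if self.conditions:
            sql += " WHERE " + " AND ".join(self.conditions)
        if self.limit_n:
            sql += f" LIMIT {self.limit_n}"
        return sql

stmt = (
    Select(("id", "name"))
    .from_("users")
    .where("status = 'active'")
    .limit(10)
    .to_statement()
)
# -> "SELECT id, name FROM users WHERE status = 'active' LIMIT 10"
```

The frozen dataclass makes the immutability explicit: no chained call can mutate an earlier step, which is the same property SQLSpec's `SQL` objects provide.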
select_to_arrow() returns an Apache Arrow Table for bulk and analytical workloads:
- `copy_from_arrow(table, target_table)` for bulk loads back into the database

SQLSpec filter objects are passed directly to driver methods alongside the SQL string. They modify the statement before execution.
| Filter | Purpose | Example Use |
|---|---|---|
| `BeforeAfterFilter` | Date range bounds (before, after) | Audit log queries, time-range pagination |
| `InCollectionFilter` | SQL IN (...) clause | Filter by a set of IDs or enum values |
| `LimitOffsetFilter` | Page-based pagination | limit=25, offset=50 |
| `OrderByFilter` | Dynamic sort columns and direction | User-supplied sort fields |
| `SearchFilter` | Text search (ILIKE / LIKE) | Full-text style search on string columns |
Filters are composable — pass multiple to a single select_many() call and they are applied in order.
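A mental model for that composition: each filter is an object that transforms the statement, and filters are applied in the order given. The sketch below reuses the filter names for illustration, but the constructors and string appending are assumptions of this example -- SQLSpec applies its filters through the sqlglot AST pipeline, not by string concatenation:

```python
from dataclasses import dataclass

@dataclass
class OrderByFilter:
    column: str
    direction: str = "asc"

    def append_to(self, sql: str) -> str:
        return f"{sql} ORDER BY {self.column} {self.direction.upper()}"

@dataclass
class LimitOffsetFilter:
    limit: int
    offset: int

    def append_to(self, sql: str) -> str:
        return f"{sql} LIMIT {self.limit} OFFSET {self.offset}"

def apply_filters(sql: str, *filters) -> str:
    for f in filters:  # applied in order, as when passed to select_many()
        sql = f.append_to(sql)
    return sql

page, page_size = 3, 25
stmt = apply_filters(
    "SELECT id, name FROM users WHERE active = true",
    OrderByFilter("name"),
    LimitOffsetFilter(limit=page_size, offset=(page - 1) * page_size),
)
# -> "... ORDER BY name ASC LIMIT 25 OFFSET 50"
```

Order matters because each filter sees the statement the previous one produced, which is why ORDER BY filters are passed before LIMIT/OFFSET ones.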
| Framework | Integration | Key Feature |
|---|---|---|
| Litestar | SQLSpecPlugin | Dependency injection of typed driver; auto session lifecycle |
| FastAPI / Starlette | Middleware | Request-scoped connection; injects driver into route dependencies |
| Flask | Extension | init_app() pattern; driver available via g or current_app |
SQLSpecPlugin for Litestar registers the driver as a DI provider — inject it into route handlers via type annotation without manual context management.
For databases that support server-side pub/sub (e.g., PostgreSQL LISTEN/NOTIFY):
- `AsyncEventChannel` to subscribe to named channels
- Publish with `NOTIFY channel, payload` from SQL or from the `publish()` method
- Subscribers receive `EventMessage` objects with channel name, payload, and PID

Statement immutability:

- A `SQL` object holds all state for a given statement
- Methods on a `SQL` object return new instances

Choosing between `sqlspec` and `advanced-alchemy`: `advanced-alchemy` gives you an opinionated ORM service layer with `UUIDAuditBase`, lifecycle hooks, repository / service / Alembic integration, and `OffsetPagination[T]` out of the box -- pick it when you want a complete CRUD surface with attribute-style row access and you're happy inside the SQLAlchemy ecosystem. `sqlspec` gives you direct SQL control, 15+ driver adapters (asyncpg, oracledb, DuckDB, BigQuery, SQLite, and more), Arrow-native result streams for analytics, and a builder API when you need it -- pick it when you want explicit SQL, heterogeneous database backends, or Arrow integration. Both skills integrate with Litestar via first-party plugins; see `../advanced-alchemy/SKILL.md` for the ORM path.
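The event-channel flow described above (subscribe to a named channel, publish, receive a message object) can be sketched with asyncio primitives. This in-process stand-in only mimics the message shape and the subscribe/publish/receive sequence; a real `AsyncEventChannel` rides on the database's server-side pub/sub, such as PostgreSQL LISTEN/NOTIFY, and the class and method names below are illustrative:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class EventMessage:
    channel: str
    payload: str

class InProcessChannel:
    """Toy stand-in for a database-backed event channel."""

    def __init__(self) -> None:
        self._queues: dict[str, asyncio.Queue] = {}

    def subscribe(self, channel: str) -> asyncio.Queue:
        # One shared queue per named channel
        return self._queues.setdefault(channel, asyncio.Queue())

    async def publish(self, channel: str, payload: str) -> None:
        await self._queues[channel].put(EventMessage(channel, payload))

async def main() -> str:
    bus = InProcessChannel()
    inbox = bus.subscribe("orders")            # subscribe to a named channel
    await bus.publish("orders", '{"order_id": 42}')
    msg = await inbox.get()                    # receive an EventMessage
    return msg.payload

payload = asyncio.run(main())
# -> '{"order_id": 42}'
```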
For detailed instructions, patterns, and API guides, refer to the following documents:
- `copy=False` pattern.
- `sql` factory: select, insert, update, delete, merge.
- `select_value()`, `select_one()`, `select_many()`, `select_to_arrow()`.
- `LimitOffsetFilter`, `OrderByFilter`, `SearchFilter`.
- `select_to_arrow()` zero-copy, `copy_from_arrow()` bulk loading.
- `SQLFileLoader` with search paths, metadata directives.
- `AsyncEventChannel`, subscribe/publish patterns.