Provides complete API reference for pgdbm's AsyncDatabaseManager, DatabaseConfig, and TransactionManager with all methods, parameters, and examples. Use when implementing database operations to access method signatures, return types, and usage patterns without external documentation.
/plugin marketplace add juanre/pgdbm
/plugin install pgdbm@juanre-ai-tools

This skill inherits all available tools. When active, it can use any tool Claude has access to.
Complete API reference for AsyncDatabaseManager, DatabaseConfig, and TransactionManager.
All signatures, parameters, return types, and usage examples. No documentation lookup needed.
# Pattern 1: Create own pool
AsyncDatabaseManager(config: DatabaseConfig)
# Pattern 2: Use external pool
AsyncDatabaseManager(
pool: asyncpg.Pool,
schema: Optional[str] = None
)
Rules:
- Cannot pass both config and pool
- schema is only valid with an external pool
- Must call connect() if using config
- Never call connect() if using an external pool

# Create shared pool (class method)
pool = await AsyncDatabaseManager.create_shared_pool(config: DatabaseConfig) -> asyncpg.Pool
# Connect (only for config-based init)
await db.connect() -> None
# Raises PoolError if using external pool
# Disconnect (only for config-based init)
await db.disconnect() -> None
# Does nothing if using external pool
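A minimal lifecycle sketch combining both patterns (connection string and schema name are illustrative):

from pgdbm import AsyncDatabaseManager, DatabaseConfig

# Pattern 1: config-based init - manager owns the pool
config = DatabaseConfig(connection_string="postgresql://localhost/myapp")
db = AsyncDatabaseManager(config)
await db.connect()  # required before any queries
try:
    users = await db.fetch_all("SELECT * FROM {{tables.users}}")
finally:
    await db.disconnect()

# Pattern 2: external pool - pool lifecycle managed by you
pool = await AsyncDatabaseManager.create_shared_pool(config)
tenant_db = AsyncDatabaseManager(pool=pool, schema="tenant_a")
# do not call tenant_db.connect(); close the shared pool yourself when done
await pool.close()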
All methods automatically apply {{tables.}} template substitution.
# Execute without return
await db.execute(
query: str,
*args: Any,
timeout: Optional[float] = None
) -> str
# Returns: asyncpg status string like "INSERT 0 1"
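# Example (sketch): the affected-row count is the last token of the status
status = await db.execute(
    "UPDATE {{tables.users}} SET name = $1 WHERE id = $2",
    "Alicia",
    user_id,
)
rows = int(status.split()[-1])  # "UPDATE 1" -> 1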
# Execute and return generated ID
await db.execute_and_return_id(
query: str,
*args: Any
) -> Any
# Automatically appends RETURNING id if not present
# Returns: The id value
# Fetch single value
await db.fetch_value(
query: str,
*args: Any,
column: int = 0,
timeout: Optional[float] = None
) -> Any
# Returns: Single value from result (or None)
# Fetch single row
await db.fetch_one(
query: str,
*args: Any,
timeout: Optional[float] = None
) -> Optional[dict[str, Any]]
# Returns: Dictionary of column->value (or None if no results)
# Fetch all rows
await db.fetch_all(
query: str,
*args: Any,
timeout: Optional[float] = None
) -> list[dict[str, Any]]
# Returns: List of dictionaries
# Batch execute (multiple parameter sets)
await db.executemany(
query: str,
args_list: list[tuple]
) -> None
# Executes same query with different parameter sets
# More efficient than looping execute()
Examples:
# execute_and_return_id - Common for inserts
user_id = await db.execute_and_return_id(
"INSERT INTO {{tables.users}} (email, name) VALUES ($1, $2)",
"alice@example.com",
"Alice"
)
# Automatically becomes: ... RETURNING id
# fetch_value with column parameter
email = await db.fetch_value(
"SELECT email, name FROM {{tables.users}} WHERE id = $1",
user_id,
column=0 # Get first column (email)
)
# executemany for batch inserts
users = [
("alice@example.com", "Alice"),
("bob@example.com", "Bob"),
("charlie@example.com", "Charlie"),
]
await db.executemany(
"INSERT INTO {{tables.users}} (email, name) VALUES ($1, $2)",
users
)
# Copy records (MUCH faster than INSERT for bulk data)
await db.copy_records_to_table(
table_name: str,
records: list[tuple],
columns: Optional[list[str]] = None
) -> int
# Uses PostgreSQL COPY command
# Returns: Number of records copied
# Example
records = [
("alice@example.com", "Alice"),
("bob@example.com", "Bob"),
]
count = await db.copy_records_to_table(
"users", # Don't use {{tables.}} here - just table name
records=records,
columns=["email", "name"]
)
# Returns: 2
from pydantic import BaseModel
class User(BaseModel):
id: int
email: str
name: str
# Fetch single row as model
user = await db.fetch_as_model(
User,
query: str,
*args: Any,
timeout: Optional[float] = None
) -> Optional[User]
# Fetch all rows as models
users = await db.fetch_all_as_model(
User,
query: str,
*args: Any,
timeout: Optional[float] = None
) -> list[User]
# Example
user = await db.fetch_as_model(
User,
"SELECT * FROM {{tables.users}} WHERE id = $1",
user_id
)
# Returns: User(id=1, email="alice@example.com", name="Alice")
# Check if table exists
exists = await db.table_exists(table_name: str) -> bool
# Examples
exists = await db.table_exists("users") # Check in current schema
exists = await db.table_exists("other_schema.users") # Check in specific schema
# Debug template substitution (useful for troubleshooting)
prepared = db.prepare_query(query: str) -> str
# Example - see how templates expand for this manager
print(db.prepare_query("SELECT * FROM {{tables.users}}"))
# With schema="myapp": 'SELECT * FROM "myapp".users'
# Without schema: 'SELECT * FROM users'
Note: In shared-pool mode, pgdbm does NOT change search_path. Schema isolation happens via template substitution at query time, not connection configuration.
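For example, two managers sharing one pool expand the same template into different schemas (a sketch; tenant names are illustrative):

pool = await AsyncDatabaseManager.create_shared_pool(config)
db_a = AsyncDatabaseManager(pool=pool, schema="tenant_a")
db_b = AsyncDatabaseManager(pool=pool, schema="tenant_b")
print(db_a.prepare_query("SELECT * FROM {{tables.users}}"))  # SELECT * FROM "tenant_a".users
print(db_b.prepare_query("SELECT * FROM {{tables.users}}"))  # SELECT * FROM "tenant_b".users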
# Create transaction context
async with db.transaction() as tx:
# tx has same API as db (execute, fetch_one, fetch_all, etc.)
user_id = await tx.fetch_value(
"INSERT INTO {{tables.users}} (email) VALUES ($1) RETURNING id",
email
)
await tx.execute(
"INSERT INTO {{tables.profiles}} (user_id) VALUES ($1)",
user_id
)
# Auto-commits on success, rolls back on exception
# Nested transactions (savepoints)
async with db.transaction() as tx:
await tx.execute("INSERT INTO {{tables.users}} ...")
async with tx.transaction() as nested:
await nested.execute("UPDATE {{tables.users}} ...")
# Nested transaction uses SAVEPOINT
# Get pool statistics
stats = await db.get_pool_stats() -> dict[str, Any]
# Returns: {
# "status": "connected",
# "min_size": 10,
# "max_size": 50,
# "size": 15, # Current total connections
# "free_size": 10, # Idle connections
# "used_size": 5, # Active connections
# "database": "myapp",
# "schema": "myschema",
# "pid": 12345,
# "version": "PostgreSQL 15.3"
# }
# Add prepared statement (performance optimization)
db.add_prepared_statement(
name: str,
query: str
) -> None
# Prepared statements created on all connections in pool
# Improves performance for frequently-used queries
# Acquire connection directly (advanced)
async with db.acquire() as conn:
# conn is raw asyncpg connection
# Use for operations not covered by AsyncDatabaseManager
await conn.execute("...")
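One thing the manager does not wrap is asyncpg's LISTEN/NOTIFY; a sketch using plain asyncpg calls on the acquired connection (channel name and callback are illustrative):

def on_event(connection, pid, channel, payload):
    print(f"NOTIFY on {channel}: {payload}")

async with db.acquire() as conn:
    await conn.add_listener("user_events", on_event)
    # ... hold the connection while you need notifications ...
    await conn.remove_listener("user_events", on_event)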
from pgdbm import DatabaseConfig
config = DatabaseConfig(
# Connection (either connection_string OR individual params)
connection_string: Optional[str] = None, # e.g., "postgresql://user:pass@host/db"
host: str = "localhost",
port: int = 5432,
database: str = "postgres",
user: str = "postgres",
password: Optional[str] = None,
schema: Optional[str] = None, # Alias: schema_name
# Connection Pool
min_connections: int = 10,
max_connections: int = 20,
max_queries: int = 50000, # Queries per connection before recycling
max_inactive_connection_lifetime: float = 300.0, # Seconds
command_timeout: float = 60.0, # Default query timeout (seconds)
# Connection Initialization
server_settings: Optional[dict[str, str]] = None, # PostgreSQL settings
init_commands: Optional[list[str]] = None, # Run on each connection
# TLS/SSL Configuration
ssl_enabled: bool = False,
ssl_mode: Optional[str] = None, # 'require', 'verify-ca', 'verify-full'
ssl_ca_file: Optional[str] = None, # Path to CA certificate
ssl_cert_file: Optional[str] = None, # Path to client certificate
ssl_key_file: Optional[str] = None, # Path to client key
ssl_key_password: Optional[str] = None, # Key password if encrypted
# Server-Side Timeouts (milliseconds, None to disable)
statement_timeout_ms: Optional[int] = 60000, # Abort long queries
idle_in_transaction_session_timeout_ms: Optional[int] = 60000, # Abort idle transactions
lock_timeout_ms: Optional[int] = 5000, # Abort lock waits
# Retry Configuration
retry_attempts: int = 3,
retry_delay: float = 1.0, # Initial delay (seconds)
retry_backoff: float = 2.0, # Exponential backoff multiplier
retry_max_delay: float = 30.0, # Maximum delay (seconds)
)
Development:
config = DatabaseConfig(
connection_string="postgresql://localhost/myapp_dev",
min_connections=2,
max_connections=10,
)
Production with TLS:
config = DatabaseConfig(
connection_string="postgresql://db.example.com/myapp",
min_connections=20,
max_connections=100,
ssl_enabled=True,
ssl_mode="verify-full",
ssl_ca_file="/etc/ssl/certs/ca.pem",
statement_timeout_ms=30000, # 30 second timeout
lock_timeout_ms=5000, # 5 second lock timeout
)
Custom initialization:
config = DatabaseConfig(
connection_string="postgresql://localhost/myapp",
init_commands=[
"SET timezone TO 'UTC'",
"SET statement_timeout TO '30s'",
],
server_settings={
"jit": "off", # Disable JIT compilation
"application_name": "myapp",
},
)
Same API as AsyncDatabaseManager but within transaction context:
async with db.transaction() as tx:
# All methods available
await tx.execute(query, *args, timeout=None) -> str
await tx.executemany(query, args_list) -> None
await tx.fetch_one(query, *args, timeout=None) -> Optional[dict]
await tx.fetch_all(query, *args, timeout=None) -> list[dict]
await tx.fetch_value(query, *args, column=0, timeout=None) -> Any
# Nested transactions (savepoints)
async with tx.transaction() as nested_tx:
...
# Access underlying connection
conn = tx.connection # Property, not method
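tx.connection is useful for asyncpg features that need an open transaction, such as server-side cursors for streaming large result sets. A sketch (note: templates are NOT applied on the raw connection, so qualify the table yourself):

async with db.transaction() as tx:
    async for row in tx.connection.cursor('SELECT * FROM "myschema".users'):
        print(row["email"])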
| Method | Parameters | Returns | Use Case |
|---|---|---|---|
| execute | query, *args, timeout | str | No results needed |
| execute_and_return_id | query, *args | Any | INSERT with auto RETURNING id |
| executemany | query, args_list | None | Batch execute same query |
| fetch_value | query, *args, column, timeout | Any | Single value |
| fetch_one | query, *args, timeout | Optional[dict] | Single row |
| fetch_all | query, *args, timeout | list[dict] | Multiple rows |
| fetch_as_model | model, query, *args, timeout | Optional[Model] | Single row as Pydantic model |
| fetch_all_as_model | model, query, *args, timeout | list[Model] | Rows as Pydantic models |
| copy_records_to_table | table, records, columns | int | Bulk COPY (fast) |
| table_exists | table_name | bool | Schema checking |
| prepare_query | query | str | Debug template expansion |
| transaction | - | TransactionManager | Transaction context |
| get_pool_stats | - | dict | Pool monitoring |
| add_prepared_statement | name, query | None | Performance optimization |
| acquire | - | Connection | Advanced: raw connection |
| connect | - | None | Initialize pool (config-based only) |
| disconnect | - | None | Close pool (config-based only) |
| create_shared_pool | config | asyncpg.Pool | Class method: create shared pool |
Compatibility aliases:
- fetch_val(...) → fetch_value(...)
- execute_many(...) → executemany(...)

| Method | Parameters | Returns |
|---|---|---|
| execute | query, *args, timeout | str |
| executemany | query, args_list | None |
| fetch_value | query, *args, column, timeout | Any |
| fetch_one | query, *args, timeout | Optional[dict] |
| fetch_all | query, *args, timeout | list[dict] |
| transaction | - | TransactionManager (nested) |
| connection | - | Connection (property) |
Note: TransactionManager does NOT have:
- execute_and_return_id()

Use regular fetch_value with an explicit RETURNING id within transactions.
All query methods support template substitution:
# Available templates
{{tables.tablename}} # → "schema".tablename (or tablename if no schema)
{{schema}} # → "schema" (or empty)
# Example
query = "SELECT * FROM {{tables.users}} WHERE created_at > $1"
# With schema="myapp"
# Becomes: SELECT * FROM "myapp".users WHERE created_at > $1
# Without schema
# Becomes: SELECT * FROM users WHERE created_at > $1
# Insert and get ID
user_id = await db.execute_and_return_id(
"INSERT INTO {{tables.users}} (email, name) VALUES ($1, $2)",
"alice@example.com",
"Alice"
)
# Fetch single value
count = await db.fetch_value(
"SELECT COUNT(*) FROM {{tables.users}}"
)
# Fetch with specific column
email = await db.fetch_value(
"SELECT email, name FROM {{tables.users}} WHERE id = $1",
user_id,
column=0 # Get email (first column)
)
# Fetch one row
user = await db.fetch_one(
"SELECT * FROM {{tables.users}} WHERE id = $1",
user_id
)
# user = {"id": 1, "email": "...", "name": "..."}
# Fetch all rows
users = await db.fetch_all(
"SELECT * FROM {{tables.users}} WHERE is_active = $1",
True
)
# users = [{"id": 1, ...}, {"id": 2, ...}]
# Execute without results
await db.execute(
"DELETE FROM {{tables.users}} WHERE id = $1",
user_id
)
# Check table exists
if await db.table_exists("users"):
print("Users table exists")
# executemany - same query, different params
users = [
("alice@example.com", "Alice"),
("bob@example.com", "Bob"),
("charlie@example.com", "Charlie"),
]
await db.executemany(
"INSERT INTO {{tables.users}} (email, name) VALUES ($1, $2)",
users
)
# copy_records_to_table - fastest for bulk data
records = [
("alice@example.com", "Alice"),
("bob@example.com", "Bob"),
# ... thousands more
]
count = await db.copy_records_to_table(
"users", # Just table name (template applied internally)
records=records,
columns=["email", "name"]
)
# Much faster than executemany for >1000 rows
from pydantic import BaseModel
class User(BaseModel):
id: int
email: str
name: str
is_active: bool = True
# Fetch as model
user = await db.fetch_as_model(
User,
"SELECT * FROM {{tables.users}} WHERE id = $1",
user_id
)
# user is User instance (typed)
# Fetch all as models
users = await db.fetch_all_as_model(
User,
"SELECT * FROM {{tables.users}} WHERE is_active = $1",
True
)
# users is list[User] (typed)
# Basic transaction
async with db.transaction() as tx:
user_id = await tx.fetch_value(
"INSERT INTO {{tables.users}} (email) VALUES ($1) RETURNING id",
email
)
await tx.execute(
"INSERT INTO {{tables.profiles}} (user_id, bio) VALUES ($1, $2)",
user_id,
"Bio text"
)
# Commits on success, rolls back on exception
# Nested transaction (savepoint)
async with db.transaction() as tx:
await tx.execute("INSERT INTO {{tables.users}} ...")
try:
async with tx.transaction() as nested:
await nested.execute("UPDATE {{tables.users}} SET risky_field = $1", value)
# This can rollback without affecting outer transaction
except Exception:
# Nested rolled back, outer transaction continues
pass
# Get pool statistics
stats = await db.get_pool_stats()
print(f"Total connections: {stats['size']}")
print(f"Active: {stats['used_size']}")
print(f"Idle: {stats['free_size']}")
print(f"Usage: {stats['used_size'] / stats['size']:.1%}")
# Monitor pool health
usage = stats['used_size'] / stats['size']
if usage > 0.8:
logger.warning(f"High pool usage: {usage:.1%}")
# Add frequently-used query as prepared statement
db.add_prepared_statement(
"get_user_by_email",
"SELECT * FROM {{tables.users}} WHERE email = $1"
)
# Prepared statements are created on all pool connections
# Improves performance for queries executed repeatedly
# Use connection_string (recommended)
config = DatabaseConfig(
connection_string="postgresql://user:pass@host:port/database"
)
# OR use individual parameters
config = DatabaseConfig(
host="localhost",
port=5432,
database="myapp",
user="postgres",
password="secret",
schema="myschema", # Optional schema
)
config = DatabaseConfig(
connection_string="...",
# Pool sizing (start small, tune based on metrics)
min_connections=5, # Pool floor - connections opened eagerly
max_connections=20, # Pool cap - keep under DB's max_connections
# Connection lifecycle
max_queries=50000, # Queries before recycling connection
max_inactive_connection_lifetime=300.0, # Seconds before closing idle
command_timeout=60.0, # Default query timeout (seconds)
)
config = DatabaseConfig(
connection_string="postgresql://db.example.com/myapp",
# Enable SSL
ssl_enabled=True,
ssl_mode="verify-full", # 'require', 'verify-ca', 'verify-full'
# Certificate files
ssl_ca_file="/etc/ssl/certs/ca.pem",
ssl_cert_file="/etc/ssl/certs/client.crt", # For mutual TLS
ssl_key_file="/etc/ssl/private/client.key",
ssl_key_password="keypass", # If key is encrypted
)
SSL Modes:
- require: Encrypt connection (don't verify certificate)
- verify-ca: Verify certificate is signed by trusted CA
- verify-full: Verify certificate AND hostname match

Prevent runaway queries and stuck transactions:
config = DatabaseConfig(
connection_string="...",
# Timeouts in milliseconds (None to disable)
statement_timeout_ms=30000, # Abort queries >30 seconds
idle_in_transaction_session_timeout_ms=60000, # Abort idle transactions >1 minute
lock_timeout_ms=5000, # Abort lock waits >5 seconds
)
Default values:
- statement_timeout_ms: 60000 (60 seconds)
- idle_in_transaction_session_timeout_ms: 60000
- lock_timeout_ms: 5000

Set to None to disable.
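For example, a pool dedicated to long-running analytical queries might disable the statement timeout (sketch):

config = DatabaseConfig(
    connection_string="...",
    statement_timeout_ms=None,  # no server-side statement timeout
)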
config = DatabaseConfig(
connection_string="...",
# Custom server settings
server_settings={
"jit": "off", # Disable JIT (prevents latency spikes)
"application_name": "myapp",
"timezone": "UTC",
},
# Commands run on each new connection
init_commands=[
"SET timezone TO 'UTC'",
"SET work_mem TO '256MB'",
],
)
config = DatabaseConfig(
connection_string="...",
# Connection retry settings
retry_attempts=3, # Number of retries
retry_delay=1.0, # Initial delay (seconds)
retry_backoff=2.0, # Exponential backoff multiplier
retry_max_delay=30.0, # Maximum delay between retries
)
Related skills:
- pgdbm:using-pgdbm
- pgdbm:choosing-pattern
- pgdbm:migrations-api-reference
- pgdbm:testing-database-code