Comprehensive pytest skill for async Python testing with proper mocking, fixtures, and patterns from production-ready test suites. Use when writing or improving async tests for Python applications, especially FastAPI backends with database interactions.
Provides comprehensive async pytest patterns for FastAPI backends with database mocking. Use when writing tests for async Python apps, especially DAO/service layers with database interactions.
/plugin marketplace add rafaelkamimura/claude-tools
/plugin install rafaelkamimura-claude-tools@rafaelkamimura/claude-tools
This skill inherits all available tools. When active, it can use any tool Claude has access to.
README.md
generate_test_template.py
Expert guidance for writing comprehensive async Python tests using pytest, based on production patterns from a 387-test FastAPI backend test suite.
Activate this skill when:
tests/
├── conftest.py # Shared fixtures (app, client, event_loop, faker)
├── fakes.py # Reusable mock objects (FakeConnection, FakeRecord)
├── test_<module>_dao.py # DAO layer tests
├── test_<module>_service.py # Service layer tests
├── test_<module>_router.py # API endpoint tests
└── test_<module>_dto.py # DTO validation tests
test_<module>_<layer>.py
test_<what>_<scenario> (e.g., test_create_calls_execute, test_fetch_by_id_error_maps_to_500)
async def test_fetch_user_success(faker: Faker) -> None:
    user_id: int = faker.random_int(1, 100)
    conn: FakeConnection = FakeConnection()
    # ...
import asyncio
import pytest
from fastapi.testclient import TestClient
from httpx import AsyncClient, ASGITransport
from faker import Faker
@pytest.fixture(scope='session')
def app():
    """Create a FastAPI app instance for testing.

    The fixture is session-scoped, so the application is created once and
    shared by every test that requests it.
    """
    # Imported inside the fixture rather than at module import time.
    from src.config.factory import create_app

    return create_app()
@pytest.fixture(scope='session')
def client(app):
    """Provide a synchronous TestClient for FastAPI.

    The client is entered as a context manager and yielded to the tests;
    the ``with`` block is exited when the session-scoped fixture is torn down.
    """
    with TestClient(app) as c:
        yield c
@pytest.fixture
async def async_client(app):
    """Provide an asynchronous AsyncClient for FastAPI using ASGI transport.

    Requests are routed to ``app`` through ``ASGITransport``; ``base_url``
    is the placeholder ``http://test``. The client is closed on teardown.
    """
    # NOTE(review): an async fixture declared with plain ``@pytest.fixture``
    # relies on the pytest-asyncio mode configured for the project - confirm.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url='http://test') as ac:
        yield ac
@pytest.fixture
def event_loop():
    """Create a new event loop for each test and close it afterwards."""
    # NOTE(review): newer pytest-asyncio releases deprecate overriding the
    # ``event_loop`` fixture - confirm against the version pinned by the project.
    loop = asyncio.new_event_loop()
    yield loop
    # Runs as fixture teardown, after the test has finished.
    loop.close()
@pytest.fixture
def faker():
    """Provide a Faker instance configured for Brazilian Portuguese."""
    return Faker('pt_BR')  # Adjust locale as needed
class FakeRecord:
    """Simulate a database record with a .result() method and optional rowcount.

    Args:
        data: Payload returned unchanged by ``result()``.
        rowcount: Explicit row count. When omitted, an integer ``data`` is
            used as the row count; any other payload defaults to ``1``.
    """

    def __init__(self, data, rowcount=None):
        self._data = data
        if rowcount is not None:
            self.rowcount = rowcount
        elif isinstance(data, int) and not isinstance(data, bool):
            # bool is a subclass of int; a True/False payload is a value,
            # not a row count, so it must not land in this branch.
            self.rowcount = data
        else:
            self.rowcount = 1

    def result(self):
        """Return the payload this record was created with."""
        return self._data
class FakeConnection:
    """Simulate a psqlpy/asyncpg Connection with execute, fetch, fetch_val, and fetch_row.

    Set the ``*_return`` attributes before running the code under test and
    inspect the ``*_calls`` lists afterwards. Assigning an ``Exception``
    instance to a ``*_return`` attribute makes the matching method raise it.
    """

    def __init__(self):
        # Canned results (or Exception instances to raise).
        self.execute_return = None
        self.fetch_return = None
        self.fetch_row_return = None
        self.fetch_val_return = None
        # Recorded (stmt, parameters) tuples, in call order.
        self.execute_calls = []
        self.fetch_calls = []
        self.fetch_row_calls = []
        self.fetch_val_calls = []

    def transaction(self):
        """Return an async context manager that yields a fake transaction."""
        return FakeTransactionContext(self)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # False: exceptions raised inside the ``async with`` body propagate.
        return False

    async def execute(self, stmt, parameters=None):
        """Record the call and return ``execute_return`` wrapped in a FakeRecord.

        A non-empty list whose items are all lists is treated as a queue of
        per-call results: each call pops and returns the first one.
        """
        self.execute_calls.append((stmt, parameters))
        queued = self.execute_return
        if isinstance(queued, Exception):
            raise queued
        if (
            isinstance(queued, list)
            and queued
            and all(isinstance(item, list) for item in queued)
        ):
            return FakeRecord(queued.pop(0))
        return FakeRecord(queued)

    async def execute_many(self, stmt, parameters_list=None):
        """Simulate execute_many for bulk operations.

        The call is recorded in ``execute_calls``. The returned record
        reports one affected row per parameter set.
        """
        if parameters_list is None:
            parameters_list = []
        self.execute_calls.append((stmt, parameters_list))
        if isinstance(self.execute_return, Exception):
            raise self.execute_return
        total_rows = len(parameters_list)
        return FakeRecord(data=total_rows, rowcount=total_rows)

    async def fetch(self, stmt, parameters=None):
        """Record the call and return ``fetch_return`` wrapped in a FakeRecord."""
        self.fetch_calls.append((stmt, parameters))
        # Raise configured errors, consistent with execute/fetch_val/fetch_row.
        if isinstance(self.fetch_return, Exception):
            raise self.fetch_return
        return FakeRecord(self.fetch_return)

    async def fetch_val(self, stmt, parameters=None):
        """Record the call and return ``fetch_val_return`` as-is (not wrapped)."""
        self.fetch_val_calls.append((stmt, parameters))
        if isinstance(self.fetch_val_return, Exception):
            raise self.fetch_val_return
        return self.fetch_val_return

    async def fetch_row(self, stmt, parameters=None):
        """Simulate fetching a single row.

        ``fetch_row_return`` takes precedence and is returned unwrapped.
        Otherwise the first item of a non-empty ``fetch_return`` list is
        popped and wrapped; failing that, ``fetch_return`` itself is wrapped.
        """
        # Kept in fetch_calls for backward compatibility; fetch_row_calls
        # lets tests tell fetch_row apart from fetch.
        self.fetch_calls.append((stmt, parameters))
        self.fetch_row_calls.append((stmt, parameters))
        if isinstance(self.fetch_row_return, Exception):
            raise self.fetch_row_return
        if self.fetch_row_return is not None:
            return self.fetch_row_return
        if isinstance(self.fetch_return, list) and self.fetch_return:
            return FakeRecord(self.fetch_return.pop(0))
        return FakeRecord(self.fetch_return)
class FakeTransaction:
    """Simulate a database transaction context.

    Every operation is forwarded to the wrapped connection, so calls made
    through the transaction are recorded on that connection.
    """

    def __init__(self, connection):
        self.connection = connection

    async def execute(self, stmt, parameters=None):
        return await self.connection.execute(stmt, parameters)

    async def execute_many(self, stmt, parameters_list=None, parameters=None):
        """Simulate execute_many - delegate to connection's execute_many if available.

        ``parameters`` takes precedence over ``parameters_list`` when both
        are given. If the connection has no ``execute_many``, ``execute`` is
        called once per parameter set and the rowcounts are summed.
        """
        params = parameters if parameters is not None else parameters_list
        if hasattr(self.connection, 'execute_many'):
            return await self.connection.execute_many(stmt, params)
        # Fallback: simulate by calling execute for each parameter set.
        total_rowcount = 0
        for param_set in params or []:
            result = await self.connection.execute(stmt, param_set)
            total_rowcount += getattr(result, 'rowcount', 0)
        return FakeRecord(data=total_rowcount, rowcount=total_rowcount)

    async def fetch(self, stmt, parameters=None):
        return await self.connection.fetch(stmt, parameters)

    async def fetch_row(self, stmt, parameters=None):
        return await self.connection.fetch_row(stmt, parameters)

    async def fetch_val(self, stmt, parameters=None):
        return await self.connection.fetch_val(stmt, parameters)
class FakeTransactionContext:
    """Simulate the transaction context manager returned by conn.transaction().

    Entering the context yields a ``FakeTransaction`` bound to the
    connection passed to the constructor.
    """

    def __init__(self, connection):
        self.connection = connection
        self.transaction = FakeTransaction(connection)

    async def __aenter__(self):
        return self.transaction

    async def __aexit__(self, exc_type, exc, tb):
        # False: exceptions raised inside the ``async with`` body propagate.
        return False
Use __wrapped__ to bypass connection decorators:
@pytest.mark.asyncio
async def test_create_calls_execute(faker):
    """Test that create method calls execute with correct SQL and parameters."""
    # Arrange: Prepare test data
    create_dto = UserDTO.Create(
        name=faker.name(),
        email=faker.email(),
        cpf=faker.ssn(),
    )
    conn = FakeConnection()

    # Act: Call DAO method directly with __wrapped__ so the fake connection
    # is passed in explicitly
    await UserDAO.create.__wrapped__(conn, create_dto)

    # Assert: Verify execute was called with correct SQL
    assert len(conn.execute_calls) == 1
    stmt, params = conn.execute_calls[0]
    assert 'INSERT INTO users' in stmt
    assert isinstance(params, list)
    # One bound parameter per DTO field
    assert len(params) == len(create_dto.model_dump())
@pytest.mark.asyncio
async def test_fetch_by_id_error_maps_to_500():
    """Test that database errors are properly mapped to DAOException."""
    conn = FakeConnection()

    async def broken_fetch_row(stmt, parameters=None):
        raise RustPSQLDriverPyBaseError('db fail')

    # Replace the fake's fetch_row so the DAO sees a driver-level failure
    conn.fetch_row = broken_fetch_row

    with pytest.raises(DAOException) as exc:
        await UserDAO.fetch_by_id.__wrapped__(conn, 1)

    # Assertions live outside the ``with`` block: statements after the
    # raising call inside it would never execute.
    err = exc.value
    assert err.status_code == 500
    assert 'Erro ao buscar' in err.detail
Create dummy dependencies for isolated testing:
class DummyUserAdapter:
    """Mock adapter for testing service layer.

    Args:
        users: Value returned by ``get_users_by_permission``.
    """

    def __init__(self, users):
        self.users = users
        # Flipped to True the first time get_users_by_permission is awaited.
        self.called = False

    async def get_users_by_permission(self, _permission_id, _auth_header, _permission_scope):
        """Record the call and return the canned users; the arguments are ignored."""
        self.called = True
        return self.users
class DummyUserDAO:
    """Mock DAO for testing service layer."""

    def __init__(self):
        self.fetch_called = False
        # False until create() is awaited; afterwards holds a 1-tuple with
        # the DTO that was passed (truthy, and lets tests inspect the argument).
        self.create_called = False

    async def fetch_all(self):
        """Record the call and return a single hard-coded user."""
        self.fetch_called = True
        return [UserDTO.Read(id=1, name='Test User', email='test@example.com')]

    async def create(self, dto):
        """Record the DTO that create was called with."""
        self.create_called = (dto,)
@pytest.mark.asyncio
async def test_service_coordinates_dao_and_adapter():
    """Test that service properly coordinates between DAO and adapter."""
    # Arrange: inject dummy collaborators instead of the real ones
    adapter = DummyUserAdapter([])
    dao = DummyUserDAO()
    service = UserService(user_adapter=adapter, user_dao=dao)

    # Act
    result = await service.get_all_users()

    # Assert: the DAO was consulted and its DTOs were returned
    assert dao.fetch_called
    assert isinstance(result[0], UserDTO.Read)
@pytest.mark.asyncio
async def test_assign_with_dal_connection(monkeypatch, faker):
    """Test method that uses DAL connection wrapper."""
    from src.domain.dal import DAL

    conn = FakeConnection()

    # Monkeypatch connection acquisition. '_DAL__get_connection' is the
    # name-mangled form of the private ``DAL.__get_connection`` attribute.
    async def fake_get_connection(cls):
        return conn

    monkeypatch.setattr(DAL, '_DAL__get_connection', classmethod(fake_get_connection))

    # Stub other dependencies
    async def fake_verify_scope(id_, scope_type):
        return None

    monkeypatch.setattr(UserDAO, '_verify_scope', fake_verify_scope)

    # Prepare test data
    dto = UserDTO.Assign(user_id=1, role_id=2)

    # Call the actual DAO method (not __wrapped__) so the patched
    # connection acquisition is exercised
    await UserDAO.assign(10, dto)

    # Verify execution
    assert len(conn.execute_calls) > 0
@pytest.mark.asyncio
async def test_sync_calls_execute_many(faker):
    """Test that bulk sync uses execute_many for efficiency."""
    items = [
        UserDTO.Create(name=faker.name(), email=faker.email())
        for _ in range(3)
    ]
    conn = FakeConnection()
    executed = []

    async def fake_execute_many(stmt, parameters=None, **kwargs):
        # Accept the parameter sets positionally, as ``parameters=``, or as
        # ``parameters_list=``.
        params = parameters if parameters is not None else kwargs.get('parameters_list')
        executed.append((stmt, params))

    # Patch transaction's execute_many
    original_transaction = conn.transaction

    async def patched_transaction():
        t = await original_transaction().__aenter__()
        t.execute_many = fake_execute_many
        return t

    class PatchedTransactionContext:
        """Async context manager yielding a transaction with execute_many stubbed."""

        async def __aenter__(self):
            return await patched_transaction()

        async def __aexit__(self, exc_type, exc, tb):
            return False

    # Calling the class with no arguments builds the context manager, so the
    # class itself can stand in for ``conn.transaction``.
    conn.transaction = PatchedTransactionContext

    await UserDAO.sync.__wrapped__(conn, items)

    # Verify batch execution
    assert len(executed) == 1
    stmt, params = executed[0]
    assert 'INSERT INTO users' in stmt
    # NOTE(review): compares the length of the first parameter entry with
    # the number of items - confirm this matches the shape UserDAO.sync passes.
    assert len(params[0]) == len(items)
@pytest.mark.asyncio
async def test_get_users_endpoint(async_client, monkeypatch):
    """Test GET /users endpoint returns proper response."""
    # Mock the service layer. The mock accepts arbitrary arguments so it
    # works whether get_all is invoked on the class or on an instance
    # (which would pass ``self``).
    async def mock_get_users(*args, **kwargs):
        return [UserDTO.Read(id=1, name='Test', email='test@example.com')]

    monkeypatch.setattr('src.api.path.users.UserService.get_all', mock_get_users)

    # Make request
    response = await async_client.get('/users')

    # Assert response
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 1
    assert data[0]['name'] == 'Test'
@pytest.mark.asyncio
async def test_multiple_queries_with_different_results(faker):
    """Test method that makes multiple queries with different expected results."""
    conn = FakeConnection()

    # Set up multiple return values (will be popped in order). Each entry is
    # itself a list, which is what makes FakeConnection.execute treat the
    # outer list as a queue of per-call results.
    conn.execute_return = [
        [{'id': 1, 'status': 'pending'}],   # First query
        [{'id': 2, 'status': 'approved'}],  # Second query
    ]

    # First call gets first result
    result1 = await UserDAO.some_method.__wrapped__(conn, 1)
    assert result1[0]['status'] == 'pending'

    # Second call gets second result
    result2 = await UserDAO.some_method.__wrapped__(conn, 2)
    assert result2[0]['status'] == 'approved'
@pytest.mark.asyncio
@pytest.mark.parametrize('status,expected_count', [
    ('pending', 5),
    ('approved', 3),
    ('rejected', 2),
])
async def test_count_by_status(status, expected_count):
    """Test counting users by different status values."""
    conn = FakeConnection()
    # fetch_val returns this value unwrapped
    conn.fetch_val_return = expected_count

    result = await UserDAO.count_by_status.__wrapped__(conn, status)

    assert result == expected_count
    assert len(conn.fetch_val_calls) == 1
test_<action>_<scenario>
pytest tests/test_your_module.py -v
pytest --cov=src/domain/dao/your_module tests/test_your_module.py
scope='session' for expensive fixtures (app creation)
scope='function' (default) for mutable fixtures
pytest -x to stop on first failure during development
pytest tests/test_dao.py
# Run all tests with coverage
pytest --cov=src --cov-report=html --cov-report=term
# Run only unit tests (fast)
pytest tests/ -m "not integration"
# Run with verbose output
pytest -v --tb=short
# Run specific test file
pytest tests/test_user_dao.py -v
# Run tests matching pattern
pytest -k "test_create" -v
"""Tests for UserDAO database access layer."""
from datetime import datetime
import pytest
from src.domain.dal.dao.user import UserDAO
from src.domain.dal.dao.exception import DAOException
from src.domain.dto.user import UserDTO
from tests.fakes import FakeConnection, FakeRecord
@pytest.mark.asyncio
async def test_create_inserts_user(faker):
    """Test that create method inserts user with correct parameters."""
    create_dto = UserDTO.Create(
        name=faker.name(),
        email=faker.email(),
        cpf=faker.ssn(),
    )
    conn = FakeConnection()

    await UserDAO.create.__wrapped__(conn, create_dto)

    assert len(conn.execute_calls) == 1
    stmt, params = conn.execute_calls[0]
    assert 'INSERT INTO users' in stmt
    # The name is expected to be the first bound parameter
    assert params[0] == create_dto.name
@pytest.mark.asyncio
async def test_fetch_by_id_returns_user(faker):
    """Test that fetch_by_id returns properly formatted UserDTO."""
    fake_row = {
        'id': faker.random_int(1, 100),
        'name': faker.name(),
        'email': faker.email(),
        'created_at': faker.date_time(),
    }
    conn = FakeConnection()
    # fetch_row_return is handed back as-is, so wrap the row in a FakeRecord here
    conn.fetch_row_return = FakeRecord(fake_row)

    result = await UserDAO.fetch_by_id.__wrapped__(conn, fake_row['id'])

    assert result.id == fake_row['id']
    assert result.name == fake_row['name']
    assert isinstance(result, UserDTO.Read)
@pytest.mark.asyncio
async def test_fetch_by_id_raises_on_db_error():
    """Test that database errors are properly handled and mapped."""
    conn = FakeConnection()

    async def broken_fetch_row(stmt, parameters=None):
        raise Exception('Connection lost')

    # Replace the fake's fetch_row so the DAO call fails
    conn.fetch_row = broken_fetch_row

    with pytest.raises(DAOException) as exc:
        await UserDAO.fetch_by_id.__wrapped__(conn, 1)

    # Checked outside the ``with`` block so the assertion actually runs
    assert exc.value.status_code == 500
# Run single test
pytest tests/test_user_dao.py::test_create_inserts_user -v
# Run all tests in file
pytest tests/test_user_dao.py -v
# Run with coverage for specific module
pytest --cov=src/domain/dao/user tests/test_user_dao.py
# Stop on first failure
pytest -x tests/
# Show local variables on failure
pytest --showlocals tests/
# Run last failed tests
pytest --lf tests/
This skill provides production-proven patterns for async Python testing:
When in doubt, follow the "Arrange-Act-Assert" pattern and always verify both the happy path and error scenarios.
Creating algorithmic art using p5.js with seeded randomness and interactive parameter exploration. Use this when users request creating art using code, generative art, algorithmic art, flow fields, or particle systems. Create original algorithmic art rather than copying existing artists' work to avoid copyright violations.
Applies Anthropic's official brand colors and typography to any sort of artifact that may benefit from having Anthropic's look-and-feel. Use it when brand colors or style guidelines, visual formatting, or company design standards apply.
Create beautiful visual art in .png and .pdf documents using design philosophy. You should use this skill when the user asks to create a poster, piece of art, design, or other static piece. Create original visual designs, never copying existing artists' work to avoid copyright violations.