Guide for creating new Amplifier modules including protocol implementation, entry points, mount functions, and testing patterns. Use when creating new modules or understanding module architecture.
Start here for building Amplifier modules.
This directory contains the authoritative guidance for building each type of Amplifier module. One contract document covers each module type:
| Module Type | Contract | Purpose |
|---|---|---|
| Provider | PROVIDER_CONTRACT.md | LLM backend integration |
| Tool | TOOL_CONTRACT.md | Agent capabilities |
| Hook | HOOK_CONTRACT.md | Lifecycle observation and control |
| Orchestrator | ORCHESTRATOR_CONTRACT.md | Agent loop execution strategy |
| Context | CONTEXT_CONTRACT.md | Conversation memory management |
All modules follow this pattern:
# 1. Implement the Protocol from interfaces.py
class MyModule:
# ... implement required methods
pass
# 2. Provide mount() function
async def mount(coordinator, config):
"""Initialize and register module."""
instance = MyModule(config)
await coordinator.mount("category", instance, name="my-module")
return instance # or cleanup function
# 3. Register entry point in pyproject.toml
# [project.entry-points."amplifier.modules"]
# my-module = "my_package:mount"
Protocols are in code, not docs:
- amplifier_core/interfaces.py
- amplifier_core/models.py
- amplifier_core/message_models.py (Pydantic models for request/response envelopes)
- amplifier_core/content_models.py (dataclass types for events and streaming)

These contract documents provide guidance that code cannot express. Always read the code docstrings first.
Verify your module before release:
# Structural validation
amplifier module validate ./my-module
See individual contract documents for type-specific validation requirements.
For ecosystem overview: amplifier
contract_type: module_specification
module_type: tool
contract_version: 1.0.0
last_modified: 2025-01-29
related_files:
Tools provide capabilities that agents can invoke during execution.
Tools extend agent capabilities beyond pure conversation:
Source: amplifier_core/interfaces.py lines 121-146
@runtime_checkable
class Tool(Protocol):
@property
def name(self) -> str:
"""Tool name for invocation."""
...
@property
def description(self) -> str:
"""Human-readable tool description."""
...
async def execute(self, input: dict[str, Any]) -> ToolResult:
"""
Execute tool with given input.
Args:
input: Tool-specific input parameters
Returns:
Tool execution result
"""
...
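Pieced together, a minimal conforming tool looks like this (a sketch; the echo behavior is illustrative, not part of the contract):

from typing import Any

from amplifier_core.models import ToolResult

class EchoTool:
    """Minimal Tool implementation: echoes its input back to the agent."""

    @property
    def name(self) -> str:
        return "echo"

    @property
    def description(self) -> str:
        return "Echoes the provided text back verbatim."

    async def execute(self, input: dict[str, Any]) -> ToolResult:
        # The protocol names this parameter `input`, shadowing the builtin
        return ToolResult(success=True, output=str(input.get("text", "")))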
Source: amplifier_core/message_models.py
class ToolCall(BaseModel):
id: str # Unique ID for correlation
name: str # Tool name to invoke
arguments: dict[str, Any] # Tool-specific parameters
Source: amplifier_core/models.py
class ToolResult(BaseModel):
success: bool = True # Whether execution succeeded
output: Any | None = None # Tool output (typically str or dict)
error: dict[str, Any] | None = None # Error details if failed
async def mount(coordinator: ModuleCoordinator, config: dict) -> Tool | Callable | None:
"""
Initialize and register tool.
Returns:
- Tool instance
- Cleanup callable (for resource cleanup)
- None for graceful degradation
"""
tool = MyTool(config=config)
await coordinator.mount("tools", tool, name="my-tool")
return tool
[project.entry-points."amplifier.modules"]
my-tool = "my_tool:mount"
Tools must provide clear identification:
class MyTool:
@property
def name(self) -> str:
return "my_tool" # Used for invocation
@property
def description(self) -> str:
return "Performs specific action with given parameters."
Best practices:
- name: Short, snake_case, unique across mounted tools
- description: Clear explanation of what the tool does and expects

Handle inputs and return structured results:
async def execute(self, input: dict[str, Any]) -> ToolResult:
try:
# Validate input
required_param = input.get("required_param")
if not required_param:
return ToolResult(
success=False,
error={"message": "required_param is required"}
)
# Do the work
result = await self._do_work(required_param)
return ToolResult(
success=True,
output=result
)
except Exception as e:
return ToolResult(
success=False,
error={"message": str(e), "type": type(e).__name__}
)
Provide JSON schema for input validation:
def get_schema(self) -> dict:
"""Return JSON schema for tool input."""
return {
"type": "object",
"properties": {
"required_param": {
"type": "string",
"description": "Description of parameter"
},
"optional_param": {
"type": "integer",
"default": 10
}
},
"required": ["required_param"]
}
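The contract does not mandate how the schema is enforced. If you want runtime validation before doing the work, a sketch using the third-party jsonschema package (an assumption, not a kernel dependency):

from jsonschema import ValidationError, validate

from amplifier_core.models import ToolResult

async def execute(self, input: dict) -> ToolResult:
    try:
        # Validate raw input against the schema declared in get_schema()
        validate(instance=input, schema=self.get_schema())
    except ValidationError as e:
        return ToolResult(success=False, error={"message": e.message})
    result = await self._do_work(input)  # _do_work as in the execute example above
    return ToolResult(success=True, output=result)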
Tools receive configuration via Mount Plan:
tools:
- module: my-tool
source: git+https://github.com/org/my-tool@main
config:
max_size: 1048576
allowed_paths:
- /home/user/projects
See MOUNT_PLAN_SPECIFICATION.md for full schema.
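Those keys arrive as the config dict passed to mount(); a sketch of consuming them (key names match the example above):

from pathlib import Path

async def mount(coordinator, config):
    tool = MyTool(
        max_size=config.get("max_size", 1_048_576),
        allowed_paths=[Path(p) for p in config.get("allowed_paths", [])],
    )
    await coordinator.mount("tools", tool, name="my-tool")
    return tool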
Register lifecycle events:
coordinator.register_contributor(
"observability.events",
"my-tool",
lambda: ["my-tool:started", "my-tool:completed", "my-tool:error"]
)
Standard tool events emitted by orchestrators:
- tool:pre - Before tool execution
- tool:post - After successful execution
- tool:error - On execution failure

Reference implementation: amplifier-module-tool-filesystem
Study this module for:
Additional examples:
- mount() function with entry point in pyproject.toml
- ToolResult from execute()
- get_schema()

Use test utilities from amplifier_core/testing.py:
from amplifier_core.testing import TestCoordinator, MockTool
@pytest.mark.asyncio
async def test_tool_execution():
tool = MyTool(config={})
result = await tool.execute({
"required_param": "value"
})
assert result.success
assert result.error is None
from amplifier_core.testing import MockTool
mock_tool = MockTool(
name="test_tool",
description="Test tool",
return_value="mock result"
)
# After use
assert mock_tool.call_count == 1
assert mock_tool.last_input == {...}
# Structural validation
amplifier module validate ./my-tool --type tool
Related: README.md | HOOK_CONTRACT.md
contract_type: module_specification
module_type: provider
contract_version: 1.0.0
last_modified: 2025-01-29
related_files:
Providers translate between Amplifier's unified message format and vendor-specific LLM APIs.
See PROVIDER_SPECIFICATION.md for complete implementation guidance including:
This contract document provides the quick-reference essentials. The specification contains the full details.
Source: amplifier_core/interfaces.py lines 54-119
@runtime_checkable
class Provider(Protocol):
@property
def name(self) -> str: ...
def get_info(self) -> ProviderInfo: ...
async def list_models(self) -> list[ModelInfo]: ...
async def complete(self, request: ChatRequest, **kwargs) -> ChatResponse: ...
def parse_tool_calls(self, response: ChatResponse) -> list[ToolCall]: ...
Note: ToolCall is from amplifier_core.message_models (see REQUEST_ENVELOPE_V1 for details)
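To make the translation concrete, a hedged sketch of parse_tool_calls for a hypothetical vendor payload (the response.raw shape is invented for illustration; only the ToolCall fields come from the contract):

from amplifier_core.message_models import ToolCall

def parse_tool_calls(self, response) -> list[ToolCall]:
    """Map vendor tool-use blocks onto the unified ToolCall model."""
    calls: list[ToolCall] = []
    for block in getattr(response, "raw", {}).get("content", []):  # vendor shape assumed
        if block.get("type") == "tool_use":
            calls.append(
                ToolCall(id=block["id"], name=block["name"], arguments=block.get("input", {}))
            )
    return calls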
async def mount(coordinator: ModuleCoordinator, config: dict) -> Provider | Callable | None:
"""
Initialize and return provider instance.
Returns:
- Provider instance (registered automatically)
- Cleanup callable (for resource cleanup on unmount)
- None for graceful degradation (e.g., missing API key)
"""
api_key = config.get("api_key") or os.environ.get("MY_API_KEY")
if not api_key:
logger.warning("No API key - provider not mounted")
return None
provider = MyProvider(api_key=api_key, config=config)
await coordinator.mount("providers", provider, name="my-provider")
async def cleanup():
await provider.client.close()
return cleanup
[project.entry-points."amplifier.modules"]
my-provider = "my_provider:mount"
Providers receive configuration via Mount Plan:
providers:
- module: my-provider
source: git+https://github.com/org/my-provider@main
config:
api_key: "${MY_API_KEY}"
default_model: model-v1
debug: true
See MOUNT_PLAN_SPECIFICATION.md for full schema.
Register custom events via contribution channels:
coordinator.register_contributor(
"observability.events",
"my-provider",
lambda: ["my-provider:rate_limit", "my-provider:retry"]
)
See CONTRIBUTION_CHANNELS.md for the pattern.
Reference implementation: amplifier-module-provider-anthropic
Study this module for:
- mount() function with entry point in pyproject.toml
- signature in ThinkingBlock
- Usage (input/output/total tokens)
- ChatResponse from complete()

Use test utilities from amplifier_core/testing.py:
from amplifier_core.testing import TestCoordinator, create_test_coordinator
@pytest.mark.asyncio
async def test_provider_mount():
coordinator = create_test_coordinator()
cleanup = await mount(coordinator, {"api_key": "test-key"})
assert "my-provider" in coordinator.get_mounted("providers")
if cleanup:
await cleanup()
# Structural validation
amplifier module validate ./my-provider --type provider
Related: PROVIDER_SPECIFICATION.md | README.md
contract_type: module_specification
module_type: hook
contract_version: 1.0.0
last_modified: 2025-01-29
related_files:
Hooks observe, validate, and control agent lifecycle events.
Hooks enable:
See HOOKS_API.md for complete documentation including:
This contract provides the essentials. The API reference contains full details.
Source: amplifier_core/interfaces.py lines 205-220
@runtime_checkable
class HookHandler(Protocol):
async def __call__(self, event: str, data: dict[str, Any]) -> HookResult:
"""
Handle a lifecycle event.
Args:
event: Event name (e.g., "tool:pre", "session:start")
data: Event-specific data
Returns:
HookResult indicating action to take
"""
...
Source: amplifier_core/models.py
| Action | Behavior | Use Case |
|---|---|---|
| continue | Proceed normally | Default, observation only |
| deny | Block operation | Validation failure, security |
| modify | Transform data | Preprocessing, enrichment |
| inject_context | Add to agent's context | Feedback loops, corrections |
| ask_user | Request approval | High-risk operations |
from amplifier_core.models import HookResult
# Simple observation
HookResult(action="continue")
# Block with reason
HookResult(action="deny", reason="Access denied")
# Inject feedback
HookResult(
action="inject_context",
context_injection="Found 3 linting errors...",
user_message="Linting issues detected"
)
# Request approval
HookResult(
action="ask_user",
approval_prompt="Allow write to production file?",
approval_default="deny"
)
async def mount(coordinator: ModuleCoordinator, config: dict) -> Callable | None:
"""
Initialize and register hook handlers.
Returns:
Cleanup callable to unregister handlers
"""
handlers = []
# Register handlers for specific events
handlers.append(
coordinator.hooks.register("tool:pre", my_validation_hook, priority=10)
)
handlers.append(
coordinator.hooks.register("tool:post", my_feedback_hook, priority=20)
)
# Return cleanup function
def cleanup():
for unregister in handlers:
unregister()
return cleanup
[project.entry-points."amplifier.modules"]
my-hook = "my_hook:mount"
Register handlers during mount():
from amplifier_core.hooks import HookRegistry
# Get registry from coordinator
registry: HookRegistry = coordinator.hooks
# Register with priority (lower = earlier)
unregister = registry.register(
event="tool:post",
handler=my_handler,
priority=10,
name="my_handler"
)
# Later: unregister()
| Event | Trigger | Data Includes |
|---|---|---|
| session:start | Session created | session_id, config |
| session:end | Session ending | session_id, stats |
| prompt:submit | User input | prompt text |
| tool:pre | Before tool execution | tool_name, tool_input |
| tool:post | After tool execution | tool_name, tool_result |
| tool:error | Tool failed | tool_name, error |
| provider:request | LLM call starting | provider, messages |
| provider:response | LLM call complete | provider, response, usage |
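A sketch of a handler that dispatches on a few of these events (the write_file tool name and workspace path are illustrative assumptions, not part of the contract):

from typing import Any

from amplifier_core.models import HookResult

async def audit_hook(event: str, data: dict[str, Any]) -> HookResult:
    """Block writes outside the workspace; observe everything else."""
    if event == "tool:pre" and data.get("tool_name") == "write_file":
        # Hypothetical tool name and path policy, shown for illustration
        path = str(data.get("tool_input", {}).get("file_path", ""))
        if not path.startswith("/home/user/projects"):
            return HookResult(action="deny", reason=f"Write outside workspace: {path}")
    return HookResult(action="continue")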
Hooks receive configuration via Mount Plan:
hooks:
- module: my-hook
source: git+https://github.com/org/my-hook@main
config:
enabled_events:
- "tool:pre"
- "tool:post"
log_level: "info"
See MOUNT_PLAN_SPECIFICATION.md for full schema.
Register custom events your hook emits:
coordinator.register_contributor(
"observability.events",
"my-hook",
lambda: ["my-hook:validation_failed", "my-hook:approved"]
)
See CONTRIBUTION_CHANNELS.md for the pattern.
Reference implementation: amplifier-module-hooks-logging
Study this module for:
Additional examples:
- async def __call__(event, data) -> HookResult
- mount() function with entry point in pyproject.toml
- HookResult for all code paths

Use test utilities from amplifier_core/testing.py:
from amplifier_core.testing import TestCoordinator, EventRecorder
from amplifier_core.models import HookResult
@pytest.mark.asyncio
async def test_hook_handler():
# Test handler directly
result = await my_validation_hook("tool:pre", {
"tool_name": "Write",
"tool_input": {"file_path": "/etc/passwd"}
})
assert result.action == "deny"
assert "denied" in result.reason.lower()
@pytest.mark.asyncio
async def test_hook_registration():
coordinator = TestCoordinator()
cleanup = await mount(coordinator, {})
# Verify handlers registered
# ... test event emission
cleanup()
from amplifier_core.testing import EventRecorder
recorder = EventRecorder()
# Use in tests
await recorder.record("tool:pre", {"tool_name": "Write"})
# Assert
events = recorder.get_events()
assert len(events) == 1
assert events[0][0] == "tool:pre" # events are (event_name, data) tuples
# Structural validation
amplifier module validate ./my-hook --type hook
Related: HOOKS_API.md | README.md
contract_type: module_specification
module_type: orchestrator
contract_version: 1.0.0
last_modified: 2025-01-29
related_files:
Orchestrators implement the agent execution loop strategy.
Orchestrators control how agents execute:
Key principle: The orchestrator is policy, not mechanism. Swap orchestrators to change agent behavior without modifying the kernel.
Source: amplifier_core/interfaces.py lines 26-52
@runtime_checkable
class Orchestrator(Protocol):
async def execute(
self,
prompt: str,
context: ContextManager,
providers: dict[str, Provider],
tools: dict[str, Tool],
hooks: HookRegistry,
) -> str:
"""
Execute the agent loop with given prompt.
Args:
prompt: User input prompt
context: Context manager for conversation state
providers: Available LLM providers (keyed by name)
tools: Available tools (keyed by name)
hooks: Hook registry for lifecycle events
Returns:
Final response string
"""
...
A typical orchestrator implements this flow:
User Prompt
↓
Add to Context
↓
┌─────────────────────────────────────┐
│ LOOP until response has no tools │
│ │
│ 1. emit("provider:request") │
│ 2. provider.complete(messages) │
│ 3. emit("provider:response") │
│ 4. Add response to context │
│ │
│ If tool_calls: │
│ for each tool_call: │
│ 5. emit("tool:pre") │
│ 6. tool.execute(input) │
│ 7. emit("tool:post") │
│ 8. Add result to context │
│ │
│ Continue loop... │
└─────────────────────────────────────┘
↓
Return final text response
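A condensed sketch of that loop, with event emission elided for brevity (the ChatRequest constructor, its import, and response.content access are assumptions; check the envelope models for exact field names):

class MinimalOrchestrator:
    """Bare-bones loop: no hook flow control, no streaming, naive provider pick."""

    async def execute(self, prompt, context, providers, tools, hooks):
        await context.add_message({"role": "user", "content": prompt})
        provider = next(iter(providers.values()))  # naive provider selection
        while True:
            messages = await context.get_messages_for_request(provider=provider)
            response = await provider.complete(ChatRequest(messages=messages))  # constructor assumed
            tool_calls = provider.parse_tool_calls(response)
            if not tool_calls:
                await context.add_message({"role": "assistant", "content": response.content})
                return response.content  # no tool work left: final text
            # Keep the tool_use message and its results adjacent (see tool-pair rules)
            await context.add_message({
                "role": "assistant",
                "content": response.content,
                "tool_calls": [
                    {"id": c.id, "type": "function",
                     "function": {"name": c.name, "arguments": c.arguments}}
                    for c in tool_calls
                ],
            })
            for call in tool_calls:
                result = await tools[call.name].execute(call.arguments)
                await context.add_message({
                    "role": "tool",
                    "tool_call_id": call.id,
                    "content": result.output,
                })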
async def mount(coordinator: ModuleCoordinator, config: dict) -> Orchestrator | Callable | None:
"""
Initialize and return orchestrator instance.
Returns:
- Orchestrator instance
- Cleanup callable
- None for graceful degradation
"""
orchestrator = MyOrchestrator(config=config)
await coordinator.mount("session", orchestrator, name="orchestrator")
return orchestrator
[project.entry-points."amplifier.modules"]
my-orchestrator = "my_orchestrator:mount"
Orchestrators must emit lifecycle events for observability:
async def execute(self, prompt, context, providers, tools, hooks):
# Before LLM call
await hooks.emit("provider:request", {
"provider": provider_name,
"messages": messages,
"model": model_name
})
response = await provider.complete(request)
# After LLM call
await hooks.emit("provider:response", {
"provider": provider_name,
"response": response,
"usage": response.usage
})
# Before tool execution
await hooks.emit("tool:pre", {
"tool_name": tool_call.name,
"tool_input": tool_call.input
})
result = await tool.execute(tool_call.input)
# After tool execution
await hooks.emit("tool:post", {
"tool_name": tool_call.name,
"tool_input": tool_call.input,
"tool_result": result
})
Handle HookResult actions:
# Before tool execution
pre_result = await hooks.emit("tool:pre", data)
if pre_result.action == "deny":
# Don't execute tool
    return ToolResult(success=False, error={"message": pre_result.reason})
if pre_result.action == "modify":
# Use modified data
data = pre_result.data
if pre_result.action == "inject_context":
# Add feedback to context
await context.add_message({
"role": pre_result.context_injection_role,
"content": pre_result.context_injection
})
if pre_result.action == "ask_user":
# Request approval (requires approval provider)
approved = await request_approval(pre_result)
if not approved:
return ToolResult(is_error=True, output="User denied")
Manage conversation state:
# Add user message
await context.add_message({"role": "user", "content": prompt})
# Add assistant response
await context.add_message({"role": "assistant", "content": response.content})
# Add tool result
await context.add_message({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result.output
})
# Get messages for LLM request (context handles compaction internally)
messages = await context.get_messages_for_request()
Handle multiple providers:
# Get default or configured provider
provider_name = config.get("default_provider", list(providers.keys())[0])
provider = providers[provider_name]
# Or allow per-request provider selection
provider_name = request_options.get("provider", default_provider_name)
Orchestrators receive configuration via Mount Plan:
session:
orchestrator: my-orchestrator
context: context-simple
# Orchestrator-specific config can be passed via providers/tools config
See MOUNT_PLAN_SPECIFICATION.md for full schema.
Register custom events your orchestrator emits:
coordinator.register_contributor(
"observability.events",
"my-orchestrator",
lambda: [
"my-orchestrator:loop_started",
"my-orchestrator:loop_iteration",
"my-orchestrator:loop_completed"
]
)
See CONTRIBUTION_CHANNELS.md for the pattern.
Reference implementation: amplifier-module-loop-basic
Study this module for:
Additional examples:
- execute(prompt, context, providers, tools, hooks) -> str
- mount() function with entry point in pyproject.toml

Use test utilities from amplifier_core/testing.py:
from amplifier_core.hooks import HookRegistry
from amplifier_core.testing import (
    TestCoordinator,
    MockProvider,  # assumed available alongside the other mocks
    MockTool,
    MockContextManager,
    ScriptedOrchestrator,
    EventRecorder
)
@pytest.mark.asyncio
async def test_orchestrator_basic():
orchestrator = MyOrchestrator(config={})
context = MockContextManager()
providers = {"test": MockProvider()}
tools = {"test_tool": MockTool()}
hooks = HookRegistry()
result = await orchestrator.execute(
prompt="Test prompt",
context=context,
providers=providers,
tools=tools,
hooks=hooks
)
assert isinstance(result, str)
assert len(context.messages) > 0
from amplifier_core.testing import ScriptedOrchestrator
# For testing components that use orchestrators
orchestrator = ScriptedOrchestrator(responses=["Response 1", "Response 2"])
result = await orchestrator.execute(...)
assert result == "Response 1"
# Structural validation
amplifier module validate ./my-orchestrator --type orchestrator
Related: README.md | CONTEXT_CONTRACT.md
contract_type: module_specification
module_type: context
contract_version: 2.1.0
last_modified: 2026-01-01
related_files:
Context managers handle conversation memory and message storage.
Context managers control what the agent remembers:
Key principle: The context manager owns policy for memory. The orchestrator asks for messages; the context manager decides how to fit them within limits. Swap context managers to change memory behavior without modifying orchestrators.
Mechanism vs Policy: Orchestrators provide the mechanism (request messages, make LLM calls). Context managers provide the policy (what to return, when to compact, how to fit within limits).
Source: amplifier_core/interfaces.py lines 148-180
@runtime_checkable
class ContextManager(Protocol):
async def add_message(self, message: dict[str, Any]) -> None:
"""Add a message to the context."""
...
async def get_messages_for_request(
self,
token_budget: int | None = None,
provider: Any | None = None,
) -> list[dict[str, Any]]:
"""
Get messages ready for an LLM request.
The context manager handles any compaction needed internally.
Returns messages that fit within the token budget.
Args:
token_budget: Optional explicit token limit (deprecated, prefer provider).
provider: Optional provider instance for dynamic budget calculation.
If provided, budget = context_window - max_output_tokens - safety_margin.
Returns:
Messages ready for LLM request, compacted if necessary.
"""
...
async def get_messages(self) -> list[dict[str, Any]]:
"""Get all messages (raw, uncompacted) for transcripts/debugging."""
...
async def set_messages(self, messages: list[dict[str, Any]]) -> None:
"""Set messages directly (for session resume)."""
...
async def clear(self) -> None:
"""Clear all messages."""
...
Messages follow a standard structure:
# User message
{
"role": "user",
"content": "User's input text"
}
# Assistant message
{
"role": "assistant",
"content": "Assistant's response"
}
# Assistant message with tool calls
{
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": "call_123",
"type": "function",
"function": {"name": "read_file", "arguments": "{...}"}
}
]
}
# System message
{
"role": "system",
"content": "System instructions"
}
# Tool result
{
"role": "tool",
"tool_call_id": "call_123",
"content": "Tool output"
}
async def mount(coordinator: ModuleCoordinator, config: dict) -> ContextManager | Callable | None:
"""
Initialize and return context manager instance.
Returns:
- ContextManager instance
- Cleanup callable
- None for graceful degradation
"""
context = MyContextManager(
max_tokens=config.get("max_tokens", 100000),
compaction_threshold=config.get("compaction_threshold", 0.8)
)
await coordinator.mount("session", context, name="context")
return context
[project.entry-points."amplifier.modules"]
my-context = "my_context:mount"
Store messages with proper validation:
async def add_message(self, message: dict[str, Any]) -> None:
"""Add a message to the context."""
# Validate required fields
if "role" not in message:
raise ValueError("Message must have 'role' field")
# Store message
self._messages.append(message)
# Track token count (approximate)
self._token_count += self._estimate_tokens(message)
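The contract does not prescribe a tokenizer for _estimate_tokens. A crude character-count heuristic is a common placeholder (the 4-characters-per-token ratio is an assumption, not a contract requirement):

import json

def _estimate_tokens(self, message: dict) -> int:
    """Rough estimate: ~4 characters per token for English-ish text."""
    return max(1, len(json.dumps(message, default=str)) // 4)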
Return messages ready for LLM request, handling compaction internally:
async def get_messages_for_request(
self,
token_budget: int | None = None,
provider: Any | None = None,
) -> list[dict[str, Any]]:
"""
Get messages ready for an LLM request.
Handles compaction internally if needed. Orchestrators call this
before every LLM request and trust the context manager to return
messages that fit within limits.
Args:
token_budget: Optional explicit token limit (deprecated, prefer provider).
provider: Optional provider instance for dynamic budget calculation.
If provided, budget = context_window - max_output_tokens - safety_margin.
"""
budget = self._calculate_budget(token_budget, provider)
# Check if compaction needed
if self._token_count > (budget * self._compaction_threshold):
await self._compact_internal()
return list(self._messages) # Return copy to prevent mutation
def _calculate_budget(self, token_budget: int | None, provider: Any | None) -> int:
"""Calculate effective token budget from provider or fallback to config."""
# Explicit budget takes precedence (for backward compatibility)
if token_budget is not None:
return token_budget
# Try provider-based dynamic budget
if provider is not None:
try:
info = provider.get_info()
defaults = info.defaults or {}
context_window = defaults.get("context_window")
max_output_tokens = defaults.get("max_output_tokens")
if context_window and max_output_tokens:
safety_margin = 1000 # Buffer to avoid hitting hard limits
return context_window - max_output_tokens - safety_margin
except Exception:
pass # Fall back to configured max_tokens
return self._max_tokens
Return all messages for transcripts/debugging (no compaction):
async def get_messages(self) -> list[dict[str, Any]]:
"""Get all messages (raw, uncompacted) for transcripts/debugging."""
return list(self._messages) # Return copy to prevent mutation
Set messages directly for session resume:
async def set_messages(self, messages: list[dict[str, Any]]) -> None:
"""Set messages directly (for session resume)."""
self._messages = list(messages)
self._token_count = sum(self._estimate_tokens(m) for m in self._messages)
File-Based Context Managers - Special Behavior:
For context managers with persistent file storage (like context-persistent), the behavior on session resume is different:
async def set_messages(self, messages: list[dict[str, Any]]) -> None:
"""
Set messages - behavior depends on whether we loaded from file.
If we already loaded from our own file (session resume):
- IGNORE this call to preserve our complete history
- CLI's filtered transcript would lose system/developer messages
If this is a fresh session or migration:
- Accept the messages and write to our file
"""
if self._loaded_from_file:
# Already have complete history - ignore CLI's filtered transcript
logger.info("Ignoring set_messages - loaded from persistent file")
return
# Fresh session: accept messages
self._messages = list(messages)
self._write_to_file()
Why This Pattern?:
- SessionStore saves a filtered transcript (no system/developer messages)

Reset context state:
async def clear(self) -> None:
"""Clear all messages."""
self._messages = []
self._token_count = 0
Compaction is an internal implementation detail of the context manager. It happens automatically when get_messages_for_request() is called and the context exceeds thresholds.
Critical Design Principle: Compaction MUST be ephemeral - it returns a compacted VIEW without modifying the stored history.
┌─────────────────────────────────────────────────────────────────┐
│ NON-DESTRUCTIVE COMPACTION │
├─────────────────────────────────────────────────────────────────┤
│ │
│ messages[] get_messages_for_request() │
│ ┌──────────┐ ┌──────────┐ │
│ │ msg 1 │ │ msg 1 │ (compacted view) │
│ │ msg 2 │ ──────────▶ │ [summ] │ │
│ │ msg 3 │ ephemeral │ msg N │ │
│ │ ... │ compaction └──────────┘ │
│ │ msg N │ │
│ └──────────┘ get_messages() │
│ │ ┌──────────┐ │
│ │ │ msg 1 │ (FULL history) │
│ └───────────────────▶ │ msg 2 │ │
│ unchanged │ msg 3 │ │
│ │ ... │ │
│ │ msg N │ │
│ └──────────┘ │
│ │
│ Key: Internal state is NEVER modified by compaction. │
│ Compaction produces temporary views for LLM requests. │
│ Full history is always available via get_messages(). │
│ │
└─────────────────────────────────────────────────────────────────┘
Why Non-Destructive?:
Implementation Pattern:
async def get_messages_for_request(self, token_budget=None, provider=None):
"""Return compacted VIEW without modifying internal state."""
budget = self._calculate_budget(token_budget, provider)
# Read current messages (don't modify)
messages = list(self._messages) # Copy!
# Check if compaction needed
token_count = self._count_tokens(messages)
if not self._should_compact(token_count, budget):
return messages
# Compact EPHEMERALLY - return compacted copy
return self._compact_messages(messages, budget) # Returns NEW list
async def get_messages(self):
"""Return FULL history (never compacted)."""
return list(self._messages) # Always complete
Critical: During compaction, tool_use and tool_result messages must be kept together. Separating them causes LLM API errors.
async def _compact_internal(self) -> None:
"""Internal compaction - preserves tool pairs."""
# Emit pre-compaction event
await self._hooks.emit("context:pre_compact", {
"message_count": len(self._messages),
"token_count": self._token_count
})
# Build tool_call_id -> tool_use index map
tool_use_ids = set()
for msg in self._messages:
if msg.get("role") == "assistant" and msg.get("tool_calls"):
for tc in msg["tool_calls"]:
tool_use_ids.add(tc.get("id"))
# Identify which tool results have matching tool_use
orphan_result_indices = []
for i, msg in enumerate(self._messages):
if msg.get("role") == "tool":
if msg.get("tool_call_id") not in tool_use_ids:
orphan_result_indices.append(i)
# Strategy: Keep system messages + recent messages
# But ensure we don't split tool pairs
system_messages = [m for m in self._messages if m["role"] == "system"]
# Find safe truncation point (not in middle of tool sequence)
keep_count = self._keep_recent
recent_start = max(0, len(self._messages) - keep_count)
# Adjust start to not split tool sequences
while recent_start > 0:
msg = self._messages[recent_start]
if msg.get("role") == "tool":
# This is a tool result - need to include the tool_use before it
recent_start -= 1
else:
break
recent_messages = self._messages[recent_start:]
self._messages = system_messages + recent_messages
self._token_count = sum(self._estimate_tokens(m) for m in self._messages)
# Emit post-compaction event
await self._hooks.emit("context:post_compact", {
"message_count": len(self._messages),
"token_count": self._token_count
})
Different strategies for different use cases:
Keep N most recent messages (with tool pair preservation):
# Find safe truncation point
keep_from = len(self._messages) - keep_count
# Adjust to not split tool pairs
while keep_from > 0 and self._messages[keep_from].get("role") == "tool":
keep_from -= 1
self._messages = self._messages[keep_from:]
Use LLM to summarize older messages:
# Summarize old messages (summarize() is your own LLM-backed helper, not a kernel API)
old_messages = self._messages[:-keep_recent]
summary = await summarize(old_messages)
# Replace with summary
self._messages = [
{"role": "system", "content": f"Previous conversation summary: {summary}"},
*self._messages[-keep_recent:]
]
Keep messages based on importance score:
scored = [(m, self._score_importance(m)) for m in self._messages]
scored.sort(key=lambda x: x[1], reverse=True)
# Keep high-importance messages, but preserve tool pairs
self._messages = self._reorder_preserving_tool_pairs(
[m for m, _ in scored[:keep_count]]
)
Context managers receive configuration via Mount Plan:
session:
orchestrator: loop-basic
context: my-context
# Context config can be passed via top-level config
See MOUNT_PLAN_SPECIFICATION.md for full schema.
Register compaction events:
coordinator.register_contributor(
"observability.events",
"my-context",
lambda: ["context:pre_compact", "context:post_compact"]
)
Standard events to emit:
- context:pre_compact - Before compaction (include message_count, token_count)
- context:post_compact - After compaction (include new counts)

See CONTRIBUTION_CHANNELS.md for the pattern.
Reference implementation: amplifier-module-context-simple
Study this module for:
Additional examples:
- mount() function with entry point in pyproject.toml
- get_messages_for_request() handles compaction internally

Use test utilities from amplifier_core/testing.py:
from amplifier_core.testing import MockContextManager
@pytest.mark.asyncio
async def test_context_manager():
context = MyContextManager(max_tokens=1000)
# Add messages
await context.add_message({"role": "user", "content": "Hello"})
await context.add_message({"role": "assistant", "content": "Hi there!"})
# Get messages for request (may compact)
messages = await context.get_messages_for_request()
assert len(messages) == 2
assert messages[0]["role"] == "user"
# Get raw messages (no compaction)
raw_messages = await context.get_messages()
assert len(raw_messages) == 2
# Test clear
await context.clear()
assert len(await context.get_messages()) == 0
@pytest.mark.asyncio
async def test_compaction_preserves_tool_pairs():
"""Verify tool_use and tool_result stay together during compaction."""
context = MyContextManager(max_tokens=100, compaction_threshold=0.5)
# Add messages including tool sequence
await context.add_message({"role": "user", "content": "Read file.txt"})
await context.add_message({
"role": "assistant",
"content": None,
"tool_calls": [{"id": "call_123", "type": "function", "function": {...}}]
})
await context.add_message({
"role": "tool",
"tool_call_id": "call_123",
"content": "File contents..."
})
# Force compaction by adding more messages
for i in range(50):
await context.add_message({"role": "user", "content": f"Message {i}"})
# Get messages for request (triggers compaction)
messages = await context.get_messages_for_request()
# Verify tool pairs are preserved
tool_use_ids = set()
tool_result_ids = set()
for msg in messages:
if msg.get("tool_calls"):
for tc in msg["tool_calls"]:
tool_use_ids.add(tc.get("id"))
if msg.get("role") == "tool":
tool_result_ids.add(msg.get("tool_call_id"))
# Every tool result should have matching tool use
assert tool_result_ids.issubset(tool_use_ids), "Orphaned tool results found!"
@pytest.mark.asyncio
async def test_session_resume():
"""Verify set_messages works for session resume."""
context = MyContextManager(max_tokens=1000)
saved_messages = [
{"role": "user", "content": "Previous conversation"},
{"role": "assistant", "content": "Previous response"}
]
await context.set_messages(saved_messages)
messages = await context.get_messages()
assert len(messages) == 2
assert messages[0]["content"] == "Previous conversation"
from amplifier_core.testing import MockContextManager
# For testing orchestrators
context = MockContextManager()
await context.add_message({"role": "user", "content": "Test"})
messages = await context.get_messages_for_request()
# Access internal state for assertions
assert len(context.messages) == 1
# Structural validation
amplifier module validate ./my-context --type context
Related: README.md | ORCHESTRATOR_CONTRACT.md
Version: 1.0.0
Layer: Kernel Mechanism
Status: Specification
The kernel provides a mechanism for custom module source resolution. The loader accepts an optional ModuleSourceResolver via mount point injection. If no resolver is provided, the kernel falls back to standard Python entry point discovery.
How modules are discovered and from where is app-layer policy.
class ModuleSource(Protocol):
"""Contract for module sources.
Implementations must resolve to a filesystem path where a Python module
can be imported.
"""
def resolve(self) -> Path:
"""
Resolve source to filesystem path.
Returns:
Path: Directory containing importable Python module
Raises:
ModuleNotFoundError: Source cannot be resolved
OSError: Filesystem access error
"""
Examples of conforming implementations (app-layer):
Kernel does NOT define these implementations. They are app-layer policy.
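For instance, a directory-backed source like the FileSource referenced later in this spec might look like this (an app-layer sketch, not kernel code):

from pathlib import Path

class FileSource:
    """ModuleSource backed by a local directory."""

    def __init__(self, path: str | Path):
        self._path = Path(path)

    def resolve(self) -> Path:
        if not self._path.is_dir():
            raise ModuleNotFoundError(f"Module directory not found: {self._path}")
        return self._path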
class ModuleSourceResolver(Protocol):
"""Contract for module source resolution strategies.
Implementations decide WHERE to find modules based on module ID and
optional profile hints.
"""
def resolve(self, module_id: str, profile_hint: Any = None) -> ModuleSource:
"""
Resolve module ID to a source.
Args:
module_id: Module identifier (e.g., "tool-bash")
profile_hint: Optional hint from profile configuration
(format defined by app layer)
Returns:
ModuleSource that can be resolved to a path
Raises:
ModuleNotFoundError: Module cannot be found by this resolver
"""
The resolver is app-layer policy. Different apps may use different resolution strategies:
Kernel does NOT define resolution strategy. It only provides the injection mechanism.
class ModuleLoader:
"""Kernel mechanism for loading modules.
Accepts optional ModuleSourceResolver via coordinator mount point.
Falls back to direct entry-point discovery if no resolver provided.
"""
def __init__(self, coordinator):
"""Initialize loader with coordinator."""
self.coordinator = coordinator
    async def load(self, module_id: str, config: dict | None = None, profile_source=None):
"""
Load module using resolver or fallback to direct discovery.
Args:
module_id: Module identifier
config: Optional module configuration
profile_source: Optional source hint from profile/config
Raises:
ModuleNotFoundError: Module not found
ModuleLoadError: Module found but failed to load
"""
# Try to get resolver from mount point
source_resolver = None
if self.coordinator:
try:
source_resolver = self.coordinator.get("module-source-resolver")
except ValueError:
pass # No resolver mounted
if source_resolver is None:
# No resolver - use direct entry-point discovery
return await self._load_direct(module_id, config)
# Use resolver
source = source_resolver.resolve(module_id, profile_source)
module_path = source.resolve()
# Load from resolved path
# ... import and mount logic ...
# App layer creates resolver (policy)
resolver = CustomModuleSourceResolver()
# Mount it before creating loader
coordinator.mount("module-source-resolver", resolver)
# Loader will use custom resolver
loader = AmplifierModuleLoader(coordinator)
Kernel provides the mount point and fallback. App layer provides the resolver.
The kernel:
The kernel does NOT:
class ModuleNotFoundError(Exception):
"""Raised when a module cannot be found.
Resolvers MUST raise this when all resolution attempts fail.
Loaders MUST propagate this to callers.
Message SHOULD be helpful, indicating:
- What module was requested
- What resolution attempts were made (if applicable)
- Suggestions for resolution (if applicable)
"""
class ModuleLoadError(Exception):
"""Raised when a module is found but cannot be loaded.
Examples:
- Module path exists but isn't valid Python
- Import fails due to missing dependencies
- Module doesn't implement required protocol
"""
When no ModuleSourceResolver is mounted, the kernel falls back to direct entry point discovery via the _load_direct() method:
Implementation: The _load_direct() method directly calls _load_entry_point() and _load_filesystem() without creating a resolver wrapper object.
This ensures the kernel works without any app-layer resolver.
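A sketch of what the entry-point half of that fallback could look like (the real _load_direct also handles filesystem loading, per the note above):

from importlib.metadata import entry_points

async def _load_direct(self, module_id: str, config: dict | None = None):
    """Find mount() in the amplifier.modules entry-point group and invoke it."""
    for ep in entry_points(group="amplifier.modules"):
        if ep.name == module_id:
            mount = ep.load()
            return await mount(self.coordinator, config or {})
    raise ModuleNotFoundError(f"No entry point found for module '{module_id}'")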
Not in kernel, but shown for clarity:
# App layer defines custom resolution strategy
class MyCustomResolver:
"""Example custom resolver (app-layer policy)."""
def resolve(self, module_id: str, profile_hint: Any = None) -> ModuleSource:
# App-specific logic
if module_id in self.overrides:
return FileSource(self.overrides[module_id])
# Fall back to profile hint
if profile_hint:
return self.parse_profile_hint(profile_hint)
# Fall back to some default
return PackageSource(f"myapp-module-{module_id}")
This is policy, not kernel. Different apps can implement different strategies.
When implementing custom resolvers:
Kernel specifications:
Related Specifications:
Note: Module source resolution implementation is application-layer policy. Applications may implement custom resolution strategies using the protocols defined above.
This skill should be used when the user asks to "create a slash command", "add a command", "write a custom command", "define command arguments", "use command frontmatter", "organize commands", "create command with file references", "interactive command", "use AskUserQuestion in command", or needs guidance on slash command structure, YAML frontmatter fields, dynamic arguments, bash execution in commands, user interaction patterns, or command development best practices for Claude Code.
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.
This skill should be used when the user asks to "create a hook", "add a PreToolUse/PostToolUse/Stop hook", "validate tool use", "implement prompt-based hooks", "use ${CLAUDE_PLUGIN_ROOT}", "set up event-driven automation", "block dangerous commands", or mentions hook events (PreToolUse, PostToolUse, Stop, SubagentStop, SessionStart, SessionEnd, UserPromptSubmit, PreCompact, Notification). Provides comprehensive guidance for creating and implementing Claude Code plugin hooks with focus on advanced prompt-based hooks API.