From 3commas
Build distributed actor-based systems with Ergo Framework in Go. Covers actor lifecycle, supervision, message patterns, meta processes for I/O, cluster configuration, and EDF serialization. Use when implementing Ergo applications or working with actor-based distributed systems.
npx claudepluginhub 3commas-io/commas-claude --plugin 3commas
This skill uses the workspace's default tool permissions.
Build distributed, fault-tolerant actor-based systems in Go with Erlang-like reliability. Ergo Framework provides actor model, supervision trees, network transparency, and cluster support.
Guides Next.js Cache Components and Partial Prerendering (PPR) with cacheComponents enabled. Implements 'use cache', cacheLife(), cacheTag(), revalidateTag(), static/dynamic optimization, and cache debugging.
Migrates code, prompts, and API calls from Claude Sonnet 4.0/4.5 or Opus 4.1 to Opus 4.5, updating model strings on Anthropic, AWS, GCP, Azure platforms.
Automates semantic versioning and release workflow for Claude Code plugins: bumps versions in package.json, marketplace.json, plugin.json; verifies builds; creates git tags, GitHub releases, changelogs.
Build distributed, fault-tolerant actor-based systems in Go with Erlang-like reliability. Ergo Framework provides actor model, supervision trees, network transparency, and cluster support.
Process States:
Mailbox Queues (priority order):
Critical Rule: NEVER use mutexes, goroutines, or blocking operations in actor callbacks.
Strategies:
Child Restart Policies:
- Async (Send): Fire-and-forget, no response expected.
- Sync (Call): Blocks until a response is received.
- Important Delivery: Confirms delivery to the remote mailbox.
- FR-2PC: Fully-Reliable Two-Phase Commit for distributed transactions.
ApplicationSpec fields:
package main
import (
	"fmt"

	"ergo.services/ergo"
	"ergo.services/ergo/act"
	"ergo.services/ergo/gen"
)
// MyActor is the example actor: it embeds act.Actor and carries one piece of
// private state, an integer counter.
type MyActor struct {
act.Actor
counter int // incremented in HandleMessage, reported by HandleCall
}
// createMyActor is the factory handed to Spawn; every call yields a new,
// zero-valued MyActor.
func createMyActor() gen.ProcessBehavior {
	return new(MyActor)
}
// Init logs that the actor has started. The args are not inspected and the
// method always returns nil.
func (a *MyActor) Init(args ...any) error {
	logger := a.Log()
	logger.Info("actor started")
	return nil
}
// HandleMessage processes asynchronous messages. A MessageIncrement adds its
// Value to the counter and logs the new total; any other message is ignored.
// The method always returns nil.
func (a *MyActor) HandleMessage(from gen.PID, message any) error {
	inc, ok := message.(MessageIncrement)
	if !ok {
		// Not a message this actor understands; nothing to do.
		return nil
	}

	a.counter += inc.Value
	a.Log().Info("counter: %d", a.counter)
	return nil
}
// HandleCall answers synchronous requests. A GetCounterRequest is answered
// with a GetCounterResponse carrying the current counter; any other request
// type yields an "unknown request" error naming the offending type.
func (a *MyActor) HandleCall(from gen.PID, ref gen.Ref, request any) (any, error) {
	// The request value itself is never read, so the type switch does not bind
	// a variable: an unused `r := request.(type)` is a compile error in Go.
	switch request.(type) {
	case GetCounterRequest:
		return GetCounterResponse{Value: a.counter}, nil
	}
	return nil, fmt.Errorf("unknown request: %T", request)
}
// Terminate logs the reason the actor is terminating.
func (a *MyActor) Terminate(reason error) {
	logger := a.Log()
	logger.Info("actor terminated: %v", reason)
}
// Message naming convention: MessageXXX for asynchronous messages,
// XXXRequest / XXXResponse for synchronous calls.
type (
	// MessageIncrement asks the actor to add Value to its counter.
	MessageIncrement struct {
		Value int
	}

	// GetCounterRequest asks the actor for its current counter.
	GetCounterRequest struct{}

	// GetCounterResponse carries the counter back to the caller.
	GetCounterResponse struct {
		Value int
	}
)
// main starts a node, spawns a MyActor on it, sends one asynchronous
// increment, performs one synchronous counter query and then waits on the node.
// Failures to start the node or spawn the actor abort the program via panic.
func main() {
	node, err := ergo.StartNode("mynode@localhost", gen.NodeOptions{})
	if err != nil {
		panic(err)
	}

	pid, err := node.Spawn(createMyActor, gen.ProcessOptions{})
	if err != nil {
		panic(err)
	}

	// Send async message
	node.Send(pid, MessageIncrement{Value: 5})

	// Call sync request. Report a failed call instead of dropping it silently,
	// and use a checked type switch so an unexpected response cannot panic.
	result, err := node.Call(pid, GetCounterRequest{})
	if err != nil {
		fmt.Printf("call failed: %v\n", err)
	} else {
		switch resp := result.(type) {
		case GetCounterResponse:
			fmt.Printf("Counter: %d\n", resp.Value)
		default:
			fmt.Printf("unexpected response type: %T\n", result)
		}
	}

	node.Wait()
}
// Init spawns two children: an anonymous child whose PID is stored in
// a.childPID, and a second one registered under the name "worker". The first
// spawn error encountered is returned; otherwise nil.
func (a *ParentActor) Init(args ...any) error {
	// Anonymous child; keep its PID for later use.
	child, err := a.Spawn(createChildActor, gen.ProcessOptions{})
	if err != nil {
		return err
	}
	a.childPID = child

	// Named child; only the error matters here.
	_, err = a.SpawnRegister("worker", createWorker, gen.ProcessOptions{})
	return err
}
// Init links this actor to otherPID and then monitors it, returning nil.
// NOTE(review): otherPID is not defined anywhere in this snippet, and anything
// returned by Link/Monitor is discarded — confirm against the real API.
func (a *MyActor) Init(args ...any) error {
// Link - bidirectional: both processes terminate together
a.Link(otherPID)
// Monitor - unidirectional: this actor receives MessageDownPID on termination
a.Monitor(otherPID)
return nil
}
// HandleMessage logs the PID and reason carried by a gen.MessageDownPID.
// All other messages are ignored; the method always returns nil.
func (a *MyActor) HandleMessage(from gen.PID, message any) error {
	if down, ok := message.(gen.MessageDownPID); ok {
		a.Log().Info("monitored process %v terminated: %v", down.PID, down.Reason)
	}
	return nil
}
// MySupervisor is the example supervisor; it embeds act.Supervisor and has no
// fields of its own.
type MySupervisor struct {
act.Supervisor
}
// createSupervisor is the factory for MySupervisor; each call returns a new,
// zero-valued instance.
func createSupervisor() gen.ProcessBehavior {
	return new(MySupervisor)
}
// Init returns the supervisor specification: a one-for-one supervisor with a
// transient restart strategy and two children built by createWorker.
// The args are not inspected and the error is always nil.
func (s *MySupervisor) Init(args ...any) (act.SupervisorSpec, error) {
	restart := act.SupervisorRestart{
		Strategy:  act.SupervisorStrategyTransient,
		Intensity: 5,  // max restarts
		Period:    10, // within 10 seconds
	}

	workers := []act.SupervisorChild{
		{
			Name:    "worker1",
			Factory: createWorker,
			Args:    []any{"config1"},
			Restart: act.SupervisorChildRestartPermanent,
		},
		{
			Name:    "worker2",
			Factory: createWorker,
			Args:    []any{"config2"},
			Restart: act.SupervisorChildRestartTransient,
		},
	}

	spec := act.SupervisorSpec{
		Type:     act.SupervisorTypeOneForOne,
		Restart:  restart,
		Children: workers,
	}
	return spec, nil
}
// HandleMessage forwards worker lifecycle requests to the supervisor at
// a.supervisorPID:
//   - StartWorkerRequest sends act.MessageStartChild with a temporary-restart
//     child spec named m.Name and built by createWorker.
//   - StopWorkerRequest sends act.MessageStopChild for m.Name.
//
// A failed Send is logged rather than returned, so the method still always
// returns nil as before. Other message types are ignored.
func (a *MyActor) HandleMessage(from gen.PID, message any) error {
	switch m := message.(type) {
	case StartWorkerRequest:
		// Start child via supervisor
		spec := act.SupervisorChild{
			Name:    m.Name,
			Factory: createWorker,
			Restart: act.SupervisorChildRestartTemporary,
		}
		// The original bound the Send result to an unused `err`, which does
		// not compile; scope it to the check instead.
		if err := a.Send(a.supervisorPID, act.MessageStartChild{Child: spec}); err != nil {
			a.Log().Error("start worker %v: %v", m.Name, err)
		}
	case StopWorkerRequest:
		if err := a.Send(a.supervisorPID, act.MessageStopChild{Name: m.Name}); err != nil {
			a.Log().Error("stop worker %v: %v", m.Name, err)
		}
	}
	return nil
}
// NOTE(review): the statements below read as independent fragments, one per
// messaging pattern. `result, err :=` / `err :=` is repeated, so they would not
// compile together in a single scope as written.

// Async - fire and forget
a.Send(targetPID, MessageDoWork{Data: "task1"})
// Async with priority
a.SendWithPriority(targetPID, UrgentMessage{}, gen.MessagePriorityHigh)
// Sync - blocks until response
result, err := a.Call(targetPID, GetStatusRequest{})
if err != nil {
// Handle timeout or error
}
// Sync with an explicit timeout (5 seconds here)
result, err := a.CallWithTimeout(targetPID, request, 5*time.Second)
// Send with delivery confirmation (blocks until ACK)
err := a.SendImportant(remotePID, CriticalMessage{})
if err != nil {
// ErrProcessUnknown - process doesn't exist
// ErrProcessMailboxFull - mailbox is full
// ErrTimeout - no ACK received
}
// Call with delivery confirmation
result, err := a.CallImportant(remotePID, CriticalRequest{})
// Immediate error if process missing (no ambiguous timeout)
// Init turns on the process-level Important Delivery flag, so that all
// messages use Important Delivery. It always returns nil.
func (a *MyActor) Init(args ...any) error {
	const important = true
	a.SetImportantDelivery(important)
	return nil
}
// Caller side - the request is confirmed as delivered.
// NOTE(review): `process` and `target` are not defined in this fragment.
result, err := process.CallImportant(target, TransactionRequest{})
// HandleCall is the handler side of the exchange: it enables Important
// Delivery so the response is confirmed as delivered, runs the transaction and
// returns its result wrapped in a TransactionResponse. The error is always nil.
func (h *Handler) HandleCall(from gen.PID, ref gen.Ref, request any) (any, error) {
	// Enable the flag before processing, as in the original ordering.
	h.SetImportantDelivery(true)

	response := TransactionResponse{Result: h.processTransaction(request)}
	return response, nil
}
// Both directions guaranteed - foundation for distributed transactions
// MyApp is the example application; it embeds act.Application and has no
// fields of its own.
type MyApp struct {
act.Application
}
// createMyApp is the factory passed to ApplicationLoad; each call returns a
// new, zero-valued MyApp.
func createMyApp() gen.ApplicationBehavior {
	return new(MyApp)
}
// Load returns the specification for the "my-service" application: metadata,
// a transient mode, a "production" tag, a role-to-process-name map, the two
// member processes and one environment entry. The args are not inspected and
// the error is always nil.
func (a *MyApp) Load(args ...any) (gen.ApplicationSpec, error) {
	// Member processes started as part of the application.
	members := []gen.ApplicationMemberSpec{
		{Name: "api-handler", Factory: createAPIHandler},
		{Name: "background-worker", Factory: createWorker},
	}

	// Role names mapped to the process names declared in members.
	roles := gen.ApplicationMap{
		"api":    "api-handler",
		"worker": "background-worker",
	}

	spec := gen.ApplicationSpec{
		Name:        "my-service",
		Description: "My service description",
		Version:     "1.0.0",
		Mode:        gen.ApplicationModeTransient,
		Tags:        []gen.Tag{"production"},
		Map:         roles,
		Group:       members,
		Env: map[gen.Env]any{
			"config_key": "config_value",
		},
	}
	return spec, nil
}
// Start logs the mode the application was started in and returns nil.
func (a *MyApp) Start(mode gen.ApplicationMode) error {
	logger := a.Log()
	logger.Info("application started in mode: %s", mode)
	return nil
}
// Terminate logs the reason the application is terminating.
func (a *MyApp) Terminate(reason error) {
	logger := a.Log()
	logger.Info("application terminated: %v", reason)
}
// main starts a node, loads the application produced by createMyApp, starts
// it under the name "my-service" and then waits on the node. Any failure
// aborts the program via panic.
func main() {
	// The original discarded this error; a failed start would leave node
	// unusable for the calls below, so fail fast like the other steps do.
	node, err := ergo.StartNode("mynode@localhost", gen.NodeOptions{})
	if err != nil {
		panic(err)
	}

	// Load and start application
	err = node.ApplicationLoad(createMyApp)
	if err != nil {
		panic(err)
	}
	err = node.ApplicationStart("my-service")
	if err != nil {
		panic(err)
	}

	node.Wait()
}
// WorkerPool is the example pool process; it embeds act.Pool and has no fields
// of its own.
type WorkerPool struct {
act.Pool
}
// createPool is the factory for WorkerPool; each call returns a new,
// zero-valued instance.
func createPool() gen.ProcessBehavior {
	return new(WorkerPool)
}
// Init returns the pool configuration: ten workers built by createWorker, each
// with a 20-message mailbox and the single argument "config". The args are not
// inspected and the error is always nil.
func (p *WorkerPool) Init(args ...any) (act.PoolOptions, error) {
	var opts act.PoolOptions
	opts.PoolSize = 10                 // 10 workers
	opts.WorkerMailboxSize = 20        // 20 messages per worker
	opts.WorkerFactory = createWorker  // Worker factory
	opts.WorkerArgs = []any{"config"}
	return opts, nil
}
// Capacity = PoolSize * WorkerMailboxSize = 200 concurrent messages
// HandleMessage is the pool-level message handler (Urgent/System priority).
// On a ScaleUpCommand it adds m.Count workers and logs the resulting pool
// size; if adding workers fails, the failure is logged instead of a misleading
// success line. Other messages are ignored and the method always returns nil.
func (p *WorkerPool) HandleMessage(from gen.PID, message any) error {
	switch m := message.(type) {
	case ScaleUpCommand:
		newSize, err := p.AddWorkers(m.Count)
		if err != nil {
			// Previously discarded with `_`, which reported "scaled to N"
			// even when nothing had been added.
			p.Log().Error("adding %v workers: %v", m.Count, err)
			return nil
		}
		p.Log().Info("scaled to %d workers", newSize)
	}
	return nil
}
// Worker is the pool's worker actor; it embeds act.Actor and has no fields of
// its own.
type Worker struct {
act.Actor
}
// createWorker is the worker factory referenced by the pool options; each call
// returns a new, zero-valued Worker.
func createWorker() gen.ProcessBehavior {
	return new(Worker)
}
// HandleMessage processes a WorkRequest and sends the outcome back to the
// sender as a WorkResult. Other messages are ignored; the method always
// returns nil.
func (w *Worker) HandleMessage(from gen.PID, message any) error {
	req, ok := message.(WorkRequest)
	if !ok {
		return nil
	}

	outcome := w.process(req)
	w.Send(from, WorkResult{Data: outcome})
	return nil
}
Don't use Pool when:
// Init creates a TCP server meta process listening on 0.0.0.0:8080 and spawns
// it under this actor. Connections are routed to the processes listed in
// ProcessPool. If spawning fails, the server is terminated so its socket is
// closed, and the error is returned.
func (a *Actor) Init(args ...any) error {
	options := meta.TCPServerOptions{
		Host:        "0.0.0.0",
		Port:        8080,
		ProcessPool: []gen.Atom{"worker1", "worker2"}, // Route connections
	}

	server, err := meta.CreateTCPServer(options)
	if err != nil {
		return err
	}

	serverID, err := a.SpawnMeta(server, gen.MetaOptions{})
	if err != nil {
		server.Terminate(err) // Close socket on failure
		return err
	}

	// serverID was previously declared but never used, which does not compile
	// in Go; log it so the identifier of the running server is visible.
	a.Log().Info("TCP server started: %s", serverID)
	return nil
}
// HandleMessage implements the server side of an echo service:
//   - on connect it logs the remote address and sends a welcome banner,
//   - on data it sends the received bytes back to the same connection,
//   - on disconnect it logs the connection ID.
//
// Other messages are ignored and the method always returns nil.
func (a *Actor) HandleMessage(from gen.PID, message any) error {
	switch event := message.(type) {
	case meta.MessageTCPConnect:
		a.Log().Info("client connected: %s", event.RemoteAddr)
		welcome := meta.MessageTCP{Data: []byte("Welcome!\n")}
		a.Send(event.ID, welcome)

	case meta.MessageTCP:
		// Echo received data
		echo := meta.MessageTCP{Data: event.Data}
		a.Send(event.ID, echo)

	case meta.MessageTCPDisconnect:
		a.Log().Info("client disconnected: %s", event.ID)
	}
	return nil
}
// Init creates a TCP connection meta process to example.com:80, spawns it
// under this actor and stores its ID in a.connID. If spawning fails, the
// connection is terminated before the error is returned.
func (a *Actor) Init(args ...any) error {
	conn, err := meta.CreateTCPConnection(meta.TCPConnectionOptions{
		Host: "example.com",
		Port: 80,
	})
	if err != nil {
		return err
	}

	id, err := a.SpawnMeta(conn, gen.MetaOptions{})
	if err != nil {
		conn.Terminate(err)
		return err
	}

	a.connID = id
	return nil
}
// HandleMessage implements the client side of a one-shot HTTP exchange:
//   - on connect it sends a plain "GET /" request for example.com,
//   - on data it logs the received bytes as a string,
//   - on disconnect it logs that fact.
//
// Other messages are ignored and the method always returns nil.
func (a *Actor) HandleMessage(from gen.PID, message any) error {
	switch event := message.(type) {
	case meta.MessageTCPConnect:
		// Connection established, send request
		payload := []byte("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
		a.Send(event.ID, meta.MessageTCP{Data: payload})

	case meta.MessageTCP:
		a.Log().Info("response: %s", string(event.Data))

	case meta.MessageTCPDisconnect:
		a.Log().Info("disconnected")
	}
	return nil
}
import "ergo.services/ergo/net/edf"
// Register in init() BEFORE node starts
func init() {
// Register nested types first
edf.RegisterTypeOf(Address{})
edf.RegisterTypeOf(Person{})
// Register errors
edf.RegisterError(ErrInvalidOrder)
}
// Every field of a message that crosses node boundaries MUST be exported.
type (
	// Address is a nested message type; both fields are exported.
	Address struct {
		City   string
		Street string
	}

	// Person embeds an Address value, so Address has to be registered first.
	Person struct {
		Name    string
		Address Address
	}
)
// BadMessage is a counter-example.
// WRONG - unexported fields cannot be serialized, so this type is not usable
// as a cross-node message.
type BadMessage struct {
Name string
data []byte // unexported - EDF will fail
}
| Type | Max Size |
|---|---|
| Atom | 255 bytes |
| String | 65535 bytes |
| Error | 32767 bytes |
| Binary | 4GB |
| Collections | 2^32 elements |
import "ergo.services/registrar/etcd"
node, err := ergo.StartNode("mynode@localhost", gen.NodeOptions{
Network: gen.NetworkOptions{
Registrar: etcd.Create(etcd.Options{
Endpoints: []string{"etcd1:2379", "etcd2:2379"},
}),
},
})
import "ergo.services/registrar/saturn"
node, err := ergo.StartNode("mynode@localhost", gen.NodeOptions{
Network: gen.NetworkOptions{
Registrar: saturn.Create(saturn.Options{
Nodes: []string{"saturn1:4499", "saturn2:4499"},
}),
},
})
// callService resolves the "my-service" application through the node's
// registrar and forwards request, as a synchronous Call, to the "handler"
// process of the first route tagged "production".
//
// It returns the Call's result and error, a wrapped error if the registrar is
// unavailable or resolution fails, or "service not found" when no route
// carries the "production" tag.
func (a *Gateway) callService(request any) (any, error) {
	const serviceName = "my-service"

	// Both lookups used to discard their errors; a failed Registrar() would
	// then be dereferenced on the next line.
	registrar, err := a.Node().Network().Registrar()
	if err != nil {
		return nil, fmt.Errorf("getting registrar: %w", err)
	}

	routes, err := registrar.Resolver().ResolveApplication(serviceName)
	if err != nil {
		return nil, fmt.Errorf("resolving application %q: %w", serviceName, err)
	}

	for _, r := range routes {
		for _, tag := range r.Tags {
			if tag == "production" {
				return a.Call(
					gen.ProcessID{Node: r.Node, Name: r.Map.Get("handler")},
					request,
				)
			}
		}
	}
	return nil, fmt.Errorf("service not found")
}
When uncertain about APIs or behavior, consult the Ergo Framework source code and the docs directory in the Go module cache:
ls $(go env GOMODCACHE)/ergo.services/ergo@*/docs/