Guide for building concurrent, fault-tolerant systems using OTP (GenServer, Supervisor, Task, Agent) and Elixir concurrency primitives
Provides guidance for building concurrent, fault-tolerant Elixir systems using OTP behaviors like GenServer, Supervisor, and Task. Claude activates this when implementing process supervision, managing state with Agents, or designing concurrent architectures.
To install: run `/plugin marketplace add vinnie357/claude-skills`, then `/plugin install all-skills@vinnie357`. This skill inherits all available tools. When active, it can use any tool Claude has access to.
This skill activates when working with OTP behaviors, building concurrent systems, managing processes, or implementing fault-tolerant architectures in Elixir.
Activate when:
Use GenServer for stateful processes:
defmodule MyApp.Counter do
  @moduledoc """
  A counter held in a GenServer, registered under the module name.
  """
  use GenServer

  # Client API

  def start_link(initial_value) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end

  @doc """
  Increments the counter and returns the new value.

  `server` defaults to the registered name; pass a pid to address a
  specific instance directly (handy in tests).
  """
  def increment(server \\ __MODULE__) do
    GenServer.call(server, :increment)
  end

  @doc "Returns the current value without changing it."
  def get_value(server \\ __MODULE__) do
    GenServer.call(server, :get)
  end

  # Server Callbacks

  @impl true
  def init(initial_value) do
    {:ok, initial_value}
  end

  @impl true
  def handle_call(:increment, _from, state) do
    # Reply with the incremented value and keep it as the new state.
    {:reply, state + 1, state + 1}
  end

  @impl true
  def handle_call(:get, _from, state) do
    {:reply, state, state}
  end
end
Use `call` for synchronous requests that need a response; `cast` for asynchronous fire-and-forget messages; `handle_info` for receiving regular messages; and `:via` tuples or Registry for dynamic naming. Background Work:
# Kick off the first tick; later ticks are scheduled from handle_info/2.
def init(state) do
schedule_work()
{:ok, state}
end
# Handles the :work message sent by Process.send_after/3 below.
def handle_info(:work, state) do
do_work(state)
# Reschedule *after* the work finishes so runs never overlap.
schedule_work()
{:noreply, state}
end
# Send :work to ourselves after 5 seconds.
defp schedule_work do
Process.send_after(self(), :work, 5000)
end
State Timeouts:
def handle_call(:get, _from, state) do
  # The optional fourth element is a GenServer *idle timeout*: if no
  # message arrives within 30s, handle_info(:timeout, state) is invoked.
  # (Note: {:state_timeout, ...} and handle_state_timeout/2 belong to
  # :gen_statem, not GenServer.)
  {:reply, state, state, 30_000}
end

def handle_info(:timeout, state) do
  # No activity for 30 seconds - shut this process down cleanly.
  {:stop, :normal, state}
end
Build supervision trees for fault tolerance:
defmodule MyApp.Application do
  @moduledoc "Application entry point: builds and starts the root supervision tree."
  use Application

  @impl true
  def start(_type, _args) do
    Supervisor.start_link(child_specs(), strategy: :one_for_one, name: MyApp.Supervisor)
  end

  # Children are started in order, top to bottom, and stopped in reverse.
  defp child_specs do
    [
      # Database connection pool
      {MyApp.Repo, []},
      # PubSub system
      {Phoenix.PubSub, name: MyApp.PubSub},
      # Custom supervisor
      {MyApp.WorkerSupervisor, []},
      # Individual workers
      {MyApp.Cache, []},
      {MyApp.RateLimiter, []},
      # Web endpoint
      MyAppWeb.Endpoint
    ]
  end
end
:one_for_one - If a child dies, only that child is restarted
Supervisor.start_link(children, strategy: :one_for_one)
:one_for_all - If any child dies, all children are terminated and restarted
Supervisor.start_link(children, strategy: :one_for_all)
:rest_for_one - If a child dies, it and all children started after it are restarted
Supervisor.start_link(children, strategy: :rest_for_one)
For dynamically creating processes:
defmodule MyApp.WorkerSupervisor do
  @moduledoc "Supervises MyApp.Worker processes that are started on demand."
  use DynamicSupervisor

  def start_link(init_arg),
    do: DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)

  @doc "Starts a MyApp.Worker under this supervisor with the given `args`."
  def start_worker(args),
    do: DynamicSupervisor.start_child(__MODULE__, {MyApp.Worker, args})

  @impl true
  def init(_init_arg), do: DynamicSupervisor.init(strategy: :one_for_one)
end
Configure child restart behavior:
children = [
  # Always restart (the default, :permanent)
  MyApp.CriticalWorker,
  # Never restart.
  # NOTE: `{MyApp.OneTimeTask, restart: :temporary}` would pass the
  # keyword list to MyApp.OneTimeTask.start_link/1 as its *argument*,
  # not set the restart option - use Supervisor.child_spec/2 instead.
  Supervisor.child_spec(MyApp.OneTimeTask, restart: :temporary),
  # Only restart on abnormal exit
  Supervisor.child_spec(MyApp.OptionalWorker, restart: :transient)
]
For concurrent work without needing results:
# Fire-and-forget: runs in a separate, unlinked process; the result is
# discarded and a failure does not crash the caller.
Task.start(fn ->
send_email(user, "Welcome!")
end)
For concurrent work with results:
# The task is linked to the caller: if it crashes, so does the caller.
task = Task.async(fn ->
expensive_computation()
end)
# Do other work...
# await exits the caller if the task has not replied within the timeout.
result = Task.await(task, 5000) # 5 second timeout
For long-running tasks under supervision:
defmodule MyApp.Application do
use Application
# Put a named Task.Supervisor in the supervision tree so dynamically
# started tasks are supervised and show up in observer.
def start(_type, _args) do
children = [
{Task.Supervisor, name: MyApp.TaskSupervisor}
]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
# Use the supervised task
# start_child/2 is fire-and-forget; use async_nolink/2 plus Task.await/2
# when the caller needs the result.
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
long_running_operation()
end)
Process collections concurrently:
# Sequential
results = Enum.map(urls, &fetch_url/1)
# Concurrent
# NOTE: async_stream yields {:ok, result} (or {:exit, reason}) tuples,
# not bare results like Enum.map/2 - unwrap them when consuming.
results = Task.async_stream(urls, &fetch_url/1, max_concurrency: 10)
|> Enum.to_list()
Use Agent for simple state:
# Start a named agent holding an empty map as its state.
{:ok, agent} = Agent.start_link(fn -> %{} end, name: MyApp.Cache)
# Get state
value = Agent.get(MyApp.Cache, fn state -> Map.get(state, :key) end)
# Update state
Agent.update(MyApp.Cache, fn state -> Map.put(state, :key, value) end)
# Get and update atomically
# Read and delete happen in one call, so no other caller can interleave
# between them.
Agent.get_and_update(MyApp.Cache, fn state ->
{Map.get(state, :key), Map.delete(state, :key)}
end)
When to use Agent vs GenServer:
Basic message passing:
# Send message
# send/2 never blocks and never fails, even if the pid is dead.
send(pid, {:hello, "world"})
# Receive message
receive do
{:hello, msg} -> IO.puts(msg)
after
# Give up after 5 seconds if no matching message arrives.
5000 -> IO.puts("Timeout")
end
Register processes by name:
# Local registration
# Names are unique per node; registering a taken name raises.
Process.register(self(), :my_process)
send(:my_process, :hello)
# Via Registry
# In a real app the Registry belongs in the supervision tree.
{:ok, _} = Registry.start_link(keys: :unique, name: MyApp.Registry)
{:ok, pid} = GenServer.start_link(MyWorker, nil,
name: {:via, Registry, {MyApp.Registry, "worker_1"}}
)
# Look up process
# Returns [] when nothing is registered under the key.
[{pid, _}] = Registry.lookup(MyApp.Registry, "worker_1")
Links - Bidirectional, propagate exits:
# Link processes
# An abnormal exit on either side takes the other down too,
# unless it traps exits.
Process.link(pid)
# Spawn linked
spawn_link(fn -> do_work() end)
Monitors - Unidirectional, receive DOWN messages:
# A monitor delivers a :DOWN message instead of killing the observer.
ref = Process.monitor(pid)
receive do
{:DOWN, ^ref, :process, ^pid, reason} ->
IO.puts("Process died: #{inspect(reason)}")
end
Chain operations with concurrency:
defmodule Pipeline do
  @moduledoc """
  Runs `data` through step1/2/3, each step executed in its own task.

  NOTE: the original chained `Task.async/1` calls handed each step the
  previous *%Task{} struct* (not its result), and the final `await_all/1`
  clause only matched lists, so `process/1` always crashed. Each stage is
  now awaited before the next one starts; the chain is therefore
  sequential - use `Task.async_stream/3` for real parallelism over
  collections.
  """
  def process(data) do
    data
    |> run_step(&step1/1)
    |> run_step(&step2/1)
    |> run_step(&step3/1)
  end

  # Run one step off-process and return its plain result.
  # Task.await/1 must be called by the task's owner, so we await here
  # in the caller rather than inside a later task.
  defp run_step(input, fun) do
    task = Task.async(fn -> fun.(input) end)
    Task.await(task)
  end
end
Implement a worker pool:
defmodule MyApp.WorkerPool do
  @moduledoc """
  Round-robin pool of Task.Supervisors. `execute/1` runs `fun` under the
  next supervisor in the ring and returns the %Task{} so the caller can
  `Task.await/1` it.
  """
  use GenServer

  # Client API

  def start_link(opts) do
    pool_size = Keyword.get(opts, :size, 10)
    GenServer.start_link(__MODULE__, pool_size, name: __MODULE__)
  end

  @doc "Runs `fun` on a pooled supervisor; returns an awaitable %Task{}."
  def execute(fun) do
    # Check out a supervisor, then start the task from the *calling*
    # process: a %Task{} can only be awaited by its owner, so the
    # async_nolink call must not happen inside the pool GenServer
    # (which is what the original did, making the returned task
    # unawaitable by the caller).
    supervisor = GenServer.call(__MODULE__, :checkout)
    Task.Supervisor.async_nolink(supervisor, fun)
  end

  # Server callbacks

  @impl true
  def init(pool_size) do
    # The supervisors are linked to this GenServer; if the pool dies,
    # its Task.Supervisors die with it.
    workers =
      for _ <- 1..pool_size do
        {:ok, pid} = Task.Supervisor.start_link()
        pid
      end

    {:ok, %{workers: workers, index: 0}}
  end

  @impl true
  def handle_call(:checkout, _from, state) do
    # Hand out supervisors round-robin.
    worker = Enum.at(state.workers, state.index)
    new_index = rem(state.index + 1, length(state.workers))
    {:reply, worker, %{state | index: new_index}}
  end
end
For producer-consumer pipelines:
# GenStage producer emitting an endless stream of integers.
# (GenStage is a separate hex package: {:gen_stage, "~> 1.0"}.)
defmodule Producer do
use GenStage
def start_link(initial) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
# State is the next integer to emit.
def init(initial) do
{:producer, initial}
end
# Emit exactly `demand` consecutive integers starting at the counter.
def handle_demand(demand, state) do
events = Enum.to_list(state..state + demand - 1)
{:noreply, events, state + demand}
end
end
# GenStage consumer; subscribe it to Producer to pull events.
defmodule Consumer do
use GenStage
# No name: several consumers can be started against one producer.
def start_link() do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
{:consumer, :ok}
end
# Consumers never emit events, hence the empty list in the reply.
def handle_events(events, _from, state) do
Enum.each(events, &process_event/1)
{:noreply, [], state}
end
end
In-memory key-value storage:
# Create table
# :named_table lets any code address it by atom; :public allows all
# processes to read/write. The creating process owns the table - it is
# destroyed when that process exits.
:ets.new(:my_table, [:named_table, :public, read_concurrency: true])
# Insert
:ets.insert(:my_table, {:key, "value"})
# Lookup
# Returns a (possibly empty) list of matching tuples.
[{:key, value}] = :ets.lookup(:my_table, :key)
# Delete
:ets.delete(:my_table, :key)
# Match patterns
# Returns the bound variables, e.g. [[:key]] here.
:ets.match(:my_table, {:"$1", "value"})
# Iterate
:ets.foldl(fn {k, v}, acc -> [{k, v} | acc] end, [], :my_table)
Use `read_concurrency: true` for read-heavy workloads; `write_concurrency: true` for write-heavy workloads; `:set` (the default) for unique keys; `:bag` or `:duplicate_bag` for multiple values per key. Design for failure:
# Don't do defensive programming
def process_order(order_id) do
# Let it crash if order doesn't exist
order = Repo.get!(Order, order_id)
# Let it crash if validation fails
# (a non-{:ok, _} return raises MatchError)
{:ok, processed} = process(order)
processed
end
When to handle errors vs let crash:
# Handle expected errors
# Maps each expected HTTP outcome onto a tagged tuple so callers can
# pattern-match; only truly unexpected failures should crash.
def fetch_user(id) do
case HTTPoison.get("#{@api_url}/users/#{id}") do
{:ok, %{status_code: 200, body: body}} ->
# Jason.decode/1 itself returns {:ok, map} | {:error, reason}.
Jason.decode(body)
{:ok, %{status_code: 404}} ->
{:error, :not_found}
{:ok, %{status_code: status}} ->
{:error, {:unexpected_status, status}}
{:error, reason} ->
{:error, {:network_error, reason}}
end
end
# Let unexpected errors crash
# Bang variant: raises if the user is missing or the changeset is
# invalid - use when failure here would be a programmer error.
def update_user!(id, params) do
user = Repo.get!(User, id) # Crash if not found
user
|> User.changeset(params)
|> Repo.update!() # Crash if invalid
end
Prevent cascading failures:
defmodule CircuitBreaker do
  @moduledoc """
  Minimal circuit breaker. After @max_failures consecutive failures the
  circuit opens and calls are rejected; after @reset_after ms it moves to
  :half_open and lets trial calls through. A success closes the circuit,
  another failure re-opens it.

  NOTE: the original version jumped from the recovery timer straight to
  :closed (no half-open semantics) and scheduled a new timer on *every*
  failure past the threshold; both are fixed here.
  """
  use GenServer

  @max_failures 5
  @reset_after 30_000

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{status: :closed, failures: 0}, name: __MODULE__)
  end

  @doc "Runs `fun` through the breaker; {:error, :circuit_open} while tripped."
  def call(fun) do
    case GenServer.call(__MODULE__, :status) do
      :open -> {:error, :circuit_open}
      # :closed and :half_open both allow the call; a failure during
      # :half_open re-trips the breaker via the :failure cast.
      _ -> execute(fun)
    end
  end

  defp execute(fun) do
    try do
      result = fun.()
      GenServer.cast(__MODULE__, :success)
      {:ok, result}
    rescue
      e ->
        GenServer.cast(__MODULE__, :failure)
        {:error, e}
    end
  end

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:status, _from, state) do
    {:reply, state.status, state}
  end

  @impl true
  def handle_cast(:success, state) do
    # Any success fully closes the circuit and clears the failure count.
    {:noreply, %{state | failures: 0, status: :closed}}
  end

  @impl true
  def handle_cast(:failure, state) do
    new_failures = state.failures + 1

    if new_failures >= @max_failures and state.status != :open do
      # Trip the breaker; schedule exactly one recovery timer per trip.
      Process.send_after(self(), :half_open, @reset_after)
      {:noreply, %{state | failures: new_failures, status: :open}}
    else
      {:noreply, %{state | failures: new_failures}}
    end
  end

  @impl true
  def handle_info(:half_open, state) do
    # Allow trial traffic: one more failure re-trips immediately, while
    # a success closes the circuit via the :success cast.
    {:noreply, %{state | status: :half_open, failures: @max_failures - 1}}
  end
end
defmodule MyApp.CounterTest do
  # The counter registers itself under MyApp.Counter (global state), so
  # this file must not run concurrently with other tests that touch it.
  use ExUnit.Case, async: false

  test "increments counter" do
    # start_link/1 registers the process by name; the client API
    # (increment/0, get_value/0) addresses it by that name, so no pid
    # is passed. (The original called increment(pid)/get_value(pid),
    # which do not exist - the API is arity 0.)
    {:ok, _pid} = MyApp.Counter.start_link(0)
    assert MyApp.Counter.increment() == 1
    assert MyApp.Counter.increment() == 2
    assert MyApp.Counter.get_value() == 2
  end
end
test "process receives message" do
  parent = self()

  # Bind the spawned pid - the original discarded the spawn/1 return
  # value, leaving `pid` unbound at the send/2 call below.
  pid =
    spawn(fn ->
      receive do
        :ping -> send(parent, :pong)
      end
    end)

  send(pid, :ping)
  assert_receive :pong, 1000
end
test "supervisor restarts crashed worker" do
{:ok, sup} = Supervisor.start_link([MyApp.Worker], strategy: :one_for_one)
[{_, worker_pid, _, _}] = Supervisor.which_children(sup)
# Crash the worker
Process.exit(worker_pid, :kill)
# Wait for restart
# NOTE(review): sleep-based synchronization can flake on slow CI;
# polling which_children/1 until the pid changes would be more robust.
Process.sleep(100)
# Verify new worker started
[{_, new_pid, _, _}] = Supervisor.which_children(sup)
assert new_pid != worker_pid
assert Process.alive?(new_pid)
end
Launch Observer for visual process inspection:
# Opens the Observer GUI; requires the :observer/:wx/:runtime_tools
# applications to be available (typically run from IEx).
:observer.start()
Inspect running processes:
# List all processes
Process.list()
# Process information
# Returns a keyword list, or nil if the process is dead.
Process.info(pid)
# Message queue length
# A steadily growing queue usually means the process cannot keep up.
{:message_queue_len, count} = Process.info(pid, :message_queue_len)
# Current function
{:current_function, {mod, fun, arity}} = Process.info(pid, :current_function)
Use :sys module for debugging:
# Enable tracing
# Prints every message and state transition to the shell - dev only.
:sys.trace(pid, true)
# Get state
# Debugging aid that reaches into the process; avoid in production code.
:sys.get_state(pid)
# Get status
:sys.get_status(pid)
Use `read_concurrency` for read-heavy workloads. This skill should be used when the user asks to "create a slash command", "add a command", "write a custom command", "define command arguments", "use command frontmatter", "organize commands", "create command with file references", "interactive command", "use AskUserQuestion in command", or needs guidance on slash command structure, YAML frontmatter fields, dynamic arguments, bash execution in commands, user interaction patterns, or command development best practices for Claude Code.
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.
This skill should be used when the user asks to "create a hook", "add a PreToolUse/PostToolUse/Stop hook", "validate tool use", "implement prompt-based hooks", "use ${CLAUDE_PLUGIN_ROOT}", "set up event-driven automation", "block dangerous commands", or mentions hook events (PreToolUse, PostToolUse, Stop, SubagentStop, SessionStart, SessionEnd, UserPromptSubmit, PreCompact, Notification). Provides comprehensive guidance for creating and implementing Claude Code plugin hooks with focus on advanced prompt-based hooks API.