Guide for OTP and Elixir concurrency. Use when implementing GenServers, designing supervision trees, or building fault-tolerant concurrent systems.
Provides guidance for building fault-tolerant concurrent systems using Elixir OTP behaviors.
npx claudepluginhub vinnie357/claude-skills
This skill inherits all available tools. When active, it can use any tool Claude has access to.
This skill activates when working with OTP behaviors, building concurrent systems, managing processes, or implementing fault-tolerant architectures in Elixir.
Activate when:
Use GenServer for stateful processes:
defmodule MyApp.Counter do
  @moduledoc """
  A counter process whose state is a single integer.

  `start_link/1` registers the process under the module name, so the client
  functions work without arguments. Each client function also accepts an
  explicit server reference (a pid or a registered name), which lets callers
  that hold the pid returned by `start_link/1` address the process directly.
  """

  use GenServer

  # Client API

  @doc "Starts the counter with `initial_value` as its state."
  def start_link(initial_value) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end

  @doc "Adds one to the counter and returns the new value."
  @spec increment(GenServer.server()) :: integer()
  def increment(server \\ __MODULE__) do
    GenServer.call(server, :increment)
  end

  @doc "Returns the current value without changing it."
  @spec get_value(GenServer.server()) :: integer()
  def get_value(server \\ __MODULE__) do
    GenServer.call(server, :get)
  end

  # Server Callbacks

  @impl true
  def init(initial_value) do
    {:ok, initial_value}
  end

  @impl true
  def handle_call(:increment, _from, state) do
    new_state = state + 1
    # The reply and the stored state are the same incremented value.
    {:reply, new_state, new_state}
  end

  @impl true
  def handle_call(:get, _from, state) do
    {:reply, state, state}
  end
end
- call for synchronous requests that need a response
- cast for asynchronous fire-and-forget messages
- handle_info for receiving regular messages
- via tuples or Registry for dynamic naming

Background Work:
# Schedules the first :work message when the process starts.
@impl true
def init(state) do
  schedule_work()
  {:ok, state}
end

# Runs the work for each :work message. The state is returned unchanged;
# whatever do_work/1 returns is discarded.
@impl true
def handle_info(:work, state) do
  do_work(state)
  # Re-arm the timer after the work, so the next :work message is sent
  # 5 seconds after this run finishes.
  schedule_work()
  {:noreply, state}
end

# Sends :work to the current process after 5000 ms.
defp schedule_work do
  Process.send_after(self(), :work, 5000)
end
State Timeouts:
# GenServer has no `{:state_timeout, ...}` action and no
# `handle_state_timeout/2` callback; both belong to `:gen_statem`. An integer
# as the fourth element of the reply asks GenServer to send `:timeout` to
# handle_info/2 when no other message arrives within that many milliseconds.
@impl true
def handle_call(:get, _from, state) do
  {:reply, state, state, 30_000}
end

# No message arrived for 30 seconds after the last :get, so stop normally.
@impl true
def handle_info(:timeout, state) do
  {:stop, :normal, state}
end
Build supervision trees for fault tolerance:
defmodule MyApp.Application do
  @moduledoc """
  Application entry point: starts the top-level supervisor, registered as
  `MyApp.Supervisor`, with the `:one_for_one` strategy.
  """

  use Application

  @impl true
  def start(_type, _args) do
    Supervisor.start_link(children(), strategy: :one_for_one, name: MyApp.Supervisor)
  end

  # Child specifications handed to the top-level supervisor.
  defp children do
    [
      # Database connection pool
      {MyApp.Repo, []},
      # PubSub system
      {Phoenix.PubSub, name: MyApp.PubSub},
      # Custom supervisor
      {MyApp.WorkerSupervisor, []},
      # Individual workers
      {MyApp.Cache, []},
      {MyApp.RateLimiter, []},
      # Web endpoint
      MyAppWeb.Endpoint
    ]
  end
end
:one_for_one - If a child dies, only that child is restarted
# Starts a supervisor using the :one_for_one strategy; `children` must already be bound.
Supervisor.start_link(children, strategy: :one_for_one)
:one_for_all - If any child dies, all children are terminated and restarted
# Starts a supervisor using the :one_for_all strategy; `children` must already be bound.
Supervisor.start_link(children, strategy: :one_for_all)
:rest_for_one - If a child dies, it and all children started after it are restarted
# Starts a supervisor using the :rest_for_one strategy; `children` must already be bound.
Supervisor.start_link(children, strategy: :rest_for_one)
For dynamically creating processes:
defmodule MyApp.WorkerSupervisor do
  @moduledoc """
  A `DynamicSupervisor`, registered under this module's name, that starts
  `MyApp.Worker` children on demand via `start_worker/1`.
  """

  use DynamicSupervisor

  @doc "Starts the supervisor, passing `init_arg` through to `init/1`."
  def start_link(init_arg),
    do: DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)

  @doc "Starts a `MyApp.Worker` child with `args` under this supervisor."
  def start_worker(args),
    do: DynamicSupervisor.start_child(__MODULE__, {MyApp.Worker, args})

  @impl true
  def init(_init_arg), do: DynamicSupervisor.init(strategy: :one_for_one)
end
Configure child restart behavior:
# A `{Module, arg}` tuple only hands `arg` to `Module.child_spec/1`; it does
# not set supervisor options. `Supervisor.child_spec/2` builds the module's
# spec and then overrides the given keys, which is how `:restart` is set here.
children = [
  # Always restart (default)
  Supervisor.child_spec(MyApp.CriticalWorker, restart: :permanent),
  # Never restart
  Supervisor.child_spec(MyApp.OneTimeTask, restart: :temporary),
  # Only restart on abnormal exit
  Supervisor.child_spec(MyApp.OptionalWorker, restart: :transient)
]
For concurrent work without needing results:
# Fire-and-forget: the return value of send_email/2 is never collected.
Task.start(fn -> send_email(user, "Welcome!") end)
For concurrent work with results:
# Start the computation in a separate task process.
pending = Task.async(fn -> expensive_computation() end)

# Do other work...

# Collect the result, waiting at most 5 seconds (5000 ms).
result = Task.await(pending, 5000)
For long-running tasks under supervision:
defmodule MyApp.Application do
  @moduledoc """
  Application entry point that starts a `Task.Supervisor` registered as
  `MyApp.TaskSupervisor` under a `:one_for_one` supervisor.
  """

  use Application

  # start/2 is an Application callback; marking it lets the compiler check it.
  @impl true
  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: MyApp.TaskSupervisor}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end
# Run the operation as a child of MyApp.TaskSupervisor.
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> long_running_operation() end)
Process collections concurrently:
# Sequential
results = Enum.map(urls, &fetch_url/1)

# Concurrent (at most 10 tasks at a time). Task.async_stream/3 emits
# `{:ok, value}` tuples, so unwrap them to get the same list shape as the
# sequential version above.
results =
  urls
  |> Task.async_stream(&fetch_url/1, max_concurrency: 10)
  |> Enum.map(fn {:ok, result} -> result end)
Use Agent for simple state:
# Start an agent holding an empty map, registered as MyApp.Cache.
{:ok, agent} = Agent.start_link(fn -> %{} end, name: MyApp.Cache)

# Get state
value = Agent.get(MyApp.Cache, &Map.get(&1, :key))

# Update state
Agent.update(MyApp.Cache, &Map.put(&1, :key, value))

# Get and update atomically: return the value stored under :key and remove it
Agent.get_and_update(MyApp.Cache, &Map.pop(&1, :key))
When to use Agent vs GenServer:
Basic message passing:
# Send a tagged tuple to `pid`
message = {:hello, "world"}
send(pid, message)

# Wait up to 5000 ms for a {:hello, _} message in this process's mailbox
receive do
  {:hello, text} ->
    IO.puts(text)
after
  5000 ->
    IO.puts("Timeout")
end
Register processes by name:
# Local registration: name the current process, then address it by that name
Process.register(self(), :my_process)
send(:my_process, :hello)

# Via Registry: start a unique-key registry and register a worker in it
{:ok, _} = Registry.start_link(keys: :unique, name: MyApp.Registry)
via_name = {:via, Registry, {MyApp.Registry, "worker_1"}}
{:ok, pid} = GenServer.start_link(MyWorker, nil, name: via_name)

# Look up process
[{pid, _}] = Registry.lookup(MyApp.Registry, "worker_1")
Links - Bidirectional, propagate exits:
# Link the current process to an existing one
Process.link(pid)

# Spawn a new process that is linked from the start
spawn_link(fn ->
  do_work()
end)
Monitors - Unidirectional, receive DOWN messages:
monitor_ref = Process.monitor(pid)

# Block until the :DOWN message for exactly this monitor and pid arrives.
receive do
  {:DOWN, ^monitor_ref, :process, ^pid, exit_reason} ->
    IO.puts("Process died: " <> inspect(exit_reason))
end
Chain operations with concurrency:
defmodule Pipeline do
  @moduledoc """
  Runs `step1/1`, `step2/1` and `step3/1` in order, each inside its own task.

  Every stage consumes the result of the previous one, so the stages run one
  after another; each step is isolated in a separate process.
  """

  @doc "Feeds `data` through the three steps and returns the final result."
  def process(data) do
    data
    |> async(&step1/1)
    |> async(&step2/1)
    |> async(&step3/1)
    |> Task.await()
  end

  # The input is the task of the previous stage. Await it here, in the process
  # that created it (only the owner of a task may await it), then start the
  # next stage with the plain result. Without this clause the `%Task{}` struct
  # itself would be passed to the next step.
  defp async(%Task{} = previous, fun) do
    previous
    |> Task.await()
    |> async(fun)
  end

  # First stage: the input is the raw data.
  defp async(input, fun) do
    Task.async(fn -> fun.(input) end)
  end
end
Implement a worker pool:
defmodule MyApp.WorkerPool do
  @moduledoc """
  A fixed-size pool of `Task.Supervisor` processes, used round-robin.

  The pool process only chooses which supervisor to use. The task itself is
  started from the process that calls `execute/1`, so that process owns the
  returned `%Task{}` and may pass it to `Task.await/2` or `Task.yield/2`.
  """

  use GenServer

  @doc """
  Starts the pool, registered under this module's name.

  Options:

    * `:size` - number of task supervisors to start (default `10`); must be a
      positive integer, otherwise `init/1` has no matching clause and the
      start fails.
  """
  def start_link(opts) do
    pool_size = Keyword.get(opts, :size, 10)
    GenServer.start_link(__MODULE__, pool_size, name: __MODULE__)
  end

  @doc "Runs `fun` under the next supervisor in the pool and returns its `%Task{}`."
  def execute(fun) do
    supervisor = GenServer.call(__MODULE__, :next_supervisor)
    # Called in the client process on purpose: the caller of async_nolink/2
    # becomes the task's owner and receives its reply message.
    Task.Supervisor.async_nolink(supervisor, fun)
  end

  # The guard rejects 0 and negative sizes: `1..0` would count downwards and
  # start two supervisors, and an empty pool would divide by zero below.
  @impl true
  def init(pool_size) when is_integer(pool_size) and pool_size > 0 do
    supervisors =
      for _ <- 1..pool_size do
        {:ok, pid} = Task.Supervisor.start_link()
        pid
      end

    # Stored as a tuple so each round-robin lookup is a constant-time elem/2.
    {:ok, %{workers: List.to_tuple(supervisors), index: 0}}
  end

  @impl true
  def handle_call(:next_supervisor, _from, %{workers: workers, index: index} = state) do
    next_index = rem(index + 1, tuple_size(workers))
    {:reply, elem(workers, index), %{state | index: next_index}}
  end
end
For producer-consumer pipelines:
defmodule Producer do
  @moduledoc """
  A GenStage producer that emits consecutive integers.

  The state is the next integer to emit; it starts at the value given to
  `start_link/1`.
  """

  use GenStage

  @doc "Starts the producer, registered under this module's name."
  def start_link(initial) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  @impl true
  def init(initial) do
    {:producer, initial}
  end

  # Emits exactly `demand` integers starting at `state`, then advances the
  # state past the last one emitted.
  @impl true
  def handle_demand(demand, state) do
    events = Enum.to_list(state..state + demand - 1)
    {:noreply, events, state + demand}
  end
end
defmodule Consumer do
  @moduledoc """
  A GenStage consumer that passes every received event to `process_event/1`.

  `init/1` sets no `:subscribe_to` option, so this module does not subscribe
  to any producer by itself.
  """

  use GenStage

  @doc "Starts the consumer (not registered under a name)."
  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    {:consumer, :ok}
  end

  # Handles each event for its side effect only; the empty list is the
  # (absent) set of events emitted downstream.
  @impl true
  def handle_events(events, _from, state) do
    Enum.each(events, &process_event/1)
    {:noreply, [], state}
  end
end
In-memory key-value storage:
# Create a named, public table; :ets.new/2 returns the name for named tables
table = :ets.new(:my_table, [:named_table, :public, read_concurrency: true])

# Insert
:ets.insert(table, {:key, "value"})

# Lookup
[{:key, value}] = :ets.lookup(table, :key)

# Delete
:ets.delete(table, :key)

# Match patterns: bind the key of every entry whose value is "value"
:ets.match(table, {:"$1", "value"})

# Iterate: collect every {key, value} pair into a list
:ets.foldl(fn {key, val}, pairs -> [{key, val} | pairs] end, [], table)
- read_concurrency: true for read-heavy workloads
- write_concurrency: true for write-heavy workloads
- :set (default) for unique keys
- :bag or :duplicate_bag for multiple values per key

Design for failure:
# Don't do defensive programming
# Both steps below assert success and raise instead of returning an error.
def process_order(order_id) do
  {:ok, processed} =
    Order
    # Let it crash if order doesn't exist
    |> Repo.get!(order_id)
    # Let it crash if validation fails
    |> process()

  processed
end
When to handle errors vs let crash:
# Handle expected errors
# Every HTTP outcome matched below is returned as a value rather than raised.
def fetch_user(id) do
  url = "#{@api_url}/users/#{id}"
  response = HTTPoison.get(url)

  case response do
    {:ok, %{status_code: 200, body: body}} -> Jason.decode(body)
    {:ok, %{status_code: 404}} -> {:error, :not_found}
    {:ok, %{status_code: status}} -> {:error, {:unexpected_status, status}}
    {:error, reason} -> {:error, {:network_error, reason}}
  end
end
# Let unexpected errors crash
def update_user!(id, params) do
  # Crash if not found
  user = Repo.get!(User, id)
  changeset = User.changeset(user, params)
  # Crash if invalid
  Repo.update!(changeset)
end
Prevent cascading failures:
defmodule CircuitBreaker do
  @moduledoc """
  A process-backed circuit breaker.

  While the circuit is `:closed`, `call/1` runs the given function and
  reports the outcome. After 5 consecutive failures the circuit becomes
  `:open` and `call/1` returns `{:error, :circuit_open}` without running the
  function. 30 seconds after opening, the circuit closes again and the
  failure count is reset.
  """

  use GenServer

  # Consecutive failures that open the circuit.
  @failure_threshold 5
  # How long the circuit stays open before it is closed again.
  @reset_after_ms 30_000

  @doc "Starts the breaker, closed and with zero failures, under this module's name."
  def start_link(_) do
    GenServer.start_link(__MODULE__, %{status: :closed, failures: 0}, name: __MODULE__)
  end

  @doc """
  Runs `fun` unless the circuit is open.

  Returns `{:ok, result}`, `{:error, exception}` when `fun` raises, or
  `{:error, :circuit_open}` when the circuit is open.
  """
  def call(fun) do
    case GenServer.call(__MODULE__, :status) do
      :open -> {:error, :circuit_open}
      :closed -> execute(fun)
    end
  end

  # Runs `fun` in the calling process and reports the outcome asynchronously.
  # Only raised exceptions are caught; exits and throws propagate.
  defp execute(fun) do
    result = fun.()
    GenServer.cast(__MODULE__, :success)
    {:ok, result}
  rescue
    e ->
      GenServer.cast(__MODULE__, :failure)
      {:error, e}
  end

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:status, _from, state) do
    {:reply, state.status, state}
  end

  # While open, outcome reports from calls that were already in flight are
  # ignored: a late :failure must not schedule another reset timer, and a
  # late :success must not close the circuit before the reset timer fires.
  @impl true
  def handle_cast(_outcome, %{status: :open} = state) do
    {:noreply, state}
  end

  def handle_cast(:success, state) do
    {:noreply, %{state | failures: 0, status: :closed}}
  end

  def handle_cast(:failure, state) do
    new_failures = state.failures + 1

    if new_failures >= @failure_threshold do
      Process.send_after(self(), :half_open, @reset_after_ms)
      {:noreply, %{state | failures: new_failures, status: :open}}
    else
      {:noreply, %{state | failures: new_failures}}
    end
  end

  # Reset timer fired: close the circuit and start counting from zero.
  @impl true
  def handle_info(:half_open, state) do
    {:noreply, %{state | status: :closed, failures: 0}}
  end
end
defmodule MyApp.CounterTest do
  # Not async: MyApp.Counter.start_link/1 registers the process under a fixed
  # name, so two test modules starting it concurrently would collide.
  use ExUnit.Case

  test "increments counter" do
    # start_supervised!/1 stops the counter when the test ends, so the
    # registered name is free again for the next test.
    start_supervised!({MyApp.Counter, 0})

    # The client functions address the counter by its registered name.
    assert MyApp.Counter.increment() == 1
    assert MyApp.Counter.increment() == 2
    assert MyApp.Counter.get_value() == 2
  end
end
test "process receives message" do
  parent = self()

  # Keep the pid of the spawned process: it is the target of the :ping below.
  pid =
    spawn(fn ->
      receive do
        :ping -> send(parent, :pong)
      end
    end)

  send(pid, :ping)
  # Wait up to 1000 ms for the reply to reach the test process.
  assert_receive :pong, 1000
end
test "supervisor restarts crashed worker" do
  {:ok, supervisor} = Supervisor.start_link([MyApp.Worker], strategy: :one_for_one)
  [{_id, original_pid, _type, _modules}] = Supervisor.which_children(supervisor)

  # Crash the worker
  Process.exit(original_pid, :kill)
  # Wait for restart
  Process.sleep(100)

  # Verify new worker started
  [{_id, restarted_pid, _type, _modules}] = Supervisor.which_children(supervisor)
  refute restarted_pid == original_pid
  assert Process.alive?(restarted_pid)
end
Launch Observer for visual process inspection:
# Calls start/0 in the Erlang :observer module.
:observer.start()
Inspect running processes:
# List all processes
Process.list()
# Process information for `pid` (which must already be bound)
Process.info(pid)
# Message queue length; the match raises unless the tagged tuple is returned
# NOTE(review): presumably Process.info/2 returns nil for a dead process — confirm
{:message_queue_len, count} = Process.info(pid, :message_queue_len)
# Current function, destructured into module, function name and arity
{:current_function, {mod, fun, arity}} = Process.info(pid, :current_function)
Use :sys module for debugging:
# Enable tracing for `pid` (passing `true` switches it on)
:sys.trace(pid, true)
# Get state
:sys.get_state(pid)
# Get status
:sys.get_status(pid)
read_concurrency for read-heavy workloads
Activates when the user asks about AI prompts, needs prompt templates, wants to search for prompts, or mentions prompts.chat. Use for discovering, retrieving, and improving prompts.
Search, retrieve, and install Agent Skills from the prompts.chat registry using MCP tools. Use when the user asks to find skills, browse skill catalogs, install a skill for Claude, or extend Claude's capabilities with reusable AI agent components.
This skill should be used when the user wants to "create a skill", "add a skill to plugin", "write a new skill", "improve skill description", "organize skill content", or needs guidance on skill structure, progressive disclosure, or skill development best practices for Claude Code plugins.