From elixir-phoenix-guide
Enforces Elixir OTP best practices for GenServers (public APIs, fast init via handle_continue, call/cast), Supervisors, Tasks, Agents. Invoke before implementing these modules.
npx claudepluginhub j-morgan6/elixir-phoenix-guide --plugin elixir-phoenix-guide
Slash command: /elixir-phoenix-guide:otp-essentials
1. Always use @impl true before GenServer/Agent callbacks (init, handle_call, handle_cast, handle_info, terminate)
2. Keep init/1 fast: no blocking calls, no DB queries; use handle_continue for expensive setup
3. Use GenServer.call for request/response and GenServer.cast for fire-and-forget; never cast when you need a result
4. Expose a public module API; callers never use GenServer.call(pid, ...) directly
5. Use Task.async/Task.await with bounded timeouts; never Task.async without a corresponding Task.await or Task.yield

Always wrap GenServer calls behind a public module API. Callers should not know they're talking to a GenServer.
# Bad — leaks GenServer implementation to callers
GenServer.call(MyApp.Cache, {:get, key})
# Good — public API hides the GenServer
defmodule MyApp.Cache do
use GenServer
# --- Public API ---
def start_link(opts) do
name = Keyword.get(opts, :name, __MODULE__)
GenServer.start_link(__MODULE__, opts, name: name)
end
def get(key, server \\ __MODULE__) do
GenServer.call(server, {:get, key})
end
def put(key, value, server \\ __MODULE__) do
GenServer.cast(server, {:put, key, value})
end
# --- Callbacks ---
@impl true
def init(_opts) do
{:ok, %{}}
end
@impl true
def handle_call({:get, key}, _from, state) do
{:reply, Map.get(state, key), state}
end
@impl true
def handle_cast({:put, key, value}, state) do
{:noreply, Map.put(state, key, value)}
end
end
Never block in init/1. Use handle_continue for expensive setup.
# Bad — blocks the supervisor while loading data
@impl true
def init(_opts) do
data = MyApp.Repo.all(MyApp.Item) # Blocks!
{:ok, %{items: data}}
end
# Good: init returns immediately; handle_continue loads the data before any other message is processed
@impl true
def init(_opts) do
{:ok, %{items: []}, {:continue, :load_data}}
end
@impl true
def handle_continue(:load_data, state) do
data = MyApp.Repo.all(MyApp.Item)
{:noreply, %{state | items: data}}
end
# call — synchronous, caller waits for reply (use for reads, queries)
def get_count(server \\ __MODULE__) do
GenServer.call(server, :get_count)
end
@impl true
def handle_call(:get_count, _from, state) do
{:reply, state.count, state}
end
# cast — asynchronous, fire-and-forget (use for writes, side effects)
def increment(server \\ __MODULE__) do
GenServer.cast(server, :increment)
end
@impl true
def handle_cast(:increment, state) do
{:noreply, %{state | count: state.count + 1}}
end
Use handle_info for messages not sent via call/cast — timers, monitors, PubSub, etc.
@impl true
def init(_opts) do
Process.send_after(self(), :tick, 1_000)
{:ok, %{count: 0}}
end
@impl true
def handle_info(:tick, state) do
Process.send_after(self(), :tick, 1_000)
{:noreply, %{state | count: state.count + 1}}
end
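A caveat to the timer pattern above, as a minimal sketch: once a module defines its own handle_info/2 clauses, a message that matches none of them raises FunctionClauseError and crashes the process. A final catch-all clause that logs and ignores is one common guard. Sketch.Ticker, count/1, and the :interval option are illustrative names, not part of the guide.

```elixir
defmodule Sketch.Ticker do
  use GenServer
  require Logger

  # --- Public API (illustrative names) ---
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, Keyword.take(opts, [:name]))
  end

  def count(server), do: GenServer.call(server, :count)

  # --- Callbacks ---
  @impl true
  def init(opts) do
    interval = Keyword.get(opts, :interval, 1_000)
    Process.send_after(self(), :tick, interval)
    {:ok, %{count: 0, interval: interval}}
  end

  @impl true
  def handle_call(:count, _from, state), do: {:reply, state.count, state}

  @impl true
  def handle_info(:tick, state) do
    Process.send_after(self(), :tick, state.interval)
    {:noreply, %{state | count: state.count + 1}}
  end

  # Catch-all: log and ignore anything this server does not recognise,
  # instead of crashing on a stray message.
  def handle_info(msg, state) do
    Logger.debug("unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end
end
```

Keep the catch-all as the last clause; placed earlier, it would shadow the specific ones.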
# one_for_one — restart only the failed child (most common)
children = [
{MyApp.Cache, []},
{MyApp.Worker, []}
]
Supervisor.start_link(children, strategy: :one_for_one)
# one_for_all — restart ALL children when one fails
# Use when children depend on each other's state
Supervisor.start_link(children, strategy: :one_for_all)
# rest_for_one — restart failed child and all children started AFTER it
# Use when later children depend on earlier ones
Supervisor.start_link(children, strategy: :rest_for_one)
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
MyApp.Repo,
{Phoenix.PubSub, name: MyApp.PubSub},
MyApp.Cache,
MyAppWeb.Endpoint
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Use when you need to start processes on demand, not at boot.
defmodule MyApp.RoomSupervisor do
use DynamicSupervisor
def start_link(init_arg) do
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end
def start_room(room_id) do
spec = {MyApp.Room, room_id: room_id}
DynamicSupervisor.start_child(__MODULE__, spec)
end
def stop_room(pid) do
DynamicSupervisor.terminate_child(__MODULE__, pid)
end
end
# Parallel fetch with bounded timeout
task1 = Task.async(fn -> fetch_user_profile(user_id) end)
task2 = Task.async(fn -> fetch_user_posts(user_id) end)
profile = Task.await(task1, 5_000)
posts = Task.await(task2, 5_000)
# Process items concurrently with bounded concurrency
user_ids
|> Task.async_stream(&fetch_user/1, max_concurrency: 4, timeout: 10_000)
|> Enum.map(fn {:ok, result} -> result end)
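The {:ok, result} match above assumes every task finishes in time; with the default on_timeout: :exit, a timeout exits the calling process instead. A sketch of one alternative, assuming it is acceptable to skip slow items: on_timeout: :kill_task makes the stream emit {:exit, :timeout} for that item. Sketch.Fetch and its fetch_user/1 stand-in are hypothetical.

```elixir
defmodule Sketch.Fetch do
  # Hypothetical stand-in for fetch_user/1: the :slow id simulates a slow call.
  def fetch_user(id) do
    if id == :slow, do: Process.sleep(500)
    {:user, id}
  end

  # Returns {results_in_input_order, timed_out_count}.
  def fetch_all(user_ids, timeout \\ 100) do
    {users, timeouts} =
      user_ids
      |> Task.async_stream(&fetch_user/1,
        max_concurrency: 4,
        timeout: timeout,
        # Kill the slow task and emit {:exit, :timeout} instead of exiting the caller.
        on_timeout: :kill_task
      )
      |> Enum.reduce({[], 0}, fn
        {:ok, user}, {users, timeouts} -> {[user | users], timeouts}
        {:exit, :timeout}, {users, timeouts} -> {users, timeouts + 1}
      end)

    {Enum.reverse(users), timeouts}
  end
end
```

Tasks started by Task.async_stream are linked to the caller, so a task that crashes (rather than times out) still takes the caller down unless it traps exits.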
For work that should be supervised but doesn't need a result:
# Add to your supervision tree
{Task.Supervisor, name: MyApp.TaskSupervisor}
# Start a supervised task (not restarted on crash by default; pass restart: :transient to opt in)
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
send_welcome_email(user)
end)
Use Agent for simple state when GenServer is overkill. If you need handle_info, timeouts, or complex logic, use GenServer instead.
defmodule MyApp.Counter do
use Agent
def start_link(initial_value) do
Agent.start_link(fn -> initial_value end, name: __MODULE__)
end
def value do
Agent.get(__MODULE__, & &1)
end
def increment do
Agent.update(__MODULE__, &(&1 + 1))
end
end
# In application supervision tree
{Registry, keys: :unique, name: MyApp.Registry}
# In GenServer start_link
def start_link(room_id) do
GenServer.start_link(__MODULE__, room_id,
name: {:via, Registry, {MyApp.Registry, {:room, room_id}}}
)
end
# Lookup
def get_room(room_id) do
case Registry.lookup(MyApp.Registry, {:room, room_id}) do
[{pid, _}] -> {:ok, pid}
[] -> {:error, :not_found}
end
end
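The via tuple also works as the server argument to GenServer.call, so the public API can take a room id and callers never look up a pid themselves. A minimal sketch, assuming a registry named Sketch.Registry is already running; Sketch.Room, join/2, and members/1 are hypothetical names.

```elixir
defmodule Sketch.Room do
  use GenServer

  # Hypothetical registry name; it must be started before any room.
  @registry Sketch.Registry

  # --- Public API: callers pass a room id, never a pid ---
  def start_link(room_id) do
    GenServer.start_link(__MODULE__, room_id, name: via(room_id))
  end

  def join(room_id, user), do: GenServer.call(via(room_id), {:join, user})
  def members(room_id), do: GenServer.call(via(room_id), :members)

  # The via tuple tells GenServer to resolve the name through Registry.
  defp via(room_id), do: {:via, Registry, {@registry, {:room, room_id}}}

  # --- Callbacks ---
  @impl true
  def init(room_id), do: {:ok, %{room_id: room_id, members: []}}

  @impl true
  def handle_call({:join, user}, _from, state) do
    {:reply, :ok, %{state | members: [user | state.members]}}
  end

  def handle_call(:members, _from, state) do
    {:reply, Enum.reverse(state.members), state}
  end
end
```

Calling a room id that is not registered exits the caller with :noproc, so use a Registry.lookup check first where a missing room is an expected case.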
# OK — single global process
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
# Bad: creates atoms from user input; atoms are never garbage collected, so this can exhaust the atom table
GenServer.start_link(__MODULE__, opts, name: String.to_atom("room_#{room_id}"))
# Link — bidirectional, crash propagates (use in supervisors)
Process.link(pid)
# Monitor — unidirectional, receive :DOWN message (use for observation)
ref = Process.monitor(pid)
@impl true
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
# Handle monitored process dying
{:noreply, cleanup(state, pid)}
end
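The cleanup step in the handler above is left abstract. One possible shape, as a sketch: keep a map from monitor reference to pid and delete the entry when :DOWN arrives. Sketch.Tracker, track/2, and tracked/1 are hypothetical names.

```elixir
defmodule Sketch.Tracker do
  use GenServer

  # --- Public API (illustrative names) ---
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def track(server, pid), do: GenServer.call(server, {:track, pid})
  def tracked(server), do: GenServer.call(server, :tracked)

  # --- Callbacks ---
  @impl true
  def init(:ok), do: {:ok, %{refs: %{}}}

  @impl true
  def handle_call({:track, pid}, _from, state) do
    # Monitor, do not link: the tracker only observes and must not crash with the worker.
    ref = Process.monitor(pid)
    {:reply, :ok, %{state | refs: Map.put(state.refs, ref, pid)}}
  end

  def handle_call(:tracked, _from, state) do
    {:reply, Map.values(state.refs), state}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    # The monitor is gone once :DOWN is delivered; just forget the entry.
    {:noreply, %{state | refs: Map.delete(state.refs, ref)}}
  end
end
```

Keying the map by reference rather than pid keeps the lookup exact even if the same pid is tracked more than once.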
When many processes need to read the same data and writes are infrequent:
# Create table in a GenServer (owner process)
@impl true
def init(_opts) do
table = :ets.new(:my_cache, [:named_table, :set, :public, read_concurrency: true])
{:ok, %{table: table}}
end
# Any process can read
:ets.lookup(:my_cache, key)
# With :public any process can write; create the table as :protected if only the owner should write
:ets.insert(:my_cache, {key, value})
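Putting the pieces together, a minimal sketch of an ETS-backed cache where reads bypass the owner process and writes go through it. It assumes a :protected table (any process reads, only the owner writes) rather than the :public table shown above; Sketch.EtsCache and the table name are hypothetical.

```elixir
defmodule Sketch.EtsCache do
  use GenServer

  # Hypothetical table name.
  @table :sketch_ets_cache

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Reads run in the caller's process; no message is sent to the owner.
  def get(key) do
    case :ets.lookup(@table, key) do
      [{^key, value}] -> {:ok, value}
      [] -> :error
    end
  end

  # Writes are serialised through the owner. call (not cast) is used so the
  # value is guaranteed visible once put/2 returns.
  def put(key, value), do: GenServer.call(__MODULE__, {:put, key, value})

  @impl true
  def init(_opts) do
    # :protected lets any process read but only the owner write.
    :ets.new(@table, [:named_table, :set, :protected, read_concurrency: true])
    {:ok, %{}}
  end

  @impl true
  def handle_call({:put, key, value}, _from, state) do
    :ets.insert(@table, {key, value})
    {:reply, :ok, state}
  end
end
```

The table is deleted when the owner process dies, so readers should treat a missing table or key as a cache miss after a restart.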
# Bad — bottleneck GenServer (all requests go through one process)
def get_user(id), do: GenServer.call(UserServer, {:get, id})
# Fix: Use ETS, a database, or partition work across multiple processes
# Bad — god process (one GenServer doing everything)
# Fix: Split into focused processes, each with one responsibility
# Bad — unmonitored Task.async
Task.async(fn -> do_work() end)
# no await or yield — caller loses track of work
# Fix: Always await, or use Task.Supervisor.start_child for fire-and-forget
# Bad — blocking the caller unnecessarily
def send_email(user) do
GenServer.call(EmailServer, {:send, user}) # Waits for email to send
end
# Fix: Use cast if caller doesn't need the result
def send_email(user) do
GenServer.cast(EmailServer, {:send, user})
end
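For the unawaited Task.async case above, when the caller wants a result but cannot wait indefinitely, Task.yield combined with Task.shutdown is one bounded alternative to Task.await. A sketch; Sketch.Bounded and run/2 are hypothetical names.

```elixir
defmodule Sketch.Bounded do
  # Runs fun in a task and waits at most timeout_ms for the result.
  # Returns {:ok, result}, {:error, :timeout}, or {:error, {:exit, reason}}.
  def run(fun, timeout_ms) when is_function(fun, 0) do
    task = Task.async(fun)

    # Task.yield/2 returns nil on timeout; Task.shutdown/1 then stops the task
    # and returns a reply only if one arrived in the meantime.
    case Task.yield(task, timeout_ms) || Task.shutdown(task) do
      {:ok, result} -> {:ok, result}
      {:exit, reason} -> {:error, {:exit, reason}}
      nil -> {:error, :timeout}
    end
  end
end
```

Unlike Task.await, a timeout here does not exit the caller, and the task is always cleaned up. The {:exit, reason} branch is only reachable when the caller traps exits, because Task.async links the task to the caller.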
# Start GenServer in test
test "get and put values" do
start_supervised!({MyApp.Cache, name: :test_cache})
assert MyApp.Cache.get(:key, :test_cache) == nil
MyApp.Cache.put(:key, "value", :test_cache)
assert MyApp.Cache.get(:key, :test_cache) == "value"
end
# Test with Task
test "concurrent fetch" do
task = Task.async(fn -> MyApp.fetch_data() end)
assert {:ok, _data} = Task.await(task, 5_000)
end
See testing-essentials skill for comprehensive testing patterns.