From the elixir-dev plugin. Provides Oban patterns for JSON args, let-it-crash error handling, snoozing, and job chaining, and helps avoid common bugs in background jobs, retries, cron, and workflows.
```
npx claudepluginhub gsmlg-dev/code-agent --plugin elixir-dev
```

This skill uses the workspace's default tool permissions.
Paradigm shifts for Oban job processing. These insights prevent common bugs and guide proper patterns.
JOB ARGS ARE JSON. ATOMS BECOME STRINGS.
This single fact causes most Oban debugging headaches.
```elixir
# Creating - atom keys are fine
MyWorker.new(%{user_id: 123})

# Processing - must use string keys (JSON converted atoms to strings)
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
  # ...
end
```
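For reference, a minimal end-to-end sketch of the same pattern (module, queue, and helper names are illustrative):

```elixir
defmodule MyApp.EmailWorker do
  use Oban.Worker, queue: :default

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
    # String key: args round-tripped through JSON between insert and execution
    IO.puts("Emailing user #{user_id}")
    :ok
  end
end

# Enqueue with atom keys; Oban serializes them to JSON on insert
MyApp.EmailWorker.new(%{user_id: 123}) |> Oban.insert()
```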
Don't catch errors in Oban jobs. Let them bubble up to Oban for proper handling.
```elixir
# Bad: Swallowing errors
def perform(%Oban.Job{} = job) do
  case do_work(job.args) do
    {:ok, result} ->
      {:ok, result}

    {:error, reason} ->
      Logger.error("Failed: #{reason}")
      {:ok, :failed} # Silently marks the job as complete!
  end
end

# Good: Let errors propagate
def perform(%Oban.Job{} = job) do
  result = do_work!(job.args) # Raises on failure
  {:ok, result}
end

# Or return an error tuple - Oban treats it as a failure
def perform(%Oban.Job{} = job) do
  case do_work(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, reason} -> {:error, reason} # Oban will retry
  end
end
```
Only catch errors when you need custom retry logic or want to mark a job as permanently failed:
```elixir
def perform(%Oban.Job{} = job) do
  case external_api_call(job.args) do
    {:ok, result} -> {:ok, result}
    {:error, :not_found} -> {:cancel, :resource_not_found} # Don't retry
    {:error, :rate_limited} -> {:snooze, 60} # Retry in 60 seconds
    {:error, _} -> {:error, :will_retry} # Normal retry
  end
end
```
Use `{:snooze, seconds}` for polling external state instead of manual retry logic:
```elixir
def perform(%Oban.Job{} = job) do
  if external_thing_finished?(job.args) do
    {:ok, :done}
  else
    {:snooze, 5} # Check again in 5 seconds
  end
end
```
For simple sequential chains (JobA → JobB → JobC), have each job enqueue the next:
```elixir
def perform(%Oban.Job{} = job) do
  result = do_work(job.args)

  # Enqueue the next job on success
  NextWorker.new(%{data: result}) |> Oban.insert()

  {:ok, result}
end
```
Don't reach for Oban Pro Workflows for linear chains.
Prevent duplicate jobs with the `unique` option:
```elixir
use Oban.Worker,
  queue: :default,
  unique: [period: 60] # Only one job with the same args per 60 seconds

# Or scope uniqueness to specific arg keys
unique: [period: 300, keys: [:user_id]]
```
Gotcha: Uniqueness is checked on insert, not execution. Two identical jobs inserted 61 seconds apart will both run.
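Within the period, a duplicate insert is deduplicated rather than rejected: `Oban.insert/2` returns the existing job with its `conflict?` field set (behavior in recent Oban versions; `MyWorker` is the illustrative worker from above):

```elixir
{:ok, job1} = MyWorker.new(%{user_id: 123}) |> Oban.insert()
{:ok, job2} = MyWorker.new(%{user_id: 123}) |> Oban.insert()

job2.conflict?     # => true - the second insert matched an existing job
job1.id == job2.id # => true - the existing job was returned, not a new one
```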
For millions of records, chunk work into batches rather than one job per item:
```elixir
# Bad: One job per contact (millions of jobs = database strain)
Enum.each(contacts, fn contact ->
  ContactWorker.new(%{id: contact.id}) |> Oban.insert()
end)

# Good: Chunk into batches
contacts
|> Enum.chunk_every(100)
|> Enum.each(fn batch ->
  BatchWorker.new(%{contact_ids: Enum.map(batch, & &1.id)}) |> Oban.insert()
end)
```
Use bulk inserts without uniqueness constraints for maximum throughput.
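A sketch of that bulk variant, reusing the hypothetical `BatchWorker`: build every changeset up front, then hand the whole list to `Oban.insert_all/1` so all job rows go in with a single call:

```elixir
jobs =
  contacts
  |> Enum.chunk_every(100)
  |> Enum.map(fn batch ->
    BatchWorker.new(%{contact_ids: Enum.map(batch, & &1.id)})
  end)

# One bulk insert instead of one round-trip per job
Oban.insert_all(jobs)
```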
Unlike regular job args, Oban Pro's workflow cascade context preserves atom keys:
```elixir
# Creating - atom keys
Workflow.put_context(%{score_run_id: id})

# Processing - atom keys still work!
def my_cascade(%{score_run_id: id}) do
  # ...
end

# Dot notation works too
def later_step(context) do
  context.score_run_id
  context.previous_result
end
```
| | Creating | Processing |
|---|---|---|
| Regular jobs | atoms ok | strings only |
| Cascade context | atoms ok | atoms ok |
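The strings-only behavior for regular jobs is just the JSON round-trip at work; you can reproduce it directly (assuming Jason, the usual JSON library in Elixir apps):

```elixir
%{user_id: 123} |> Jason.encode!() |> Jason.decode!()
# => %{"user_id" => 123} - atom keys come back as strings
```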
Reserve Workflows for jobs with genuine dependency graphs, such as fan-out/fan-in, grafted sub-workflows, or steps that consume recorded outputs. Don't use Workflows for simple A → B → C chains.
When you need a parent workflow to wait for a sub-workflow to complete before continuing, use `add_graft` instead of `add_workflow`.

| Method | Sub-workflow completes before deps run? | Output accessible? |
|---|---|---|
| `add_workflow` | No - just inserts jobs | No |
| `add_graft` | Yes - waits for all jobs | Yes, via recorded values |
Don't couple unrelated concerns (e.g., notifications) to domain-specific workflows (e.g., scoring). Instead, create a higher-level orchestrator:
```elixir
# Bad: Notification logic buried in AggregateScores
defmodule AggregateScores do
  def workflow(score_run_id) do
    Workflow.new()
    |> Workflow.add(:aggregate, AggregateJob.new(...))
    |> Workflow.add(:send_notification, SendEmail.new(...), deps: :aggregate) # Wrong place!
  end
end

# Good: Higher-level workflow composes scoring + notification
defmodule FullRunWithNotifications do
  def workflow(site_url, opts) do
    notification_opts = build_notification_opts(opts)

    Workflow.new()
    |> Workflow.put_context(%{notification_opts: notification_opts})
    |> Workflow.add_graft(:scoring, &graft_full_run/1)
    |> Workflow.add_cascade(:send_notification, &send_notification/1, deps: :scoring)
  end

  defp graft_full_run(context) do
    # The sub-workflow doesn't know about notifications
    FullRun.workflow(context.site_url, context.opts)
    |> Workflow.apply_graft()
    |> Oban.insert_all()
  end
end
```
For a grafted workflow's output to be available to dependent steps, the final job must use `recorded: true`:
```elixir
defmodule FinalJob do
  use Oban.Pro.Worker, queue: :default, recorded: true

  def perform(%Oban.Job{} = job) do
    # ... compute score_run_id and score from job.args ...
    # The return value is recorded and becomes available in the workflow context
    {:ok, %{score_run_id: score_run_id, composite_score: score}}
  end
end
```
Add jobs to a running workflow with `Workflow.append/2`:
```elixir
def perform(%Oban.Job{} = job) do
  if needs_extra_step?(job.args) do
    job
    |> Workflow.append()
    |> Workflow.add(:extra, ExtraWorker.new(%{}), deps: [:current_step])
    |> Oban.insert_all()
  end

  {:ok, :done}
end
```
Caveat: you cannot override context or add dependencies on already-running jobs. For complex dynamic scenarios, check external state in the job itself, as sketched below.
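A sketch of that fallback, combining the snooze pattern from earlier with an external-state check (`Uploads.processed?/1` is a hypothetical helper):

```elixir
def perform(%Oban.Job{args: %{"upload_id" => upload_id}}) do
  # Poll whatever external state would have triggered the dynamic step,
  # instead of appending jobs to the workflow mid-flight
  if Uploads.processed?(upload_id) do
    {:ok, :done}
  else
    {:snooze, 10}
  end
end
```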
To run a final job after multiple paginated workflows complete, use Batch callbacks:
```elixir
# Wrap workflows in a shared batch
batch_id = "import-#{import_id}"

pages
|> Enum.each(fn page ->
  PageWorkflow.workflow(page)
  |> Batch.from_workflow(batch_id: batch_id)
  |> Oban.insert_all()
end)

# Add completion callback
Batch.new(batch_id: batch_id)
|> Batch.add_callback(:completed, CompletionWorker)
|> Oban.insert()
```
Tip: Include pagination workers in the batch to prevent premature completion.
Don't use inline testing mode - workflows need database interaction.
```elixir
# Use run_workflow/1 for integration tests
assert %{completed: 3} =
         Workflow.new()
         |> Workflow.add(:a, WorkerA.new(%{}))
         |> Workflow.add(:b, WorkerB.new(%{}), deps: [:a])
         |> Workflow.add(:c, WorkerC.new(%{}), deps: [:b])
         |> run_workflow()
```
For testing recorded values between workers, insert predecessor jobs with pre-filled metadata.
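A minimal test-config sketch, assuming an app named `:my_app`: manual mode inserts jobs into the database without executing them, so drain-style helpers like `run_workflow/1` can run them against real workflow state:

```elixir
# config/test.exs
config :my_app, Oban, testing: :manual
```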
Non-Pro mistakes:

- Matching atom keys in `perform/1` (args are JSON; keys arrive as strings)
- Returning `{:ok, _}` from a job that actually failed

Pro mistakes:

- Using `add_workflow` when you need to wait for completion (use `add_graft`)
- Skipping `recorded: true` when you need output from grafted workflows

Any of these? Re-read the serialization rules.