myfy TasksModule for background job processing with SQL-based queue. Use when working with TasksModule, @task decorator, background jobs, task workers, TaskContext, task retries, or async task dispatch.
Install:

```
/plugin marketplace add psincraian/myfy
/plugin install myfy@myfy-plugins
```

This skill inherits all available tools. When active, it can use any tool Claude has access to.
TasksModule provides SQL-based async task processing with DI injection and automatic retries.
```python
from myfy.core import Application
from myfy.data import DataModule
from myfy.tasks import TasksModule, task

app = Application()
app.add_module(DataModule())
app.add_module(TasksModule(auto_create_tables=True))

# Define a task
@task
async def send_email(to: str, subject: str, body: str) -> None:
    await email_service.send(to, subject, body)

# Dispatch from a route
@route.post("/notifications")
async def notify_user(body: NotifyRequest) -> dict:
    task_id = await send_email.send(
        to=body.email,
        subject="Welcome!",
        body="Thanks for signing up.",
    )
    return {"task_id": task_id}
```
Environment variables use the MYFY_TASKS_ prefix:
| Variable | Default | Description |
|---|---|---|
| `MYFY_TASKS_DEFAULT_MAX_RETRIES` | `3` | Default retry attempts |
| `MYFY_TASKS_RETRY_DELAY_SECONDS` | `60.0` | Seconds between retries |
| `MYFY_TASKS_WORKER_CONCURRENCY` | `4` | Concurrent tasks per worker |
| `MYFY_TASKS_POLL_INTERVAL` | `1.0` | Seconds between queue polls |
| `MYFY_TASKS_TASK_TIMEOUT` | `300.0` | Max seconds per task |
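As an illustration, these settings can be overridden before the application starts. The variable names come from the table above; the values here are arbitrary examples, not recommendations:

```python
import os

# Override task settings via environment variables (example values).
# These must be set before the TasksModule reads its configuration.
os.environ["MYFY_TASKS_WORKER_CONCURRENCY"] = "8"    # run 8 tasks per worker
os.environ["MYFY_TASKS_DEFAULT_MAX_RETRIES"] = "5"   # retry failed tasks 5 times
os.environ["MYFY_TASKS_RETRY_DELAY_SECONDS"] = "30"  # wait 30s between retries
```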
```python
from myfy.tasks import task

@task
async def process_order(order_id: int) -> str:
    # Process the order
    return f"Processed order {order_id}"
```
Services are automatically injected at runtime:
```python
from myfy.tasks import task
from myfy.data import AsyncSession

@task
async def sync_user_data(user_id: int, session: AsyncSession) -> None:
    # session is TASK-scoped (injected per task execution)
    user = await session.get(User, user_id)
    await sync_to_external_service(user)
```
```python
@task(max_retries=5, retry_on=[ConnectionError, TimeoutError])
async def upload_file(file_path: str) -> str:
    # Retries up to 5 times on connection/timeout errors
    return await s3.upload(file_path)
```
```python
# Returns immediately with task_id
task_id = await send_email.send(to="user@example.com", subject="Hi", body="Hello!")
```

Dispatch options:

```python
task_id = await send_email.send(
    to="user@example.com",
    subject="Hi",
    body="Hello!",
    _priority=10,    # Higher priority = executes first
    _delay=60,       # Wait 60 seconds before executing
    _max_retries=5,  # Override default retries
)
```
```python
result = await send_email.get_result(task_id, timeout=60)

if result.is_completed:
    print(f"Success: {result.value}")
elif result.is_failed:
    print(f"Error: {result.error}")
elif result.is_pending:
    print("Still processing...")
```
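These status properties make it easy to build a small wait helper. The sketch below is not part of the myfy API; it only assumes the `get_result` and `is_pending` behavior shown above:

```python
import asyncio

async def wait_for_result(task, task_id: str, interval: float = 1.0):
    # Poll get_result() until the task leaves the pending state,
    # then return the final result object (completed or failed).
    while True:
        result = await task.get_result(task_id)
        if not result.is_pending:
            return result
        await asyncio.sleep(interval)
```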
Report progress from long-running tasks:
```python
from myfy.tasks import task, TaskContext

@task
async def import_users(file_path: str, ctx: TaskContext) -> int:
    users = load_users_from_file(file_path)
    total = len(users)
    for i, user in enumerate(users):
        await create_user(user)
        await ctx.update_progress(
            current=i + 1,
            total=total,
            message=f"Importing user {i + 1}/{total}",
        )
    return total
```
Check progress from caller:
```python
result = await import_users.get_result(task_id)
if result.progress:
    current, total = result.progress
    print(f"Progress: {current}/{total} - {result.progress_message}")
```
Start a worker process:

```
myfy tasks worker
```

With options:

```
myfy tasks worker --concurrency 8 --poll-interval 0.5
```
Workers poll the queue and move each task through these statuses:

| Status | Description |
|---|---|
| `pending` | Queued, waiting for a worker |
| `running` | Being executed by a worker |
| `completed` | Finished successfully |
| `failed` | Failed after all retries |
| `cancelled` | Manually cancelled |
Tasks automatically retry on failure:
```python
@task(max_retries=3, retry_on=[APIError])
async def call_api(url: str) -> dict:
    response = await http.get(url)
    if response.status >= 500:
        raise APIError("Server error")  # Will retry
    return response.json()
```
After all retries fail, the task is marked `failed` and the error is available via `get_result()`.

Task parameters are serialized or injected based on their type:

| Type | Behavior |
|---|---|
| Primitives (`str`, `int`, `float`, `bool`) | Serialized as task args |
| Lists, dicts | Serialized as task args |
| `TaskContext` | Injected by worker |
| Services (other types) | DI injected at runtime |
```python
@task
async def complex_task(
    order_id: int,          # Serialized (primitive)
    items: list[str],       # Serialized (list)
    ctx: TaskContext,       # Injected (context)
    session: AsyncSession,  # DI injected (service)
    settings: AppSettings,  # DI injected (service)
) -> None:
    ...
```