Background task management system using APScheduler for scheduling and Celery for distributed execution.
Manages background tasks with APScheduler for scheduling and Celery for distributed execution.
/plugin marketplace add adelabdelgawad/fullstack-agents/plugin install adelabdelgawad-fullstack-agents-plugins-fullstack-agents@adelabdelgawad/fullstack-agentsThis skill inherits all available tools. When active, it can use any tool Claude has access to.
examples.mdreferences/scheduler-pattern.mdreferences/task-pattern.mdscripts/helper.pyBackground task management system using APScheduler for scheduling and Celery for distributed execution.
Use this skill when asked to:
┌─────────────────────────────────────────────────────────────┐
│ FastAPI Application │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Scheduler │ │ Tasks │ │ API │ │
│ │ Router │ │ Bridge │ │ Endpoints │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
└─────────┼────────────────┼────────────────┼─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ Scheduler Service │
│ • Job CRUD operations │
│ • Execution tracking │
│ • Distributed locking │
│ • Instance management │
└─────────────────────────────┬───────────────────────────────┘
│
┌─────────────────────┼─────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌─────────────────┐ ┌───────────────┐
│ APScheduler │ │ Celery Bridge │ │ Database │
│ (Scheduling) │ │ (Dispatch) │ │ (Persistence)│
└───────────────┘ └────────┬────────┘ └───────────────┘
│
▼
┌─────────────────┐
│ Redis Broker │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Celery Worker │
│ (Execution) │
└─────────────────┘
project/
├── celery_app.py # Celery configuration
├── tasks/
│ ├── __init__.py # Task exports
│ ├── celery_bridge.py # APScheduler-Celery bridge
│ ├── email.py # Email tasks
│ ├── attendance.py # Domain-specific tasks
│ ├── hris.py # Integration tasks
│ └── scheduler.py # Scheduler maintenance
├── api/
│ ├── v1/
│ │ └── scheduler.py # Scheduler endpoints
│ ├── services/
│ │ └── scheduler_service.py
│ ├── repositories/
│ │ └── scheduler_repository.py
│ └── schemas/
│ └── scheduler_schemas.py
└── db/
└── models.py # Job & execution models
# celery_app.py
from celery import Celery
from settings import settings
celery_app = Celery(
"app_name",
broker=settings.CELERY_BROKER_URL,
backend=settings.CELERY_RESULT_BACKEND,
include=[
"tasks.email",
"tasks.attendance",
"tasks.scheduler",
],
)
celery_app.conf.update(
# Serialization
task_serializer="json",
result_serializer="json",
accept_content=["json"],
# Reliability
task_acks_late=True,
task_reject_on_worker_lost=True,
task_track_started=True,
# Result expiration
result_expires=86400,
# Worker settings
worker_prefetch_multiplier=1,
worker_concurrency=10,
# Time limits
task_soft_time_limit=300,
task_time_limit=360,
# Retry defaults
task_default_retry_delay=60,
# Timezone
timezone="UTC",
enable_utc=True,
)
# tasks/celery_bridge.py
from typing import Optional
from settings import settings
_CELERY_TASK_REGISTRY = {}
def register_celery_task(job_key: str, task):
"""Register a Celery task for a job key."""
_CELERY_TASK_REGISTRY[job_key] = task
def dispatch_to_celery(
job_key: str,
execution_id: Optional[str] = None,
**kwargs
) -> Optional[str]:
"""Dispatch job to Celery if enabled."""
if not settings.CELERY_ENABLED:
return None # Fall back to inline
task = _CELERY_TASK_REGISTRY.get(job_key)
if not task:
return None # Fall back to inline
result = task.delay(execution_id=execution_id, **kwargs)
return result.id
def initialize_celery_tasks():
"""Register all Celery tasks at startup."""
from tasks.attendance import sync_attendance_task
from tasks.scheduler import cleanup_history_task
register_celery_task("attendance_sync", sync_attendance_task)
register_celery_task("history_cleanup", cleanup_history_task)
# tasks/scheduler.py
import asyncio
from celery import shared_task
def _run_async(coro):
"""Run async code in Celery worker."""
try:
loop = asyncio.get_running_loop()
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
return executor.submit(asyncio.run, coro).result()
except RuntimeError:
return asyncio.run(coro)
@shared_task(
bind=True,
max_retries=3,
default_retry_delay=60,
autoretry_for=(Exception,),
retry_backoff=True,
retry_backoff_max=300,
retry_jitter=True,
soft_time_limit=120,
time_limit=180,
)
def cleanup_history_task(
self,
execution_id: str = None,
retention_days: int = 30,
) -> dict:
"""Celery task with automatic retries."""
async def _execute():
from db.database import get_session
from api.services.scheduler_service import get_scheduler_service
async with get_session() as session:
service = get_scheduler_service()
result = await service.cleanup_history(session, retention_days)
# Update execution status
if execution_id:
await update_execution_status(session, execution_id, "success")
return result
return _run_async(_execute())
# api/services/scheduler_service.py
class SchedulerService:
"""Manages APScheduler jobs with database persistence."""
_instance: Optional["SchedulerService"] = None
_scheduler: Optional[AsyncIOScheduler] = None
def __init__(self):
self._repo = SchedulerRepository()
self._job_functions: Dict[str, Callable] = {}
@classmethod
def get_instance(cls) -> "SchedulerService":
if cls._instance is None:
cls._instance = cls()
return cls._instance
async def initialize(self, session: AsyncSession, mode: str = "embedded"):
"""Initialize scheduler and register instance."""
instance = await self._repo.register_instance(session, mode)
self._scheduler = AsyncIOScheduler()
return instance.id
async def start(self, session: AsyncSession):
"""Start scheduler and load jobs from database."""
if settings.CELERY_ENABLED:
from tasks.celery_bridge import initialize_celery_tasks
initialize_celery_tasks()
jobs = await self._repo.get_enabled_jobs(session)
for job in jobs:
await self._add_job_to_scheduler(job)
self._scheduler.start()
async def create_job(
self,
session: AsyncSession,
job_data: ScheduledJobCreate
) -> ScheduledJob:
"""Create new scheduled job."""
job = await self._repo.create_job(session, job_data)
if job.is_enabled:
await self._add_job_to_scheduler(job)
return job
async def trigger_job(
self,
session: AsyncSession,
job_id: str
) -> JobExecution:
"""Manually trigger job execution."""
job = await self._repo.get_job(session, job_id)
execution = await self._repo.create_execution(session, job_id)
# Dispatch to Celery or run inline
from tasks.celery_bridge import dispatch_to_celery
task_id = dispatch_to_celery(
job.job_key,
execution_id=execution.id
)
if not task_id:
# Run inline
await self._execute_job(session, job, execution)
return execution
# db/models.py
class ScheduledJob(Base):
__tablename__ = "scheduled_jobs"
id = Column(String(36), primary_key=True)
job_key = Column(String(100), unique=True, nullable=False)
name_en = Column(String(200))
name_ar = Column(String(200))
job_type = Column(String(20)) # "interval" or "cron"
# Interval config
interval_seconds = Column(Integer)
interval_minutes = Column(Integer)
interval_hours = Column(Integer)
interval_days = Column(Integer)
# Cron config
cron_expression = Column(String(100))
# APScheduler config
max_instances = Column(Integer, default=1)
misfire_grace_time = Column(Integer, default=60)
coalesce = Column(Boolean, default=True)
is_enabled = Column(Boolean, default=True)
is_active = Column(Boolean, default=True)
executions = relationship("ScheduledJobExecution", back_populates="job")
class ScheduledJobExecution(Base):
__tablename__ = "scheduled_job_executions"
id = Column(String(36), primary_key=True)
job_id = Column(String(36), ForeignKey("scheduled_jobs.id"))
execution_id = Column(String(36)) # Unique per execution
scheduled_at = Column(DateTime(timezone=True))
started_at = Column(DateTime(timezone=True))
completed_at = Column(DateTime(timezone=True))
duration_ms = Column(Integer)
status = Column(String(20)) # pending, running, success, failed
error_message = Column(Text)
result_summary = Column(Text)
job = relationship("ScheduledJob", back_populates="executions")
See the references/ directory for:
celery-pattern.md - Celery configuration and tasksscheduler-pattern.md - APScheduler integrationtask-pattern.md - Task definition patternsapi-pattern.md - Scheduler API endpointsThis skill should be used when the user asks to "create a slash command", "add a command", "write a custom command", "define command arguments", "use command frontmatter", "organize commands", "create command with file references", "interactive command", "use AskUserQuestion in command", or needs guidance on slash command structure, YAML frontmatter fields, dynamic arguments, bash execution in commands, user interaction patterns, or command development best practices for Claude Code.
This skill should be used when the user asks to "create an agent", "add an agent", "write a subagent", "agent frontmatter", "when to use description", "agent examples", "agent tools", "agent colors", "autonomous agent", or needs guidance on agent structure, system prompts, triggering conditions, or agent development best practices for Claude Code plugins.
This skill should be used when the user asks to "create a hook", "add a PreToolUse/PostToolUse/Stop hook", "validate tool use", "implement prompt-based hooks", "use ${CLAUDE_PLUGIN_ROOT}", "set up event-driven automation", "block dangerous commands", or mentions hook events (PreToolUse, PostToolUse, Stop, SubagentStop, SessionStart, SessionEnd, UserPromptSubmit, PreCompact, Notification). Provides comprehensive guidance for creating and implementing Claude Code plugin hooks with focus on advanced prompt-based hooks API.