Help us improve
Share bugs, ideas, or general feedback.
Configures Django + Celery for async task execution, periodic scheduling via Beat, retries, canvas workflows, monitoring, and testing.
npx claudepluginhub aaione/everything-claude-code-zhHow this skill is triggered — by the user, by Claude, or both
Slash command
/everything-claude-code:django-celeryThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
在 Django 中使用 Celery 配合 Redis 或 RabbitMQ 进行后台任务处理的生产级模式。
Configures Celery async tasks in Django: broker setup, task design, beat scheduling, retries, canvas workflows, monitoring, and testing.
Sets up Celery 5.3+ distributed task queues with Beat scheduler, Redis/RabbitMQ brokers, workflow patterns, and FastAPI integration for background jobs, periodic tasks, and async processing.
Celery configuration templates for all frameworks (Django, Flask, FastAPI, standalone). Use when configuring Celery, setting up task queues, creating Celery apps, integrating with frameworks, or when user mentions Celery configuration, task queue setup, broker configuration, or framework integration.
Share bugs, ideas, or general feedback.
在 Django 中使用 Celery 配合 Redis 或 RabbitMQ 进行后台任务处理的生产级模式。
pip install celery[redis] django-celery-results django-celery-beat
celery.py — 应用入口# config/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.development')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks() # 发现每个 INSTALLED_APP 中的 tasks.py
@app.task(bind=True, ignore_result=True)
def debug_task(self):
print(f'请求:{self.request!r}')
# config/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
# config/settings/base.py
# Broker(生产环境推荐 Redis)
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='django-db')
# 序列化
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# 任务行为
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60 # 硬限制:30 分钟
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # 软限制:发送 SoftTimeLimitExceeded
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # 防止 worker 囤积长任务
CELERY_TASK_ACKS_LATE = True # Worker 崩溃时重新入队
# 结果持久化
CELERY_RESULT_EXPIRES = 60 * 60 * 24 # 保留结果 24 小时
# Beat 调度器(用于周期性任务)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 已安装应用
INSTALLED_APPS += [
'django_celery_results',
'django_celery_beat',
]
# 启动 worker(开发环境)
celery -A config worker --loglevel=info
# 启动 beat 调度器(周期性任务)
celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# 合并 worker + beat(仅开发,绝不用于生产)
celery -A config worker --beat --loglevel=info
# 生产:多 worker 并发
celery -A config worker --loglevel=warning --concurrency=4 -Q default,high_priority
# apps/notifications/tasks.py
from celery import shared_task
import logging
logger = logging.getLogger(__name__)
@shared_task(name='notifications.send_welcome_email')
def send_welcome_email(user_id: int) -> None:
"""向新注册用户发送欢迎邮件。"""
from apps.users.models import User
from apps.notifications.services import EmailService
try:
user = User.objects.get(pk=user_id)
except User.DoesNotExist:
logger.warning('send_welcome_email: 用户 %s 未找到', user_id)
return # 幂等 — 不抛异常,任务已无法完成
EmailService.send_welcome(user)
logger.info('欢迎邮件已发送给用户 %s', user_id)
@shared_task(
bind=True,
name='integrations.sync_to_crm',
max_retries=5,
default_retry_delay=60, # 首次重试前的秒数
autoretry_for=(ConnectionError, TimeoutError),
retry_backoff=True, # 指数退避
retry_backoff_max=600, # 上限 10 分钟
retry_jitter=True, # 随机化以避免惊群效应
)
def sync_contact_to_crm(self, contact_id: int) -> dict:
"""将联系人同步到外部 CRM,瞬时故障时重试。"""
from apps.crm.services import CRMClient
try:
result = CRMClient().sync(contact_id)
return result
except CRMClient.RateLimitError as exc:
# 从响应头获取特定重试延迟
raise self.retry(exc=exc, countdown=int(exc.retry_after))
设计任务使其可以安全地使用相同输入多次运行:
@shared_task(name='orders.mark_shipped')
def mark_order_shipped(order_id: int, tracking_number: str) -> None:
"""标记订单为已发货 — 可安全多次运行。"""
from apps.orders.models import Order
updated = Order.objects.filter(
pk=order_id,
status=Order.Status.PROCESSING, # 守卫:仅在未发货时更新
).update(
status=Order.Status.SHIPPED,
tracking_number=tracking_number,
)
if not updated:
logger.info('mark_order_shipped: 订单 %s 已发货或未找到', order_id)
from celery.exceptions import SoftTimeLimitExceeded
@shared_task(
bind=True,
name='reports.generate_pdf',
soft_time_limit=120,
time_limit=150,
)
def generate_pdf_report(self, report_id: int) -> str:
"""生成 PDF 报告,优雅处理超时。"""
from apps.reports.services import PDFGenerator
try:
path = PDFGenerator.build(report_id)
return path
except SoftTimeLimitExceeded:
# 硬终止前清理部分文件
PDFGenerator.cleanup(report_id)
raise
from datetime import timedelta
from django.utils import timezone
# 触发即忘(异步)
send_welcome_email.delay(user.pk)
# 未来调度
send_reminder.apply_async(args=[user.pk], countdown=3600) # 1 小时后
send_reminder.apply_async(args=[user.pk], eta=timezone.now() + timedelta(days=1))
# 带队列路由应用
sync_contact_to_crm.apply_async(args=[contact.pk], queue='high_priority')
# 同步运行(仅测试/调试)
result = generate_pdf_report.apply(args=[report.pk])
# config/settings/base.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'cleanup-expired-sessions': {
'task': 'users.cleanup_expired_sessions',
'schedule': crontab(hour=2, minute=0), # 每天凌晨 2 点
},
'sync-inventory': {
'task': 'products.sync_inventory',
'schedule': 60.0, # 每 60 秒
},
'weekly-digest': {
'task': 'notifications.send_weekly_digest',
'schedule': crontab(day_of_week='monday', hour=8, minute=0),
},
}
# 从 Django admin 或代码管理周期性任务
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json
schedule, _ = CrontabSchedule.objects.get_or_create(
hour='*/6', minute='0',
timezone='UTC',
)
PeriodicTask.objects.update_or_create(
name='每 6 小时同步库存',
defaults={
'crontab': schedule,
'task': 'products.sync_inventory',
'args': json.dumps([]),
'enabled': True,
}
)
from celery import chain, group, chord
# Chain:顺序运行任务,传递结果
pipeline = chain(
fetch_data.s(source_id),
transform_data.s(), # 接收 fetch_data 结果作为第一个参数
load_to_warehouse.s(),
)
pipeline.delay()
# Group:并行运行任务
parallel = group(
send_welcome_email.s(user_id)
for user_id in new_user_ids
)
parallel.delay()
# Chord:并行任务 + 全部完成时的回调
result = chord(
group(process_chunk.s(chunk) for chunk in data_chunks),
aggregate_results.s(), # 以分块结果列表作为参数调用
)
result.delay()
# apps/core/tasks.py
from celery.signals import task_failure
@task_failure.connect
def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **kw):
"""将所有任务失败记录到 Sentry / 告警系统。"""
import sentry_sdk
with sentry_sdk.new_scope() as scope:
scope.set_context('celery', {
'task': sender.name,
'task_id': task_id,
'args': args,
'kwargs': kwargs,
})
sentry_sdk.capture_exception(exception)
# 达到最大重试次数后将失败任务路由到死信队列
@shared_task(
bind=True,
max_retries=3,
name='payments.charge_card',
)
def charge_card(self, order_id: int) -> None:
from apps.payments.models import Order, FailedCharge
try:
_do_charge(order_id)
except Exception as exc:
if self.request.retries >= self.max_retries:
# 持久化到死信表供人工审查
FailedCharge.objects.create(
order_id=order_id,
error=str(exc),
task_id=self.request.id,
)
return # 不抛异常 — 任务永久失败
raise self.retry(exc=exc)
# tests/test_tasks.py
import pytest
from unittest.mock import patch, MagicMock
from apps.notifications.tasks import send_welcome_email
class TestSendWelcomeEmail:
@pytest.mark.django_db
def test_sends_email_to_existing_user(self, user):
with patch('apps.notifications.services.EmailService') as mock_email:
send_welcome_email(user.pk)
mock_email.send_welcome.assert_called_once_with(user)
@pytest.mark.django_db
def test_skips_missing_user_gracefully(self):
"""用户在入队和执行之间被删除时不应抛异常。"""
send_welcome_email(99999) # 不存在的用户 — 不得抛异常
# config/settings/test.py
CELERY_TASK_ALWAYS_EAGER = True # 在测试中同步运行任务
CELERY_TASK_EAGER_PROPAGATES = True # 从任务中重新抛出异常
# tests/test_integration.py
@pytest.mark.django_db
def test_registration_triggers_welcome_email(client):
with patch('apps.notifications.services.EmailService') as mock_email:
response = client.post('/api/users/', {
'email': 'new@example.com',
'password': 'strongpass123',
})
assert response.status_code == 201
mock_email.send_welcome.assert_called_once()
@pytest.mark.django_db
def test_task_retries_on_connection_error():
with patch('apps.crm.services.CRMClient.sync') as mock_sync:
mock_sync.side_effect = ConnectionError('timeout')
with pytest.raises(ConnectionError):
sync_contact_to_crm.apply(args=[1], throw=True)
assert mock_sync.call_count == 1 # eager 模式下仅首次尝试
# 检查活跃的 worker 和队列
celery -A config inspect active
celery -A config inspect stats
celery -A config inspect reserved
# 检查队列长度(Redis)
redis-cli llen celery
# Flower:基于 Web 的实时监控
pip install flower
celery -A config flower --port=5555
# 差:传递模型实例 — 到执行时可能已经过时
send_welcome_email.delay(user) # 绝不传递 ORM 对象
send_welcome_email.delay(user.pk) # 始终传递主键
# 差:在生产视图中同步调用任务
result = generate_report.apply() # 阻塞请求线程
# 差:没有守卫的非幂等任务
@shared_task
def charge_and_fulfill(order_id):
order.charge() # 如果任务重试可能重复扣款!
order.fulfill()
# 好:带状态守卫的幂等任务
@shared_task
def charge_and_fulfill(order_id):
order = Order.objects.select_for_update().get(pk=order_id)
if order.status != Order.Status.PENDING:
return # 已处理
order.charge()
order.fulfill()
| 检查项 | 设置 |
|---|---|
| Worker 崩溃时自动重启 | supervisord 或 systemd unit |
CELERY_TASK_ACKS_LATE = True | Worker 崩溃时重新入队任务 |
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 | 长任务的公平分配 |
| 按优先级分离队列 | -Q default,high_priority,low_priority |
设置 CELERY_TASK_SOFT_TIME_LIMIT | 硬终止前的优雅超时 |
| Sentry 集成 | 捕获所有 task_failure 信号 |
| Flower 或其他监控工具 | 队列深度可视化 |
| Beat 仅在单节点运行 | 防止重复执行定时任务 |
django-patterns — ORM、服务层和项目结构django-tdd — 测试 Django 模型、视图和服务python-testing — pytest 配置和 fixture