Provides production-grade Django patterns including architecture, DRF REST API design, ORM best practices, caching, signals, middleware, and scalable project structure.
npx claudepluginhub xu-xiang/everything-claude-code-zhThis skill uses the workspace's default tool permissions.
适用于可扩展、可维护应用程序的生产级 Django 架构模式。
Provides Django architecture patterns, DRF REST API design, ORM best practices, caching, signals, middleware for production-grade apps.
Provides Django patterns for project structure, split settings, DRF REST APIs, ORM best practices, caching, signals, middleware, and production setups.
Provides Django patterns for project structure, ORM optimization, DRF serializers, signals, and custom middleware.
Share bugs, ideas, or general feedback.
适用于可扩展、可维护应用程序的生产级 Django 架构模式。
myproject/
├── config/
│ ├── __init__.py
│ ├── settings/
│ │ ├── __init__.py
│ │ ├── base.py # 基础配置
│ │ ├── development.py # 开发配置
│ │ ├── production.py # 生产配置
│ │ └── test.py # 测试配置
│ ├── urls.py
│ ├── wsgi.py
│ └── asgi.py
├── manage.py
└── apps/
├── __init__.py
├── users/
│ ├── __init__.py
│ ├── models.py
│ ├── views.py
│ ├── serializers.py
│ ├── urls.py
│ ├── permissions.py
│ ├── filters.py
│ ├── services.py
│ └── tests/
└── products/
└── ...
# config/settings/base.py
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent.parent
SECRET_KEY = env('DJANGO_SECRET_KEY')
DEBUG = False
ALLOWED_HOSTS = []
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'rest_framework.authtoken',
'corsheaders',
# 本地应用
'apps.users',
'apps.products',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'whitenoise.middleware.WhiteNoiseMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'corsheaders.middleware.CorsMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'config.urls'
WSGI_APPLICATION = 'config.wsgi.application'
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': env('DB_NAME'),
'USER': env('DB_USER'),
'PASSWORD': env('DB_PASSWORD'),
'HOST': env('DB_HOST'),
'PORT': env('DB_PORT', default='5432'),
}
}
# config/settings/development.py
from .base import *
DEBUG = True
ALLOWED_HOSTS = ['localhost', '127.0.0.1']
DATABASES['default']['NAME'] = 'myproject_dev'
INSTALLED_APPS += ['debug_toolbar']
MIDDLEWARE += ['debug_toolbar.middleware.DebugToolbarMiddleware']
EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend'
# config/settings/production.py
from .base import *
DEBUG = False
ALLOWED_HOSTS = env.list('ALLOWED_HOSTS')
SECURE_SSL_REDIRECT = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
SECURE_HSTS_SECONDS = 31536000
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
# 日志
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'file': {
'level': 'WARNING',
'class': 'logging.FileHandler',
'filename': '/var/log/django/django.log',
},
},
'loggers': {
'django': {
'handlers': ['file'],
'level': 'WARNING',
'propagate': True,
},
},
}
from django.db import models
from django.contrib.auth.models import AbstractUser
from django.core.validators import MinValueValidator, MaxValueValidator
class User(AbstractUser):
"""扩展 AbstractUser 的自定义用户模型。"""
email = models.EmailField(unique=True)
phone = models.CharField(max_length=20, blank=True)
birth_date = models.DateField(null=True, blank=True)
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['username']
class Meta:
db_table = 'users'
verbose_name = 'user'
verbose_name_plural = 'users'
ordering = ['-date_joined']
def __str__(self):
return self.email
def get_full_name(self):
return f"{self.first_name} {self.last_name}".strip()
class Product(models.Model):
"""具有正确字段配置的产品模型。"""
name = models.CharField(max_length=200)
slug = models.SlugField(unique=True, max_length=250)
description = models.TextField(blank=True)
price = models.DecimalField(
max_digits=10,
decimal_places=2,
validators=[MinValueValidator(0)]
)
stock = models.PositiveIntegerField(default=0)
is_active = models.BooleanField(default=True)
category = models.ForeignKey(
'Category',
on_delete=models.CASCADE,
related_name='products'
)
tags = models.ManyToManyField('Tag', blank=True, related_name='products')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'products'
ordering = ['-created_at']
indexes = [
models.Index(fields=['slug']),
models.Index(fields=['-created_at']),
models.Index(fields=['category', 'is_active']),
]
constraints = [
models.CheckConstraint(
check=models.Q(price__gte=0),
name='price_non_negative'
)
]
def __str__(self):
return self.name
def save(self, *args, **kwargs):
if not self.slug:
self.slug = slugify(self.name)
super().save(*args, **kwargs)
from django.db import models
class ProductQuerySet(models.QuerySet):
"""Product 模型的自定义查询集。"""
def active(self):
"""只返回活跃的产品。"""
return self.filter(is_active=True)
def with_category(self):
"""使用 select_related 加载关联类目,避免 N+1 查询。"""
return self.select_related('category')
def with_tags(self):
"""为多对多关系预加载标签。"""
return self.prefetch_related('tags')
def in_stock(self):
"""返回库存 > 0 的产品。"""
return self.filter(stock__gt=0)
def search(self, query):
"""根据名称或描述搜索产品。"""
return self.filter(
models.Q(name__icontains=query) |
models.Q(description__icontains=query)
)
class Product(models.Model):
# ... 字段 ...
objects = ProductQuerySet.as_manager() # 使用自定义查询集
# 使用示例
Product.objects.active().with_category().in_stock()
class ProductManager(models.Manager):
"""复杂查询的自定义管理器。"""
def get_or_none(self, **kwargs):
"""返回对象,如果不存在则返回 None,而不是抛出 DoesNotExist。"""
try:
return self.get(**kwargs)
except self.model.DoesNotExist:
return None
def create_with_tags(self, name, price, tag_names):
"""创建产品并关联标签。"""
product = self.create(name=name, price=price)
tags = [Tag.objects.get_or_create(name=name)[0] for name in tag_names]
product.tags.set(tags)
return product
def bulk_update_stock(self, product_ids, quantity):
"""批量更新多个产品的库存。"""
return self.filter(id__in=product_ids).update(stock=quantity)
# 在模型中引用
class Product(models.Model):
# ... 字段 ...
custom = ProductManager()
from rest_framework import serializers
from django.contrib.auth.password_validation import validate_password
from .models import Product, User
class ProductSerializer(serializers.ModelSerializer):
"""Product 模型的序列化器。"""
category_name = serializers.CharField(source='category.name', read_only=True)
average_rating = serializers.FloatField(read_only=True)
discount_price = serializers.SerializerMethodField()
class Meta:
model = Product
fields = [
'id', 'name', 'slug', 'description', 'price',
'discount_price', 'stock', 'category_name',
'average_rating', 'created_at'
]
read_only_fields = ['id', 'slug', 'created_at']
def get_discount_price(self, obj):
"""计算折扣价(如果适用)。"""
if hasattr(obj, 'discount') and obj.discount:
return obj.price * (1 - obj.discount.percent / 100)
return obj.price
def validate_price(self, value):
"""确保价格非负。"""
if value < 0:
raise serializers.ValidationError("价格不能为负数。")
return value
class ProductCreateSerializer(serializers.ModelSerializer):
"""创建产品的序列化器。"""
class Meta:
model = Product
fields = ['name', 'description', 'price', 'stock', 'category']
def validate(self, data):
"""多字段自定义校验。"""
if data['price'] > 10000 and data['stock'] > 100:
raise serializers.ValidationError(
"高价值产品不能拥有大批量库存。"
)
return data
class UserRegistrationSerializer(serializers.ModelSerializer):
"""用户注册序列化器。"""
password = serializers.CharField(
write_only=True,
required=True,
validators=[validate_password],
style={'input_type': 'password'}
)
password_confirm = serializers.CharField(write_only=True, style={'input_type': 'password'})
class Meta:
model = User
fields = ['email', 'username', 'password', 'password_confirm']
def validate(self, data):
"""校验两次输入的密码是否匹配。"""
if data['password'] != data['password_confirm']:
raise serializers.ValidationError({
"password_confirm": "两次密码输入不一致。"
})
return data
def create(self, validated_data):
"""创建带有哈希密码的用户。"""
validated_data.pop('password_confirm')
password = validated_data.pop('password')
user = User.objects.create(**validated_data)
user.set_password(password)
user.save()
return user
from rest_framework import viewsets, status, filters
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated, IsAdminUser
from django_filters.rest_framework import DjangoFilterBackend
from .models import Product
from .serializers import ProductSerializer, ProductCreateSerializer
from .permissions import IsOwnerOrReadOnly
from .filters import ProductFilter
from .services import ProductService
class ProductViewSet(viewsets.ModelViewSet):
"""Product 模型的视图集。"""
queryset = Product.objects.select_related('category').prefetch_related('tags')
permission_classes = [IsAuthenticated, IsOwnerOrReadOnly]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_class = ProductFilter
search_fields = ['name', 'description']
ordering_fields = ['price', 'created_at', 'name']
ordering = ['-created_at']
def get_serializer_class(self):
"""根据动作 (action) 返回相应的序列化器。"""
if self.action == 'create':
return ProductCreateSerializer
return ProductSerializer
def perform_create(self, serializer):
"""保存时附带用户上下文。"""
serializer.save(created_by=self.request.user)
@action(detail=False, methods=['get'])
def featured(self, request):
"""返回推荐产品。"""
featured = self.queryset.filter(is_featured=True)[:10]
serializer = self.get_serializer(featured, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def purchase(self, request, pk=None):
"""购买产品。"""
product = self.get_object()
service = ProductService()
result = service.purchase(product, request.user)
return Response(result, status=status.HTTP_201_CREATED)
@action(detail=False, methods=['get'], permission_classes=[IsAuthenticated])
def my_products(self, request):
"""返回当前用户创建的产品。"""
products = self.queryset.filter(created_by=request.user)
page = self.paginate_queryset(products)
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def add_to_cart(request):
"""将产品加入用户购物车。"""
product_id = request.data.get('product_id')
quantity = request.data.get('quantity', 1)
try:
product = Product.objects.get(id=product_id)
except Product.DoesNotExist:
return Response(
{'error': '产品未找到'},
status=status.HTTP_404_NOT_FOUND
)
cart, _ = Cart.objects.get_or_create(user=request.user)
CartItem.objects.create(
cart=cart,
product=product,
quantity=quantity
)
return Response({'message': '已加入购物车'}, status=status.HTTP_201_CREATED)
# apps/orders/services.py
from typing import Optional
from django.db import transaction
from .models import Order, OrderItem
class OrderService:
"""订单相关业务逻辑的服务层。"""
@staticmethod
@transaction.atomic
def create_order(user, cart: Cart) -> Order:
"""从购物车创建订单。"""
order = Order.objects.create(
user=user,
total_price=cart.total_price
)
for item in cart.items.all():
OrderItem.objects.create(
order=order,
product=item.product,
quantity=item.quantity,
price=item.product.price
)
# 清空购物车
cart.items.all().delete()
return order
@staticmethod
def process_payment(order: Order, payment_data: dict) -> bool:
"""处理订单支付。"""
# 与支付网关集成
payment = PaymentGateway.charge(
amount=order.total_price,
token=payment_data['token']
)
if payment.success:
order.status = Order.Status.PAID
order.save()
# 发送确认邮件
OrderService.send_confirmation_email(order)
return True
return False
@staticmethod
def send_confirmation_email(order: Order):
"""发送订单确认邮件。"""
# 邮件发送逻辑
pass
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
@method_decorator(cache_page(60 * 15), name='dispatch') # 15 分钟
class ProductListView(generic.ListView):
model = Product
template_name = 'products/list.html'
context_object_name = 'products'
{% load cache %}
{% cache 500 sidebar %}
... 昂贵的侧边栏内容 ...
{% endcache %}
from django.core.cache import cache
def get_featured_products():
"""获取带缓存的推荐产品。"""
cache_key = 'featured_products'
products = cache.get(cache_key)
if products is None:
products = list(Product.objects.filter(is_featured=True))
cache.set(cache_key, products, timeout=60 * 15) # 15 分钟
return products
from django.core.cache import cache
def get_popular_categories():
cache_key = 'popular_categories'
categories = cache.get(cache_key)
if categories is None:
categories = list(Category.objects.annotate(
product_count=Count('products')
).filter(product_count__gt=10).order_by('-product_count')[:20])
cache.set(cache_key, categories, timeout=60 * 60) # 1 小时
return categories
# apps/users/signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth import get_user_model
from .models import Profile
User = get_user_model()
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
"""创建用户时自动创建 Profile。"""
if created:
Profile.objects.create(user=instance)
@receiver(post_save, sender=User)
def save_user_profile(sender, instance, **kwargs):
"""保存用户时同步保存 Profile。"""
instance.profile.save()
# apps/users/apps.py
from django.apps import AppConfig
class UsersConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'apps.users'
def ready(self):
"""应用就绪时导入信号。"""
import apps.users.signals
# middleware/active_user_middleware.py
import time
from django.utils.deprecation import MiddlewareMixin
class ActiveUserMiddleware(MiddlewareMixin):
"""用于追踪活跃用户的中间件。"""
def process_request(self, request):
"""处理传入的请求。"""
if request.user.is_authenticated:
# 更新最后活跃时间
request.user.last_active = timezone.now()
request.user.save(update_fields=['last_active'])
class RequestLoggingMiddleware(MiddlewareMixin):
"""用于记录请求日志的中间件。"""
def process_request(self, request):
"""记录请求开始时间。"""
request.start_time = time.time()
def process_response(self, request, response):
"""记录请求时长。"""
if hasattr(request, 'start_time'):
duration = time.time() - request.start_time
logger.info(f'{request.method} {request.path} - {response.status_code} - {duration:.3f}s')
return response
# 差 - N+1 查询
products = Product.objects.all()
for product in products:
print(product.category.name) # 每个产品都会触发单独查询
# 好 - 使用 select_related 的单次查询
products = Product.objects.select_related('category').all()
for product in products:
print(product.category.name)
# 好 - 多对多关系使用 prefetch_related
products = Product.objects.prefetch_related('tags').all()
for product in products:
for tag in product.tags.all():
print(tag.name)
class Product(models.Model):
name = models.CharField(max_length=200, db_index=True)
slug = models.SlugField(unique=True)
category = models.ForeignKey('Category', on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
indexes = [
models.Index(fields=['name']),
models.Index(fields=['-created_at']),
models.Index(fields=['category', 'created_at']),
]
# 批量创建
Product.objects.bulk_create([
Product(name=f'Product {i}', price=10.00)
for i in range(1000)
])
# 批量更新
products = Product.objects.all()[:100]
for product in products:
product.is_active = True
Product.objects.bulk_update(products, ['is_active'])
# 批量删除
Product.objects.filter(stock=0).delete()
| 模式 | 描述 |
|---|---|
| 配置拆分 (Split settings) | 分离开发/生产/测试配置 |
| 自定义查询集 (Custom QuerySet) | 可复用的查询方法 |
| 服务层 (Service Layer) | 业务逻辑分离 |
| 视图集 (ViewSet) | REST API 端点 |
| 序列化器校验 (Serializer validation) | 请求/响应转换 |
| select_related | 外键优化 |
| prefetch_related | 多对多优化 |
| 缓存优先 (Cache first) | 缓存高耗时操作 |
| 信号 (Signals) | 事件驱动的动作 |
| 中间件 (Middleware) | 请求/响应处理 |
请记住:Django 提供了许多便捷方式,但对于生产级应用程序,结构和组织比简洁的代码更重要。为可维护性而构建。