From klingai-pack
Provides production reference architecture for scalable Kling AI video generation platforms, including API gateway, job queues, workers, storage, and monitoring.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin klingai-pack
Slash command: /klingai-pack:klingai-reference-architecture
Production architecture for video generation platforms built on Kling AI. Covers API gateway, job queue, worker pool, storage, and monitoring layers.
```
User Request
     |
[API Gateway / Load Balancer]
     |
[Application Server]
     |--- validate prompt & estimate cost
     |--- enqueue job to Redis/SQS
     |
[Job Queue (Redis / SQS / Pub/Sub)]
     |
[Worker Pool (N workers)]
     |--- generate JWT token
     |--- POST https://api.klingai.com/v1/videos/text2video
     |--- receive task_id
     |--- register callback_url OR poll
     |
[Webhook Receiver / Poller]
     |--- receive completion callback
     |--- download video from Kling CDN
     |--- upload to S3/GCS
     |--- update job status in DB
     |--- notify user
     |
[Object Storage (S3 / GCS)]
     |
[CDN (CloudFront / Cloud CDN)]
     |
User views video
```
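The "generate JWT token" step in the worker pool can be sketched with the standard library alone. This is a minimal sketch, assuming the API expects an HS256-signed token whose `iss` claim is the access key, alongside `exp` and `nbf` timestamps. The function name `make_kling_jwt`, the 30-minute lifetime, and the 5-second `nbf` skew are illustrative choices, not values taken from this document; confirm the claim set against Kling's own API documentation.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as the JWT format requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_kling_jwt(access_key, secret_key, ttl_seconds=1800, now=None):
    """Build an HS256-signed JWT. The iss/exp/nbf claim set is an assumption."""
    now = int(time.time()) if now is None else int(now)
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {
        "iss": access_key,          # issuer: the access key
        "exp": now + ttl_seconds,   # token expiry
        "nbf": now - 5,             # small skew so the token is valid immediately
    }
    # header.payload, each compact-JSON encoded and base64url-encoded
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret_key.encode("utf-8"),
        signing_input.encode("ascii"),
        hashlib.sha256,
    ).digest()
    return f"{signing_input}.{_b64url(signature)}"
```

A worker would typically send the result as a bearer token in the `Authorization` header of the POST request; that header format is likewise an assumption here. Regenerating the token per request, or caching it until shortly before `exp`, keeps long-running workers from submitting with an expired token.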
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# The following names are assumed to be provided elsewhere in the application:
#   estimate_credits(duration, mode) -> int
#   budget_guard.check(credits) -> bool
#   queue.enqueue(payload) -> job_id (async; pushes the job onto the queue)

app = FastAPI()


class VideoRequest(BaseModel):
    prompt: str
    model: str = "kling-v2-master"
    duration: int = 5
    mode: str = "standard"


@app.post("/api/videos")
async def create_video(req: VideoRequest):
    # 1. Validate
    if len(req.prompt) > 2500:
        raise HTTPException(400, "Prompt exceeds 2500 chars")

    # 2. Estimate cost
    credits = estimate_credits(req.duration, req.mode)
    if not budget_guard.check(credits):
        raise HTTPException(402, "Budget exceeded")

    # 3. Enqueue
    job_id = await queue.enqueue({
        "prompt": req.prompt,
        "model": req.model,
        "duration": str(req.duration),  # the worker converts this back with int()
        "mode": req.mode,
    })

    return {"job_id": job_id, "status": "queued", "estimated_credits": credits}
```
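The endpoint above relies on `estimate_credits`, `budget_guard`, and `queue.enqueue`, none of which are defined in this document. One possible shape for them is sketched below. Everything here is hypothetical: the per-mode credit rates are placeholders and not Kling pricing, `BudgetGuard` is a single-process in-memory counter, and `enqueue_job` is a synchronous stand-in that accepts any client exposing a Redis-style `lpush(key, value)` method.

```python
import json
import uuid

PENDING_QUEUE = "kling:jobs:pending"  # same list key the worker pops from

# Placeholder rates only; substitute the real credit costs for your plan.
CREDITS_PER_SECOND = {"standard": 2, "pro": 7}


def estimate_credits(duration, mode):
    """Hypothetical estimate: duration times a per-mode placeholder rate."""
    if mode not in CREDITS_PER_SECOND:
        raise ValueError(f"Unknown mode: {mode}")
    return duration * CREDITS_PER_SECOND[mode]


class BudgetGuard:
    """Tracks credits reserved against a fixed cap (in-memory, one process)."""

    def __init__(self, cap):
        self.cap = cap
        self.reserved = 0

    def check(self, credits):
        """Reserve credits if they fit under the cap; return False otherwise."""
        if self.reserved + credits > self.cap:
            return False
        self.reserved += credits
        return True


def enqueue_job(client, payload):
    """Attach a job id and LPUSH the JSON payload onto the pending list."""
    job_id = uuid.uuid4().hex
    client.lpush(PENDING_QUEUE, json.dumps({**payload, "id": job_id}))
    return job_id
```

Pushing with `LPUSH` while the worker consumes with `BRPOP` gives first-in, first-out ordering. Embedding the `id` in the payload is what lets the worker report results by `job["id"]`. With several API servers, the budget counter would need shared storage (for example a Redis key) rather than process memory.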
```python
import json
import os

import redis


class VideoWorker:
    def __init__(self, kling_client, storage_client, redis_url="redis://localhost"):
        self.kling = kling_client
        self.storage = storage_client
        self.redis = redis.Redis.from_url(redis_url)

    def process_loop(self):
        while True:
            # Block for up to 5 seconds waiting for the next pending job
            raw = self.redis.brpop("kling:jobs:pending", timeout=5)
            if not raw:
                continue

            # brpop returns (key, value); the value is the JSON job payload.
            # job["id"] is assumed to have been set when the job was enqueued.
            job = json.loads(raw[1])
            try:
                # Submit to Kling API
                result = self.kling.text_to_video(
                    job["prompt"],
                    model=job["model"],
                    duration=int(job["duration"]),
                    mode=job["mode"],
                    callback_url=os.environ.get("WEBHOOK_URL"),
                )

                # If using polling (no callback)
                if isinstance(result, dict) and "videos" in result:
                    video_url = result["videos"][0]["url"]
                    stored_url = self.storage.download_and_upload(video_url, job["id"])
                    self.redis.publish("kling:events", json.dumps({
                        "type": "completed",
                        "job_id": job["id"],
                        "video_url": stored_url,
                    }))
            except Exception as e:
                # Park the failed job with its error for inspection or retry
                self.redis.lpush("kling:jobs:failed", json.dumps({
                    **job, "error": str(e)
                }))
```
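The polling path in the worker assumes the client eventually returns a result containing `videos`. A minimal sketch of how such a poll loop might look follows. The status strings (`"succeed"`, `"failed"`), the `task_status` and `task_status_msg` keys, and the `fetch_status` callable are assumptions for illustration; the document does not specify the response shape, so adapt them to the actual client.

```python
import time


class TaskFailed(Exception):
    """Raised when the task reports a failed status."""


def poll_until_done(fetch_status, task_id, interval=5.0, max_interval=30.0,
                    timeout=600.0, sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status(task_id) until it succeeds, fails, or times out.

    fetch_status is a caller-supplied function returning a dict with a
    "task_status" key. The delay grows by 1.5x per attempt up to max_interval.
    """
    deadline = clock() + timeout
    delay = interval
    while True:
        data = fetch_status(task_id)
        status = data.get("task_status")
        if status == "succeed":
            return data
        if status == "failed":
            raise TaskFailed(data.get("task_status_msg", "task failed"))
        if clock() >= deadline:
            raise TimeoutError(f"Task {task_id} not finished after {timeout}s")
        sleep(delay)
        delay = min(delay * 1.5, max_interval)
```

Injecting `sleep` and `clock` keeps the loop testable without real waiting. When a `callback_url` is registered, this loop is unnecessary; it is the fallback for deployments where the webhook receiver cannot be reached from outside.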
| Component | Scaling Strategy |
|---|---|
| Workers | Scale by queue depth (1 worker per 3 concurrent API tasks) |
| API servers | Horizontal, behind load balancer |
| Redis | Single instance for <1K jobs/day, cluster for more |
| Storage | S3/GCS scales automatically |
| CDN | CloudFront/Cloud CDN for global delivery |

| Tier | Max Concurrent Tasks | Workers Needed |
|---|---|---|
| Free | 1 | 1 |
| Standard | 3 | 1 |
| Pro | 5 | 2 |
| Enterprise | 10+ | 3-4 |
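The worker counts in the tier table follow from the "1 worker per 3 concurrent API tasks" rule. A small sketch of that calculation is shown here; the function name `workers_needed` and the optional `queue_depth` cap are illustrative additions, not part of the original design.

```python
import math

TASKS_PER_WORKER = 3  # from the scaling table: 1 worker per 3 concurrent API tasks


def workers_needed(max_concurrent_tasks, queue_depth=None):
    """Workers required to saturate a tier's concurrency limit.

    If queue_depth is given, do not size for more tasks than are pending,
    but always keep at least one worker running.
    """
    if max_concurrent_tasks < 1:
        raise ValueError("max_concurrent_tasks must be >= 1")
    usable = max_concurrent_tasks
    if queue_depth is not None:
        usable = max(1, min(max_concurrent_tasks, queue_depth))
    return math.ceil(usable / TASKS_PER_WORKER)
```

For limits of 1, 3, and 5 concurrent tasks this yields 1, 1, and 2 workers, matching the Free, Standard, and Pro rows. A limit of 10 yields 4, the upper end of the Enterprise range.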
```yaml
# docker-compose.yml
services:
  api:
    build: ./api
    ports: ["8000:8000"]
    environment:
      - REDIS_URL=redis://redis:6379
      - KLING_ACCESS_KEY=${KLING_ACCESS_KEY}
      - KLING_SECRET_KEY=${KLING_SECRET_KEY}

  worker:
    build: ./worker
    deploy:
      replicas: 2
    environment:
      - REDIS_URL=redis://redis:6379
      - KLING_ACCESS_KEY=${KLING_ACCESS_KEY}
      - KLING_SECRET_KEY=${KLING_SECRET_KEY}
      - S3_BUCKET=${S3_BUCKET}

  webhook:
    build: ./webhook
    ports: ["8001:8001"]
    environment:
      - REDIS_URL=redis://redis:6379

  redis:
    image: redis:7-alpine
    volumes: ["redis-data:/data"]

volumes:
  redis-data:
```
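Each worker container receives its configuration through the environment variables listed in the compose file. A possible way to load and validate them at startup is sketched below. The `WorkerSettings` class and `load_worker_settings` function are hypothetical names; `WEBHOOK_URL` is treated as optional because the worker code reads it with `os.environ.get`, although the compose file does not set it.

```python
import os
from dataclasses import dataclass
from typing import Optional

REQUIRED = ("REDIS_URL", "KLING_ACCESS_KEY", "KLING_SECRET_KEY", "S3_BUCKET")


@dataclass(frozen=True)
class WorkerSettings:
    redis_url: str
    kling_access_key: str
    kling_secret_key: str
    s3_bucket: str
    webhook_url: Optional[str] = None  # optional: enables callbacks when set


def load_worker_settings(env=None):
    """Read worker settings from the environment, failing fast if any are missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return WorkerSettings(
        redis_url=env["REDIS_URL"],
        kling_access_key=env["KLING_ACCESS_KEY"],
        kling_secret_key=env["KLING_SECRET_KEY"],
        s3_bucket=env["S3_BUCKET"],
        webhook_url=env.get("WEBHOOK_URL") or None,
    )
```

Failing at startup surfaces a misconfigured container immediately instead of on the first job it pulls from the queue.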