Help us improve
Share bugs, ideas, or general feedback.
From klingai-pack
Builds async Kling AI video generation workflows using Redis queues, callbacks, polling, and Python code for task submission and job processing.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin klingai-packHow this skill is triggered — by the user, by Claude, or both
Slash command
/klingai-pack:klingai-async-workflowsThis skill is limited to the following tools:
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
Kling AI video generation is inherently async: you submit a task, then poll or receive a callback when done. This skill covers production patterns for integrating this into larger systems using queues, state machines, and event-driven architectures.
Provides production reference architecture for scalable Kling AI video generation platforms, including API gateway, job queues, workers, storage, and monitoring.
Integrates generative video into applications: text-to-video, image-to-video, avatar video. Covers async architecture, cost optimization, cinematographic prompting, provider selection (FAL.ai, Veo, Sora, Runway).
Generates videos from text prompts or images, animates still images, and creates talking avatars from photos with audio using Kling AI models (VIDEO 3.0, Avatar 2.0, etc.). Handles multi-shot storyboards, character consistency, and prompt engineering.
Share bugs, ideas, or general feedback.
Kling AI video generation is inherently async: you submit a task, then poll or receive a callback when done. This skill covers production patterns for integrating this into larger systems using queues, state machines, and event-driven architectures.
import jwt, time, os, requests
BASE = "https://api.klingai.com/v1"
def get_headers():
ak, sk = os.environ["KLING_ACCESS_KEY"], os.environ["KLING_SECRET_KEY"]
token = jwt.encode(
{"iss": ak, "exp": int(time.time()) + 1800, "nbf": int(time.time()) - 5},
sk, algorithm="HS256", headers={"alg": "HS256", "typ": "JWT"}
)
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
def submit_async(prompt, callback_url=None, **kwargs):
"""Submit task and return immediately."""
body = {
"model_name": kwargs.get("model", "kling-v2-master"),
"prompt": prompt,
"duration": str(kwargs.get("duration", 5)),
"mode": kwargs.get("mode", "standard"),
}
if callback_url:
body["callback_url"] = callback_url
r = requests.post(f"{BASE}/videos/text2video", headers=get_headers(), json=body)
return r.json()["data"]["task_id"]
import redis
import json
r = redis.Redis()
# Producer: enqueue video generation requests
def enqueue_video_job(prompt, metadata=None):
job = {
"id": f"job_{int(time.time() * 1000)}",
"prompt": prompt,
"metadata": metadata or {},
"status": "queued",
"created_at": time.time(),
}
r.lpush("kling:jobs:pending", json.dumps(job))
return job["id"]
# Worker: process jobs from queue
def process_jobs(max_concurrent=3):
active_tasks = {}
while True:
# Submit new jobs if under concurrency limit
while len(active_tasks) < max_concurrent:
raw = r.rpop("kling:jobs:pending")
if not raw:
break
job = json.loads(raw)
task_id = submit_async(job["prompt"])
active_tasks[task_id] = job
r.hset("kling:jobs:active", task_id, json.dumps(job))
# Check active tasks
completed = []
for task_id, job in active_tasks.items():
result = requests.get(
f"{BASE}/videos/text2video/{task_id}", headers=get_headers()
).json()
status = result["data"]["task_status"]
if status == "succeed":
job["status"] = "completed"
job["video_url"] = result["data"]["task_result"]["videos"][0]["url"]
r.lpush("kling:jobs:completed", json.dumps(job))
completed.append(task_id)
elif status == "failed":
job["status"] = "failed"
job["error"] = result["data"].get("task_status_msg")
r.lpush("kling:jobs:failed", json.dumps(job))
completed.append(task_id)
for tid in completed:
active_tasks.pop(tid)
r.hdel("kling:jobs:active", tid)
time.sleep(10)
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
class JobState(Enum):
QUEUED = "queued"
SUBMITTING = "submitting"
PROCESSING = "processing"
DOWNLOADING = "downloading"
COMPLETED = "completed"
FAILED = "failed"
RETRYING = "retrying"
@dataclass
class VideoJob:
prompt: str
state: JobState = JobState.QUEUED
task_id: Optional[str] = None
video_url: Optional[str] = None
error: Optional[str] = None
attempts: int = 0
max_attempts: int = 3
def can_retry(self) -> bool:
return self.state == JobState.FAILED and self.attempts < self.max_attempts
def transition(self, new_state: JobState):
valid = {
JobState.QUEUED: {JobState.SUBMITTING},
JobState.SUBMITTING: {JobState.PROCESSING, JobState.FAILED},
JobState.PROCESSING: {JobState.DOWNLOADING, JobState.FAILED},
JobState.DOWNLOADING: {JobState.COMPLETED, JobState.FAILED},
JobState.FAILED: {JobState.RETRYING},
JobState.RETRYING: {JobState.SUBMITTING},
}
if new_state not in valid.get(self.state, set()):
raise ValueError(f"Invalid transition: {self.state} -> {new_state}")
self.state = new_state
async def video_pipeline(prompt, steps=None):
"""Chain: generate -> extend -> download -> upload."""
steps = steps or ["generate", "extend", "download"]
# Step 1: Generate
task_id = submit_async(prompt, duration=5)
result = poll_task("/videos/text2video", task_id) # from job-monitoring skill
video_url = result["videos"][0]["url"]
# Step 2: Extend (optional)
if "extend" in steps:
ext_r = requests.post(f"{BASE}/videos/video-extend", headers=get_headers(), json={
"task_id": task_id,
"prompt": f"Continue: {prompt}",
"duration": "5",
}).json()
ext_result = poll_task("/videos/video-extend", ext_r["data"]["task_id"])
video_url = ext_result["videos"][0]["url"]
# Step 3: Download
if "download" in steps:
video_data = requests.get(video_url).content
filepath = f"output/{task_id}.mp4"
with open(filepath, "wb") as f:
f.write(video_data)
return filepath
return video_url
# Use callback_url to avoid polling entirely
task_id = submit_async(
"Sunset over ocean with sailboats",
callback_url="https://your-app.com/webhooks/kling"
)
# Your webhook handler triggers next pipeline step
# See klingai-webhook-config skill for receiver implementation