From klingai-pack
Build async video generation workflows with Kling AI using queues, state machines, and event-driven patterns. Trigger with phrases like 'klingai async', 'kling ai workflow', 'klingai pipeline', 'async video generation'.
npx claudepluginhub flight505/skill-forge --plugin klingai-packThis skill is limited to using the following tools:
Kling AI video generation is inherently async: you submit a task, then poll or receive a callback when done. This skill covers production patterns for integrating this into larger systems using queues, state machines, and event-driven architectures.
Mandates invoking relevant skills via tools before any response in coding sessions. Covers access, priorities, and adaptations for Claude Code, Copilot CLI, Gemini CLI.
Share bugs, ideas, or general feedback.
Kling AI video generation is inherently async: you submit a task, then poll or receive a callback when done. This skill covers production patterns for integrating this into larger systems using queues, state machines, and event-driven architectures.
import jwt, time, os, requests
BASE = "https://api.klingai.com/v1"
def get_headers():
ak, sk = os.environ["KLING_ACCESS_KEY"], os.environ["KLING_SECRET_KEY"]
token = jwt.encode(
{"iss": ak, "exp": int(time.time()) + 1800, "nbf": int(time.time()) - 5},
sk, algorithm="HS256", headers={"alg": "HS256", "typ": "JWT"}
)
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
def submit_async(prompt, callback_url=None, **kwargs):
"""Submit task and return immediately."""
body = {
"model_name": kwargs.get("model", "kling-v2-master"),
"prompt": prompt,
"duration": str(kwargs.get("duration", 5)),
"mode": kwargs.get("mode", "standard"),
}
if callback_url:
body["callback_url"] = callback_url
r = requests.post(f"{BASE}/videos/text2video", headers=get_headers(), json=body)
return r.json()["data"]["task_id"]
import redis
import json
r = redis.Redis()
# Producer: enqueue video generation requests
def enqueue_video_job(prompt, metadata=None):
job = {
"id": f"job_{int(time.time() * 1000)}",
"prompt": prompt,
"metadata": metadata or {},
"status": "queued",
"created_at": time.time(),
}
r.lpush("kling:jobs:pending", json.dumps(job))
return job["id"]
# Worker: process jobs from queue
def process_jobs(max_concurrent=3):
active_tasks = {}
while True:
# Submit new jobs if under concurrency limit
while len(active_tasks) < max_concurrent:
raw = r.rpop("kling:jobs:pending")
if not raw:
break
job = json.loads(raw)
task_id = submit_async(job["prompt"])
active_tasks[task_id] = job
r.hset("kling:jobs:active", task_id, json.dumps(job))
# Check active tasks
completed = []
for task_id, job in active_tasks.items():
result = requests.get(
f"{BASE}/videos/text2video/{task_id}", headers=get_headers()
).json()
status = result["data"]["task_status"]
if status == "succeed":
job["status"] = "completed"
job["video_url"] = result["data"]["task_result"]["videos"][0]["url"]
r.lpush("kling:jobs:completed", json.dumps(job))
completed.append(task_id)
elif status == "failed":
job["status"] = "failed"
job["error"] = result["data"].get("task_status_msg")
r.lpush("kling:jobs:failed", json.dumps(job))
completed.append(task_id)
for tid in completed:
active_tasks.pop(tid)
r.hdel("kling:jobs:active", tid)
time.sleep(10)
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
class JobState(Enum):
QUEUED = "queued"
SUBMITTING = "submitting"
PROCESSING = "processing"
DOWNLOADING = "downloading"
COMPLETED = "completed"
FAILED = "failed"
RETRYING = "retrying"
@dataclass
class VideoJob:
prompt: str
state: JobState = JobState.QUEUED
task_id: Optional[str] = None
video_url: Optional[str] = None
error: Optional[str] = None
attempts: int = 0
max_attempts: int = 3
def can_retry(self) -> bool:
return self.state == JobState.FAILED and self.attempts < self.max_attempts
def transition(self, new_state: JobState):
valid = {
JobState.QUEUED: {JobState.SUBMITTING},
JobState.SUBMITTING: {JobState.PROCESSING, JobState.FAILED},
JobState.PROCESSING: {JobState.DOWNLOADING, JobState.FAILED},
JobState.DOWNLOADING: {JobState.COMPLETED, JobState.FAILED},
JobState.FAILED: {JobState.RETRYING},
JobState.RETRYING: {JobState.SUBMITTING},
}
if new_state not in valid.get(self.state, set()):
raise ValueError(f"Invalid transition: {self.state} -> {new_state}")
self.state = new_state
async def video_pipeline(prompt, steps=None):
"""Chain: generate -> extend -> download -> upload."""
steps = steps or ["generate", "extend", "download"]
# Step 1: Generate
task_id = submit_async(prompt, duration=5)
result = poll_task("/videos/text2video", task_id) # from job-monitoring skill
video_url = result["videos"][0]["url"]
# Step 2: Extend (optional)
if "extend" in steps:
ext_r = requests.post(f"{BASE}/videos/video-extend", headers=get_headers(), json={
"task_id": task_id,
"prompt": f"Continue: {prompt}",
"duration": "5",
}).json()
ext_result = poll_task("/videos/video-extend", ext_r["data"]["task_id"])
video_url = ext_result["videos"][0]["url"]
# Step 3: Download
if "download" in steps:
video_data = requests.get(video_url).content
filepath = f"output/{task_id}.mp4"
with open(filepath, "wb") as f:
f.write(video_data)
return filepath
return video_url
# Use callback_url to avoid polling entirely
task_id = submit_async(
"Sunset over ocean with sailboats",
callback_url="https://your-app.com/webhooks/kling"
)
# Your webhook handler triggers next pipeline step
# See klingai-webhook-config skill for receiver implementation