From klingai-pack
Provides Python client patterns for the Kling AI API: JWT auto-refresh, exponential backoff polling, typed models, and error handling. Intended for production integrations.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin klingai-pack
This skill is limited to using the following tools:
Production-ready client patterns for the Kling AI API. Covers auto-refreshing JWT, typed request/response models, exponential backoff polling, async batch submission, and structured error handling.
Creates isolated Git worktrees for feature branches with prioritized directory selection, gitignore safety checks, auto project setup for Node/Python/Rust/Go, and baseline verification.
Executes implementation plans in current session by dispatching fresh subagents per independent task, with two-stage reviews: spec compliance then code quality.
Dispatches parallel agents to independently tackle 2+ tasks like separate test failures or subsystems without shared state or dependencies.
Production-ready client patterns for the Kling AI API. Covers auto-refreshing JWT, typed request/response models, exponential backoff polling, async batch submission, and structured error handling.
import jwt
import time
import os
import requests
from dataclasses import dataclass, field
from typing import Optional
def _required_env(name: str):
    """Return a zero-argument factory that reads ``name`` from the environment.

    The lookup is deferred until the factory is called, and a missing
    variable raises ``KeyError``.
    """
    def read() -> str:
        return os.environ[name]

    return read


@dataclass
class KlingConfig:
    """Connection and polling settings for the Kling AI client.

    Credentials default to the ``KLING_ACCESS_KEY`` and ``KLING_SECRET_KEY``
    environment variables; constructing a config without passing them
    explicitly raises ``KeyError`` when a variable is not set.
    """

    # API credentials used to sign the JWT.
    access_key: str = field(default_factory=_required_env("KLING_ACCESS_KEY"))
    secret_key: str = field(default_factory=_required_env("KLING_SECRET_KEY"))
    # Prefix prepended to every request path.
    base_url: str = "https://api.klingai.com/v1"
    # Refresh the token this many seconds before it expires.
    token_buffer_sec: int = 300
    # Initial delay between task status polls, in seconds.
    poll_interval_sec: int = 10
    # Upper bound on status polls (20 minutes at a constant 10 s interval).
    max_poll_attempts: int = 120
    # Per-request HTTP timeout, in seconds.
    timeout_sec: int = 30
class KlingClient:
    """Production Kling AI client with auto-refreshing JWT.

    Wraps the text-to-video, image-to-video and video-extend endpoints.
    Every public method submits a task and, unless ``wait=False`` is passed,
    polls that task until it succeeds, fails, or the poll budget runs out.
    """

    # Lifetime, in seconds, of each token this client signs.
    _TOKEN_TTL_SEC = 1800

    def __init__(self, config: "Optional[KlingConfig]" = None):
        """Create a client.

        Args:
            config: Settings to use; a default ``KlingConfig()`` is built
                when omitted or falsy.
        """
        self.config = config or KlingConfig()
        self._token: Optional[str] = None
        # Expiry of 0 forces a token to be minted on first use.
        self._token_expires = 0

    @property
    def _headers(self) -> dict:
        """Return request headers, re-signing the JWT when it is near expiry.

        A new HS256 token is minted once the cached one is within
        ``config.token_buffer_sec`` seconds of its expiry time.
        """
        now = int(time.time())
        if now >= self._token_expires - self.config.token_buffer_sec:
            expires_at = now + self._TOKEN_TTL_SEC
            # "nbf" is back-dated by 5 seconds.
            claims = {"iss": self.config.access_key, "exp": expires_at, "nbf": now - 5}
            self._token = jwt.encode(
                claims,
                self.config.secret_key,
                algorithm="HS256",
                headers={"alg": "HS256", "typ": "JWT"},
            )
            self._token_expires = expires_at
        return {
            "Authorization": f"Bearer {self._token}",
            "Content-Type": "application/json",
        }

    def _post(self, path: str, body: dict) -> dict:
        """POST ``body`` as JSON to ``path`` and return the decoded response.

        Raises:
            requests.HTTPError: If the response status is 4xx/5xx.
        """
        r = requests.post(
            f"{self.config.base_url}{path}",
            headers=self._headers,
            json=body,
            timeout=self.config.timeout_sec,
        )
        r.raise_for_status()
        return r.json()

    def _get(self, path: str) -> dict:
        """GET ``path`` and return the decoded JSON response.

        Raises:
            requests.HTTPError: If the response status is 4xx/5xx.
        """
        r = requests.get(
            f"{self.config.base_url}{path}",
            headers=self._headers,
            timeout=self.config.timeout_sec,
        )
        r.raise_for_status()
        return r.json()

    @staticmethod
    def _backoff_intervals(base, attempts, factor=1.2, cap=30):
        """Yield ``attempts`` sleep intervals growing by ``factor`` per step.

        Growth stops at ``cap`` seconds. When ``base`` is already above
        ``cap`` the interval stays at ``base`` rather than being reduced.
        """
        ceiling = max(base, cap)
        interval = base
        for _ in range(attempts):
            yield interval
            interval = min(interval * factor, ceiling)

    def _poll_task(self, endpoint: str, task_id: str) -> dict:
        """Poll with exponential backoff until task completes.

        Sleeps before every status check, starting at
        ``config.poll_interval_sec`` and making at most
        ``config.max_poll_attempts`` checks.

        Returns:
            The ``task_result`` payload of the finished task.

        Raises:
            KlingGenerationError: If the task reports status ``"failed"``.
            KlingTimeoutError: If the task has not finished after the last
                allowed check.
        """
        schedule = self._backoff_intervals(
            self.config.poll_interval_sec, self.config.max_poll_attempts
        )
        for delay in schedule:
            time.sleep(delay)
            data = self._get(f"{endpoint}/{task_id}")["data"]
            status = data["task_status"]
            if status == "succeed":
                return data["task_result"]
            if status == "failed":
                raise KlingGenerationError(data.get("task_status_msg", "Unknown"))
        raise KlingTimeoutError(f"Task {task_id} did not complete in time")

    def _submit(self, endpoint: str, body: dict, wait) -> dict:
        """Create a task at ``endpoint`` and optionally wait for its result.

        Returns the task result when ``wait`` is truthy, otherwise
        ``{"task_id": ...}`` for the newly created task.
        """
        task_id = self._post(endpoint, body)["data"]["task_id"]
        if wait:
            return self._poll_task(endpoint, task_id)
        return {"task_id": task_id}

    @staticmethod
    def _add_optional_fields(body: dict, options: dict) -> None:
        """Copy the optional tuning fields from ``options`` into ``body``."""
        if options.get("negative_prompt"):
            body["negative_prompt"] = options["negative_prompt"]
        # cfg_scale may legitimately be 0, so test against None, not truthiness.
        if options.get("cfg_scale") is not None:
            body["cfg_scale"] = options["cfg_scale"]
        if options.get("callback_url"):
            body["callback_url"] = options["callback_url"]

    # --- Public API ---
    def text_to_video(self, prompt: str, **kwargs) -> dict:
        """Generate a video from a text prompt.

        Keyword Args:
            model: Model name (default ``"kling-v2-master"``).
            duration: Clip length, sent as a string (default ``5``).
            aspect_ratio: Default ``"16:9"``.
            mode: Default ``"standard"``.
            negative_prompt, cfg_scale, callback_url: Sent only when given.
            wait: When truthy (default) block until the task finishes.

        Returns:
            The task result, or ``{"task_id": ...}`` when ``wait`` is falsy.
        """
        body = {
            "model_name": kwargs.get("model", "kling-v2-master"),
            "prompt": prompt,
            "duration": str(kwargs.get("duration", 5)),
            "aspect_ratio": kwargs.get("aspect_ratio", "16:9"),
            "mode": kwargs.get("mode", "standard"),
        }
        self._add_optional_fields(body, kwargs)
        return self._submit("/videos/text2video", body, kwargs.get("wait", True))

    def image_to_video(self, image_url: str, **kwargs) -> dict:
        """Generate a video from a source image.

        Keyword Args:
            model: Model name (default ``"kling-v2-1"``).
            duration: Clip length, sent as a string (default ``5``).
            mode: Default ``"standard"``.
            prompt, negative_prompt, cfg_scale, callback_url: Sent only when
                given.
            wait: When truthy (default) block until the task finishes.

        Returns:
            The task result, or ``{"task_id": ...}`` when ``wait`` is falsy.
        """
        body = {
            "model_name": kwargs.get("model", "kling-v2-1"),
            "image": image_url,
            "duration": str(kwargs.get("duration", 5)),
            "mode": kwargs.get("mode", "standard"),
        }
        if kwargs.get("prompt"):
            body["prompt"] = kwargs["prompt"]
        # Forward the same optional fields text_to_video accepts instead of
        # silently dropping them.
        self._add_optional_fields(body, kwargs)
        return self._submit("/videos/image2video", body, kwargs.get("wait", True))

    def extend_video(self, task_id: str, **kwargs) -> dict:
        """Extend the video produced by an earlier task.

        Keyword Args:
            prompt: Default ``""``.
            duration: Extension length, sent as a string (default ``5``).
            mode: Default ``"standard"``.
            wait: When truthy (default) block until the task finishes.

        Returns:
            The task result, or ``{"task_id": ...}`` for the new extension
            task when ``wait`` is falsy.
        """
        # NOTE(review): request field names are kept as-is; confirm them
        # against the video-extend API reference.
        body = {
            "task_id": task_id,
            "prompt": kwargs.get("prompt", ""),
            "duration": str(kwargs.get("duration", 5)),
            "mode": kwargs.get("mode", "standard"),
        }
        return self._submit("/videos/video-extend", body, kwargs.get("wait", True))
class KlingError(Exception):
    """Base class for errors raised by the Kling client."""


class KlingGenerationError(KlingError):
    """Raised when a polled task reports a failed status."""


class KlingTimeoutError(KlingError):
    """Raised when a task does not complete within the polling budget."""
# Build a client whose credentials come from the environment.
client = KlingClient()

# Blocking call: returns once the generation task has finished.
result = client.text_to_video(
    "A cat playing piano in a jazz club",
    duration=5,
    mode="professional",
    model="kling-v2-6",
)
print(result["videos"][0]["url"])

# Non-blocking call: only the submitted task's id comes back.
task = client.text_to_video("Ocean waves at sunset", wait=False)
print(f"Submitted: {task['task_id']}")
import jwt from "jsonwebtoken";
/**
 * Minimal Kling AI client for Node.js with an auto-refreshing JWT.
 *
 * Credentials default to the KLING_ACCESS_KEY / KLING_SECRET_KEY
 * environment variables.
 */
class KlingClient {
  #token = null;
  #tokenExp = 0;

  /**
   * @param {string} [ak] Access key, used as the JWT issuer.
   * @param {string} [sk] Secret key used to sign the JWT.
   */
  constructor(ak = process.env.KLING_ACCESS_KEY, sk = process.env.KLING_SECRET_KEY) {
    this.ak = ak;
    this.sk = sk;
    this.base = "https://api.klingai.com/v1";
  }

  /** Return request headers, re-signing the token within 300 s of expiry. */
  #getHeaders() {
    const now = Math.floor(Date.now() / 1000);
    if (now >= this.#tokenExp - 300) {
      const expiresAt = now + 1800;
      this.#token = jwt.sign(
        { iss: this.ak, exp: expiresAt, nbf: now - 5 },
        this.sk, { algorithm: "HS256", header: { typ: "JWT" } }
      );
      this.#tokenExp = expiresAt;
    }
    return { Authorization: `Bearer ${this.#token}`, "Content-Type": "application/json" };
  }

  /**
   * Send one request and return the `data` member of the JSON response.
   *
   * Throws a descriptive Error on a non-2xx status or a response without
   * `data`, instead of failing later with a TypeError on `data.task_id`.
   */
  async #request(path, init = {}) {
    const res = await fetch(`${this.base}${path}`, { ...init, headers: this.#getHeaders() });
    if (!res.ok) {
      // The body is best-effort context only; never let it mask the status.
      const detail = await res.text().catch(() => "");
      const method = init.method ?? "GET";
      throw new Error(`Kling API ${method} ${path} failed: HTTP ${res.status} ${detail}`.trim());
    }
    const { data } = await res.json();
    if (data == null) {
      throw new Error(`Kling API response for ${path} has no data`);
    }
    return data;
  }

  /**
   * Submit a text-to-video task.
   *
   * @param {string} prompt
   * @param {{model?: string, duration?: number, aspectRatio?: string,
   *          mode?: string, wait?: boolean}} [opts]
   * @returns {Promise<object>} The task result, or the raw submission data
   *   when `opts.wait === false`.
   */
  async textToVideo(prompt, opts = {}) {
    const data = await this.#request("/videos/text2video", {
      method: "POST",
      body: JSON.stringify({
        model_name: opts.model ?? "kling-v2-master",
        prompt,
        duration: String(opts.duration ?? 5),
        aspect_ratio: opts.aspectRatio ?? "16:9",
        mode: opts.mode ?? "standard",
      }),
    });
    return opts.wait === false ? data : this.#poll("/videos/text2video", data.task_id);
  }

  /**
   * Poll a task until it succeeds or fails, for at most 120 checks.
   *
   * The delay grows by 1.2x per check up to 30 s; a starting interval
   * already above 30 s is kept rather than reduced.
   */
  async #poll(endpoint, taskId, interval = 10000) {
    const ceiling = Math.max(interval, 30000);
    for (let i = 0; i < 120; i++) {
      await new Promise((r) => setTimeout(r, interval));
      const data = await this.#request(`${endpoint}/${taskId}`);
      if (data.task_status === "succeed") return data.task_result;
      if (data.task_status === "failed") {
        // Same fallback text as the Python client so the error is never empty.
        throw new Error(data.task_status_msg ?? "Unknown");
      }
      interval = Math.min(interval * 1.2, ceiling);
    }
    throw new Error(`Timeout: task ${taskId}`);
  }
}
import functools
def retry_on_transient(max_retries=3, backoff_base=2):
    """Retry on 429 (rate limit) and 5xx (server) errors.

    Args:
        max_retries: Number of retries after the first attempt. Zero or a
            negative value means the wrapped function is called exactly once.
        backoff_base: Base of the exponential delay; retry ``n`` (from 0)
            sleeps ``backoff_base ** n`` seconds first.

    Returns:
        A decorator. The wrapped function re-raises ``requests.HTTPError``
        unchanged when the status is not retryable or retries are exhausted.
    """
    # 504 Gateway Timeout is as transient as 502/503, so it is retried too.
    retryable_statuses = frozenset({429, 500, 502, 503, 504})

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            attempt = 0
            # Loop until success or a non-retryable failure, so the wrapped
            # function always runs at least once whatever max_retries is.
            while True:
                try:
                    return fn(*args, **kwargs)
                except requests.HTTPError as e:
                    # An HTTPError built without a response has no status.
                    status = getattr(e.response, "status_code", None)
                    if status not in retryable_statuses or attempt >= max_retries:
                        raise
                    time.sleep(backoff_base ** attempt)
                    attempt += 1
        return wrapper
    return decorator
# Apply to client methods
# NOTE(review): retrying a task-creating POST after a 5xx could submit the
# task twice if the first request was processed server-side - confirm.
KlingClient._post = retry_on_transient()(KlingClient._post)
# Polling goes through _get, so wrap it as well: otherwise one transient
# 429/5xx during a long wait aborts the whole poll loop.
KlingClient._get = retry_on_transient()(KlingClient._get)