From databricks-pack
Implements exponential backoff with jitter, retries, and idempotency for Databricks API rate limits using Python databricks-sdk.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin databricks-packThis skill is limited to using the following tools:
Handle Databricks API rate limits with exponential backoff, token-bucket queuing, and idempotent job submissions. The API returns HTTP 429 with a `Retry-After` header when limits are exceeded. The SDK has built-in retries for transient errors, but custom logic is needed for bulk operations.
Conducts multi-round deep research on GitHub repos via API and web searches, generating markdown reports with executive summaries, timelines, metrics, and Mermaid diagrams.
Dynamically discovers and combines enabled skills into cohesive, unexpected delightful experiences like interactive HTML or themed artifacts. Activates on 'surprise me', inspiration, or boredom cues.
Generates images from structured JSON prompts via Python script execution. Supports reference images and aspect ratios for characters, scenes, products, visuals.
Handle Databricks API rate limits with exponential backoff, token-bucket queuing, and idempotent job submissions. The API returns HTTP 429 with a Retry-After header when limits are exceeded. The SDK has built-in retries for transient errors, but custom logic is needed for bulk operations.
databricks-sdk installedDatabricks enforces per-endpoint, per-workspace rate limits.
| API Category | Approx. Limit | Notes |
|---|---|---|
| Jobs API (create/run) | ~10 req/sec | Per workspace |
| Jobs API (list/get) | ~30 req/sec | Read endpoints more generous |
| Clusters API | ~10 req/sec | Create/start are expensive |
| DBFS / Files API | ~10 req/sec | Uploads have 1MB/5MB size limits |
| SQL Statement API | ~10 concurrent | Concurrent execution limit |
| Unity Catalog | ~100 req/min | Permission checks add up fast |
| Model Serving | Varies | ITPM/OTPM/QPH limits per endpoint |
from databricks.sdk.errors import TooManyRequests, ResourceConflict
w = WorkspaceClient()
try:
w.jobs.run_now(job_id=123)
except TooManyRequests as e:
print(f"Rate limited. Retry after: {e.retry_after_secs}s")
except ResourceConflict as e:
print(f"Conflict (409): {e.message}") # Job already running
import time
import random
from functools import wraps
from databricks.sdk.errors import TooManyRequests, TemporarilyUnavailable
def retry_with_backoff(max_retries=5, base_delay=1.0, max_delay=60.0):
"""Decorator for Databricks API calls with exponential backoff + jitter."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except TooManyRequests as e:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.5)
wait = e.retry_after_secs or (delay + jitter)
print(f"429 (attempt {attempt + 1}/{max_retries}), waiting {wait:.1f}s")
time.sleep(wait)
except TemporarilyUnavailable:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
print(f"503 (attempt {attempt + 1}/{max_retries}), waiting {delay:.1f}s")
time.sleep(delay)
return func(*args, **kwargs)
return wrapper
return decorator
@retry_with_backoff(max_retries=5)
def get_job_status(w, job_id):
return w.jobs.get(job_id)
Prevent bursts when iterating over hundreds of resources.
import threading
import time
class RateLimiter:
"""Token-bucket rate limiter for Databricks API calls."""
def __init__(self, requests_per_second: float = 8.0):
self._interval = 1.0 / requests_per_second
self._lock = threading.Lock()
self._last_request = 0.0
def acquire(self):
"""Block until the next request slot is available."""
with self._lock:
now = time.monotonic()
wait = self._last_request + self._interval - now
if wait > 0:
time.sleep(wait)
self._last_request = time.monotonic()
# Usage: enumerate jobs without hitting limits
limiter = RateLimiter(requests_per_second=8)
def list_all_job_runs(w, job_ids: list[int]) -> dict:
results = {}
for job_id in job_ids:
limiter.acquire()
runs = list(w.jobs.list_runs(job_id=job_id, limit=5))
results[job_id] = runs
return results
from concurrent.futures import ThreadPoolExecutor, as_completed
def batch_run_jobs(w, job_ids: list[int], max_concurrent: int = 5) -> dict:
"""Run multiple jobs with concurrency throttling."""
results = {}
def run_one(job_id):
limiter.acquire()
try:
run = w.jobs.run_now(job_id=job_id)
return job_id, {"run_id": run.run_id, "status": "submitted"}
except TooManyRequests:
time.sleep(5)
run = w.jobs.run_now(job_id=job_id)
return job_id, {"run_id": run.run_id, "status": "submitted_after_retry"}
except ResourceConflict:
return job_id, {"status": "already_running"}
with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
futures = {executor.submit(run_one, jid): jid for jid in job_ids}
for future in as_completed(futures):
job_id, result = future.result()
results[job_id] = result
return results
Prevent duplicate runs when retrying failed submissions using idempotency_token.
import hashlib
from datetime import datetime
def submit_idempotent(w, job_id: int, params: dict | None = None) -> int:
"""Submit a job run with idempotency — safe to retry."""
# Deterministic token: same job + date + params = same token
token_input = f"{job_id}-{datetime.utcnow().strftime('%Y-%m-%d')}-{sorted(params.items()) if params else ''}"
idempotency_token = hashlib.sha256(token_input.encode()).hexdigest()[:32]
run = w.jobs.run_now(
job_id=job_id,
idempotency_token=idempotency_token,
notebook_params=params or {},
)
return run.run_id
# Calling twice with same inputs on the same day returns the same run_id
run1 = submit_idempotent(w, 456, params={"date": "2025-03-01"})
run2 = submit_idempotent(w, 456, params={"date": "2025-03-01"})
assert run1 == run2 # No duplicate run created
| Error | HTTP | Solution |
|---|---|---|
TooManyRequests | 429 | Use Retry-After header, fall back to exponential backoff |
TemporarilyUnavailable | 503 | Retry with 5-10s delay; check status.databricks.com |
ResourceConflict | 409 | Job already running — check list_runs() before submitting |
TimeoutError | - | Increase SDK timeout: WorkspaceClient(timeout=120) |
| Sustained rate limiting | 429 | Reduce concurrency, spread load across time windows |
import requests
resp = requests.get(
f"{w.config.host}/api/2.1/jobs/list",
headers={"Authorization": f"Bearer {w.config.token}"},
)
print(f"Status: {resp.status_code}")
print(f"Retry-After: {resp.headers.get('Retry-After', 'N/A')}")
limiter = RateLimiter(requests_per_second=5)
terminated = 0
for cluster in w.clusters.list():
if cluster.state.value == "TERMINATED" and cluster.cluster_name.startswith("dev-"):
limiter.acquire()
w.clusters.permanent_delete(cluster_id=cluster.cluster_id)
terminated += 1
print(f"Cleaned up {terminated} dev clusters")
For security configuration, see databricks-security-basics.