From vastai-pack
Apply production-ready Vast.ai SDK patterns for Python and REST API. Use when implementing Vast.ai integrations, refactoring SDK usage, or establishing coding standards for GPU cloud operations. Trigger with phrases like "vastai SDK patterns", "vastai best practices", "vastai code patterns", "idiomatic vastai".
```shell
npx claudepluginhub flight505/skill-forge --plugin vastai-pack
```
Production-ready patterns for the Vast.ai CLI, Python SDK, and REST API at `cloud.vast.ai/api/v0`. Covers typed search queries, instance lifecycle management, offer scoring, and error handling.
Requires `vastai-install-auth` setup and the `requests` package.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class GPUQuery:
    """Typed builder for Vast.ai search filters."""
    num_gpus: int = 1
    gpu_name: Optional[str] = None
    gpu_ram_min: Optional[float] = None
    reliability_min: float = 0.95
    max_dph: Optional[float] = None

    def to_filter(self) -> dict:
        f = {"rentable": {"eq": True}, "num_gpus": {"eq": self.num_gpus},
             "reliability2": {"gte": self.reliability_min}}
        if self.gpu_name:
            f["gpu_name"] = {"eq": self.gpu_name}
        if self.gpu_ram_min:
            f["gpu_ram"] = {"gte": self.gpu_ram_min}
        if self.max_dph:
            f["dph_total"] = {"lte": self.max_dph}
        return f
```
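The filter dict travels JSON-encoded to the search endpoint. A sketch of the encoding step; the `/api/v0/bundles/` path and the `q` query parameter reflect my understanding of the current REST API and should be verified against the official docs, and auth is omitted:

```python
import json

# Filter dict as produced by GPUQuery(gpu_name="RTX 4090", max_dph=0.50).to_filter()
filt = {"rentable": {"eq": True}, "num_gpus": {"eq": 1},
        "reliability2": {"gte": 0.95},
        "gpu_name": {"eq": "RTX 4090"},
        "dph_total": {"lte": 0.50}}

q = json.dumps(filt)
# e.g. requests.get("https://cloud.vast.ai/api/v0/bundles/", params={"q": q})
# (API key/header omitted; endpoint and param name are assumptions)
print(q)
```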
```python
from contextlib import contextmanager

@contextmanager
def managed_instance(client, offer_id, image, disk_gb=20, timeout=300):
    """Auto-destroy instance on exit or exception."""
    inst = client.create_instance(offer_id, image, disk_gb)
    instance_id = inst["new_contract"]
    try:
        info = client.poll_until_running(instance_id, timeout)
        yield info
    finally:
        client.destroy_instance(instance_id)

# Usage
with managed_instance(client, offer["id"], "pytorch/pytorch:latest") as inst:
    ssh_exec(inst["ssh_host"], inst["ssh_port"], "python train.py")
```
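The `client.poll_until_running` helper used above is not shown. A minimal sketch, decoupled from the client for clarity; the `actual_status` field name and `"running"` value are assumptions, so check them against your client's show-instance payload:

```python
import time

def poll_until_running(get_info, timeout=300, interval=10):
    """Poll get_info() until the instance reports running, else time out."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        info = get_info()
        # Assumption: the instance record's "actual_status" field reaches
        # "running" once SSH is usable
        if info.get("actual_status") == "running":
            return info
        time.sleep(interval)
    raise TimeoutError(f"instance not running after {timeout}s")
```

A client method can wrap this with `get_info=lambda: client.show_instance(instance_id)`.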
```python
def score_offer(offer, weights=None):
    w = weights or {"cost": 0.4, "reliability": 0.3, "perf": 0.3}
    return (w["cost"] * (1.0 / max(offer["dph_total"], 0.01)) +
            w["reliability"] * offer.get("reliability2", 0) * 100 +
            w["perf"] * offer.get("dlperf", 0))

best = max(offers, key=score_offer)
```
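Note the raw terms sit on very different scales: `1/dph_total` is O(1), `reliability2 * 100` lands near 95-100, and `dlperf` can be in the hundreds, so the perf term tends to dominate regardless of weights. One option is min-max normalizing each signal across the candidate set first; a sketch, not from the Vast.ai docs:

```python
def normalize(offers, key, invert=False):
    """Min-max scale offers[key] into norm_<key> in [0, 1], in place."""
    vals = [o[key] for o in offers]
    lo, hi = min(vals), max(vals)
    span = (hi - lo) or 1.0  # avoid divide-by-zero when all values tie
    for o in offers:
        x = (o[key] - lo) / span
        o[f"norm_{key}"] = 1.0 - x if invert else x  # invert: lower raw is better

def score_normalized(o, weights=None):
    w = weights or {"cost": 0.4, "reliability": 0.3, "perf": 0.3}
    return (w["cost"] * o["norm_dph_total"] +
            w["reliability"] * o["norm_reliability2"] +
            w["perf"] * o["norm_dlperf"])

offers = [
    {"id": 1, "dph_total": 0.40, "reliability2": 0.99, "dlperf": 300},
    {"id": 2, "dph_total": 0.25, "reliability2": 0.96, "dlperf": 180},
]
for key, invert in [("dph_total", True), ("reliability2", False), ("dlperf", False)]:
    normalize(offers, key, invert)
best = max(offers, key=score_normalized)
```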
```python
import time
from functools import wraps

def retry(max_attempts=3, backoff=2):
    """Retry with exponential backoff; re-raise after the last attempt."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if i == max_attempts - 1:
                        raise
                    time.sleep(backoff ** i)
        return wrapper
    return decorator
```
```python
import subprocess

def ssh_exec(host, port, cmd, timeout=300):
    """Run a command on the instance over SSH; raise on nonzero exit."""
    r = subprocess.run(
        ["ssh", "-p", str(port), "-o", "StrictHostKeyChecking=no",
         f"root@{host}", cmd],
        capture_output=True, text=True, timeout=timeout)
    if r.returncode != 0:
        raise RuntimeError(f"SSH failed: {r.stderr}")
    return r.stdout
```
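Copying files to and from the instance pairs naturally with `ssh_exec`. A sketch mirroring its options; note that `scp` takes the port as uppercase `-P`, unlike `ssh`'s lowercase `-p`:

```python
import subprocess

def scp_cmd(host, port, local_path, remote_path):
    # scp uses -P (uppercase) for the port, unlike ssh's -p
    return ["scp", "-P", str(port), "-o", "StrictHostKeyChecking=no",
            local_path, f"root@{host}:{remote_path}"]

def scp_put(host, port, local_path, remote_path, timeout=300):
    """Upload a file to the instance; raise on failure."""
    r = subprocess.run(scp_cmd(host, port, local_path, remote_path),
                       capture_output=True, text=True, timeout=timeout)
    if r.returncode != 0:
        raise RuntimeError(f"SCP failed: {r.stderr}")
```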
| Error | Cause | Solution |
|---|---|---|
| Offer unavailable | Already rented | Re-search and pick next best |
| SSH key rejected | Key not uploaded | Upload at cloud.vast.ai > SSH Keys |
| Instance destroyed unexpectedly | Spot preemption | Use managed_instance with checkpoints |
| API timeout | Network or server issue | Apply retry decorator |
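The "re-search and pick next best" remedy can be folded into a helper; a sketch assuming the `create_instance` client method used above:

```python
def rent_first_available(client, ranked_offers, image, disk_gb=20):
    """Try offers best-first; offers go stale fast, so fall through on failure."""
    last_err = None
    for offer in ranked_offers:
        try:
            return client.create_instance(offer["id"], image, disk_gb)
        except Exception as e:
            last_err = e  # offer likely rented out from under us; try the next
    raise RuntimeError(f"no offer could be rented: {last_err}")
```

Feed it offers ranked by the scorer, e.g. `sorted(offers, key=score_offer, reverse=True)`.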
See `vastai-core-workflow-a` for the complete provisioning workflow.
Cost-optimized scoring: use weights `{"cost": 0.7, "reliability": 0.2, "perf": 0.1}` for batch jobs where price dominates, and `{"cost": 0.1, "reliability": 0.6, "perf": 0.3}` for long training runs where uptime matters.
Auto-cleanup: wrap any GPU job in `managed_instance` to guarantee destruction even on crash.