From vastai-pack
Applies Vast.ai SDK patterns for Python and REST API: typed GPU queries, managed instance lifecycles, offer scoring, retries. For GPU cloud integrations and refactoring.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin vastai-pack
Production-ready patterns for the Vast.ai CLI, Python SDK, and REST API at `cloud.vast.ai/api/v0`. Covers typed search queries, instance lifecycle management, offer scoring, and error handling.
Prerequisites: completed vastai-install-auth setup and the requests package.

from dataclasses import dataclass
from typing import Optional

@dataclass
class GPUQuery:
    """Typed builder for Vast.ai offer-search filters."""
    num_gpus: int = 1
    gpu_name: Optional[str] = None
    gpu_ram_min: Optional[float] = None
    reliability_min: float = 0.95
    max_dph: Optional[float] = None  # maximum total price in $/hour

    def to_filter(self) -> dict:
        """Return the filter dict, omitting any constraint left unset."""
        f = {"rentable": {"eq": True}, "num_gpus": {"eq": self.num_gpus},
             "reliability2": {"gte": self.reliability_min}}
        if self.gpu_name:
            f["gpu_name"] = {"eq": self.gpu_name}
        # Compare numeric limits against None so an explicit 0 still applies.
        if self.gpu_ram_min is not None:
            f["gpu_ram"] = {"gte": self.gpu_ram_min}
        if self.max_dph is not None:
            f["dph_total"] = {"lte": self.max_dph}
        return f
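
To sanity-check a filter before sending it, the dict that `to_filter` produces can be applied to offer dicts locally. The following is a minimal sketch: `matches` and the `_OPS` table are hypothetical helpers, only the `eq`, `gte`, and `lte` operators used above are handled, and the sample offers are made up.

```python
import operator

# Operators that appear in the filters built above; any other operators
# the API may support are deliberately left out of this sketch.
_OPS = {"eq": operator.eq, "gte": operator.ge, "lte": operator.le}


def matches(offer: dict, flt: dict) -> bool:
    """Return True if `offer` satisfies every {field: {op: value}} clause."""
    for field, clause in flt.items():
        if field not in offer:
            return False  # a missing field cannot satisfy a constraint
        for op_name, expected in clause.items():
            if not _OPS[op_name](offer[field], expected):
                return False
    return True


# Filter in the same shape that GPUQuery.to_filter() returns.
flt = {
    "rentable": {"eq": True},
    "num_gpus": {"eq": 1},
    "reliability2": {"gte": 0.95},
    "dph_total": {"lte": 0.50},
}

# Made-up offers for illustration only.
offers = [
    {"id": 1, "rentable": True, "num_gpus": 1, "reliability2": 0.99, "dph_total": 0.35},
    {"id": 2, "rentable": True, "num_gpus": 1, "reliability2": 0.90, "dph_total": 0.20},
    {"id": 3, "rentable": True, "num_gpus": 1, "reliability2": 0.98, "dph_total": 0.80},
]

kept = [o["id"] for o in offers if matches(o, flt)]
print(kept)
```

Offer 2 fails the reliability floor and offer 3 exceeds the price cap, so only offer 1 is kept.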

from contextlib import contextmanager

@contextmanager
def managed_instance(client, offer_id, image, disk_gb=20, timeout=300):
    """Auto-destroy instance on exit or exception."""
    inst = client.create_instance(offer_id, image, disk_gb)
    instance_id = inst["new_contract"]
    try:
        info = client.poll_until_running(instance_id, timeout)
        yield info
    finally:
        client.destroy_instance(instance_id)

# Usage
with managed_instance(client, offer["id"], "pytorch/pytorch:latest") as inst:
    ssh_exec(inst["ssh_host"], inst["ssh_port"], "python train.py")
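
`managed_instance` relies on a `client.poll_until_running` method that is not shown here. A minimal sketch of such a polling loop follows. The `fetch_instance` callable, the `actual_status` key, the `"running"` value, and the injectable `sleep` parameter are all assumptions for illustration, not confirmed details of the client.

```python
import time
from typing import Callable


def poll_until_running(
    fetch_instance: Callable[[int], dict],
    instance_id: int,
    timeout: float = 300,
    interval: float = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the instance reports a running status or the timeout expires.

    `fetch_instance` is a hypothetical callable returning the instance's
    current state as a dict; the "actual_status" key is an assumption.
    """
    deadline = time.monotonic() + timeout
    while True:
        info = fetch_instance(instance_id)
        if info.get("actual_status") == "running":
            return info
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"instance {instance_id} not running after {timeout}s"
            )
        sleep(interval)


# Usage with a stub that reports "running" on the third poll.
states = iter(["loading", "loading", "running"])
info = poll_until_running(
    lambda _id: {"actual_status": next(states), "ssh_port": 22},
    instance_id=123,
    sleep=lambda _s: None,  # skip real waiting in this demo
)
print(info["actual_status"])
```

Raising on timeout inside the `try` block of `managed_instance` means the `finally` clause still destroys an instance that never came up.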

def score_offer(offer, weights=None):
    w = weights or {"cost": 0.4, "reliability": 0.3, "perf": 0.3}
    return (w["cost"] * (1.0 / max(offer["dph_total"], 0.01)) +
            w["reliability"] * offer.get("reliability2", 0) * 100 +
            w["perf"] * offer.get("dlperf", 0))

best = max(offers, key=score_offer)

import time
from functools import wraps

def retry(max_attempts=3, backoff=2):
    """Retry the wrapped call with exponential backoff between attempts."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if i == max_attempts - 1:
                        raise  # last attempt failed: surface the error
                    time.sleep(backoff ** i)  # waits 1s, 2s, 4s, ... for backoff=2
        return wrapper
    return decorator
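
The decorator above retries on any exception, which includes programming errors such as `TypeError`. A variant that retries only on exception types the caller names is sketched below. `retry_on` and its `sleep` parameter are hypothetical, the latter added so the delay can be stubbed out in tests, and `flaky_search` is a stand-in rather than a real API call.

```python
import time
from functools import wraps


def retry_on(exceptions, max_attempts=3, backoff=2, sleep=time.sleep):
    """Retry only when one of `exceptions` is raised; other errors propagate."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: surface the last error
                    sleep(backoff ** attempt)
        return wrapper
    return decorator


calls = {"n": 0}


@retry_on((TimeoutError, ConnectionError), sleep=lambda _s: None)
def flaky_search():
    """Stand-in for an API call that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("simulated timeout")
    return ["offer-a", "offer-b"]


result = flaky_search()
print(result, calls["n"])
```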

import subprocess

def ssh_exec(host, port, cmd, timeout=300):
    r = subprocess.run(
        ["ssh", "-p", str(port), "-o", "StrictHostKeyChecking=no",
         f"root@{host}", cmd],
        capture_output=True, text=True, timeout=timeout)
    if r.returncode != 0:
        raise RuntimeError(f"SSH failed: {r.stderr}")
    return r.stdout

GPUQuery builder for search filters

| Error | Cause | Solution |
|---|---|---|
| Offer unavailable | Already rented | Re-search and pick next best |
| SSH key rejected | Key not uploaded | Upload at cloud.vast.ai > SSH Keys |
| Instance destroyed unexpectedly | Spot preemption | Use managed_instance with checkpoints |
| API timeout | Network or server issue | Apply retry decorator |
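
The "re-search and pick next best" remedy in the first row can be approximated by walking the ranked offers until one is accepted. The sketch below assumes a hypothetical `create` callable that raises a hypothetical `OfferUnavailable` error when an offer has already been rented; how the real client signals this condition is not specified here.

```python
class OfferUnavailable(Exception):
    """Hypothetical error for an offer that was rented before we claimed it."""


def rent_first_available(ranked_offers, create):
    """Try offers best-first; return (offer, result) for the first success."""
    for offer in ranked_offers:
        try:
            return offer, create(offer["id"])
        except OfferUnavailable:
            continue  # already taken; fall through to the next offer
    raise RuntimeError("no offers could be rented; re-run the search")


# Stub standing in for an instance-creation call: offer 11 is already taken.
def fake_create(offer_id):
    if offer_id == 11:
        raise OfferUnavailable(offer_id)
    return {"new_contract": 9000 + offer_id}


ranked = [{"id": 11}, {"id": 12}, {"id": 13}]
offer, result = rent_first_available(ranked, fake_create)
print(offer["id"], result["new_contract"])
```

If every candidate is gone, the function raises so the caller can run a fresh search instead of looping over stale offers.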
See vastai-core-workflow-a for the complete provisioning workflow.
Cost-optimized scoring: Use weights {"cost": 0.7, "reliability": 0.2, "perf": 0.1} for batch jobs where price dominates. Use {"cost": 0.1, "reliability": 0.6, "perf": 0.3} for long training runs where uptime matters.
Auto-cleanup: Wrap any GPU job in managed_instance to guarantee destruction even on crash.