Complete Python Cloudflare deployment system. PROACTIVELY activate for: (1) Python Workers with Pyodide, (2) FastAPI on Workers, (3) Cloudflare Containers for heavy compute, (4) Service bindings (RPC), (5) Environment variables in Workers, (6) Cold start optimization, (7) Workflows for durable execution, (8) GPU containers for AI. Provides: Worker code patterns, wrangler config, Dockerfile templates, FastAPI integration. Ensures edge deployment with proper architecture choices.
Deploys Python to Cloudflare Workers and Containers with FastAPI, service bindings, and GPU support.
npx claudepluginhub josiahsiegel/claude-plugin-marketplaceThis skill inherits all available tools. When active, it can use any tool Claude has access to.
| Platform | Cold Start | Packages | Best For |
|---|---|---|---|
| Workers (Pyodide) | ~50ms | Limited | API endpoints |
| Containers | ~10s | Any | Heavy compute, AI |
| Worker Pattern | Code |
|---|---|
| Basic handler | class Default(WorkerEntrypoint): |
| FastAPI | await asgi.fetch(app, request, self.env) |
| Env vars | self.env.API_KEY |
| Service binding | await self.env.WORKER_B.method() |
| Command | Purpose |
|---|---|
pywrangler init | Create Python Worker |
pywrangler dev | Local development |
pywrangler deploy | Deploy to Cloudflare |
| Container vs Worker | Recommendation |
|---|---|
| Simple API | Worker |
| pandas/numpy | Container |
| GPU/AI | Container |
| <50ms latency | Worker |
Use for Cloudflare edge deployment:
Related skills:
python-fastapipython-asynciopython-github-actionsCloudflare provides two ways to run Python:
# Install pywrangler (Python Workers CLI)
pip install pywrangler
# Create new Python Worker
pywrangler init my-worker
cd my-worker
# Project structure
my-worker/
├── src/
│ └── entry.py
├── pyproject.toml
└── wrangler.toml
# src/entry.py
from workers import Response, WorkerEntrypoint
class Default(WorkerEntrypoint):
async def fetch(self, request):
return Response("Hello from Python Worker!")
# wrangler.toml
name = "my-python-worker"
main = "src/entry.py"
compatibility_date = "2024-12-01"
[build]
command = ""
# pyproject.toml
[project]
name = "my-python-worker"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"fastapi",
"pydantic",
]
[dependency-groups]
dev = ["workers-py"]
# src/entry.py
from workers import WorkerEntrypoint
from fastapi import FastAPI, Request
from pydantic import BaseModel
app = FastAPI()
class Default(WorkerEntrypoint):
async def fetch(self, request):
import asgi
return await asgi.fetch(app, request, self.env)
@app.get("/")
async def root():
return {"message": "Hello from FastAPI on Cloudflare!"}
@app.get("/env")
async def get_env(req: Request):
env = req.scope.get("env")
if env and hasattr(env, "API_KEY"):
return {"has_api_key": True}
return {"has_api_key": False}
class Item(BaseModel):
name: str
price: float
description: str | None = None
@app.post("/items/")
async def create_item(item: Item):
return item
@app.get("/items/{item_id}")
async def read_item(item_id: int):
return {"item_id": item_id}
from workers import WorkerEntrypoint, Response
class Default(WorkerEntrypoint):
async def fetch(self, request):
# Access environment variables via self.env
api_key = self.env.API_KEY
database_url = self.env.DATABASE_URL
return Response(f"API Key exists: {bool(api_key)}")
# wrangler.toml
[vars]
API_KEY = "your-api-key"
DATABASE_URL = "your-database-url"
# Worker A - Caller
from workers import WorkerEntrypoint, Response
class Default(WorkerEntrypoint):
async def fetch(self, request):
# Call Worker B's add method
result = await self.env.WORKER_B.add(1, 2)
return Response(f"Result: {result}")
# Worker B - Callee
from workers import WorkerEntrypoint, Response
class Default(WorkerEntrypoint):
async def fetch(self, request):
return Response("Hello from Worker B")
def add(self, a: int, b: int) -> int:
return a + b
# Worker A wrangler.toml
[[services]]
binding = "WORKER_B"
service = "worker-b"
from workers import WorkerEntrypoint
from pyodide.ffi import to_js as _to_js
from js import Response, URL, Object, fetch
def to_js(x):
return _to_js(x, dict_converter=Object.fromEntries)
class Default(WorkerEntrypoint):
async def fetch(self, request):
request_url = URL.new(request.url)
params = request_url.searchParams
tags = params["tags"].split(",") if "tags" in params else []
url = params["uri"] or None
if url is None:
return Response.json(to_js({"error": "URL required"}), status=400)
# Fetch with cache tags
options = {"cf": {"cacheTags": tags}}
result = await fetch(url, to_js(options))
cache_status = result.headers["cf-cache-status"]
return Response.json(to_js({
"cache": cache_status,
"status": result.status
}))
# HTTP clients (async only)
import aiohttp
import httpx
# Data processing
import numpy # Limited support
import pandas # Limited support
# Standard library works well
import json
import re
import urllib
import base64
import hashlib
# Check Pyodide package list for full support
# https://pyodide.org/en/stable/usage/packages-in-pyodide.html
# Imports at module level are cached in memory snapshot
import json
import hashlib
from pydantic import BaseModel
# This code runs during snapshot creation
CACHED_CONFIG = load_config()
class Default(WorkerEntrypoint):
async def fetch(self, request):
# Fast access to pre-loaded modules and data
return Response(json.dumps(CACHED_CONFIG))
| Feature | Workers | Containers |
|---|---|---|
| Cold start | ~50ms | ~10s (with prewarming) |
| Package support | Pyodide-compatible | Any |
| Memory | Limited | Configurable |
| File system | No | Yes |
| Native binaries | No | Yes |
| Best for | API endpoints | Batch jobs, AI, heavy compute |
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Run application
CMD ["python", "main.py"]
# main.py
import pandas as pd
import numpy as np
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@app.get("/process")
async def process_data():
# Heavy computation that wouldn't work in Workers
df = pd.DataFrame(np.random.randn(10000, 4))
result = df.describe().to_dict()
return result
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)
# Build container
wrangler containers build . --tag my-app:latest
# Push to registry
wrangler containers push my-app:latest
# Deploy
wrangler containers deploy my-app
# List containers
wrangler containers list
# Delete
wrangler containers delete my-app
# Multi-stage build for smaller images
FROM python:3.12-slim as builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt --target=/app/deps
FROM python:3.12-slim
WORKDIR /app
COPY --from=builder /app/deps /app/deps
COPY . .
ENV PYTHONPATH=/app/deps
CMD ["python", "main.py"]
# Optimize for container cold starts
import asyncio
# Lazy imports for faster startup
def get_pandas():
import pandas as pd
return pd
async def process_large_data():
pd = get_pandas() # Import only when needed
# Process data...
# Container with GPU support
import torch
def run_inference(data):
device = "cuda" if torch.cuda.is_available() else "cpu"
model = load_model().to(device)
with torch.no_grad():
result = model(data.to(device))
return result.cpu().numpy()
Cloudflare Workflows now supports Python for multi-step, long-running applications with automatic retries and state persistence.
from cloudflare.workflows import Workflow, step
class DataPipeline(Workflow):
@step
async def fetch_data(self, url: str) -> dict:
"""Fetch data from external API."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
@step
async def process_data(self, data: dict) -> dict:
"""Process the fetched data."""
# Heavy processing - will retry if fails
processed = transform(data)
return processed
@step
async def save_results(self, data: dict) -> str:
"""Save to database."""
result_id = await save_to_db(data)
return result_id
async def run(self, input_url: str) -> str:
data = await self.fetch_data(input_url)
processed = await self.process_data(data)
result_id = await self.save_results(processed)
return f"Saved as {result_id}"
# Do expensive imports at module level
import json
import hashlib
from pydantic import BaseModel
# Pre-compute static data
STATIC_CONFIG = {"version": "1.0", "features": ["a", "b"]}
class Default(WorkerEntrypoint):
async def fetch(self, request):
# Fast path - no initialization needed
return Response(json.dumps(STATIC_CONFIG))
# Workers only support async HTTP
import httpx
class Default(WorkerEntrypoint):
async def fetch(self, request):
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/data")
return Response(response.text)
from workers import WorkerEntrypoint, Response
class Default(WorkerEntrypoint):
async def fetch(self, request):
try:
result = await self.process_request(request)
return Response(json.dumps(result), headers={
"Content-Type": "application/json"
})
except ValueError as e:
return Response(json.dumps({"error": str(e)}), status=400)
except Exception as e:
# Log error (sent to Workers Logs)
print(f"Error: {e}")
return Response(json.dumps({"error": "Internal error"}), status=500)
# src/handlers.py - Pure Python logic
async def handle_create_item(data: dict) -> dict:
# Testable without Worker runtime
validated = validate_item(data)
return {"id": generate_id(), **validated}
# src/entry.py - Worker entrypoint
from workers import WorkerEntrypoint, Response
from handlers import handle_create_item
class Default(WorkerEntrypoint):
async def fetch(self, request):
if request.method == "POST":
data = await request.json()
result = await handle_create_item(data)
return Response(json.dumps(result))
# Start local development server
pywrangler dev
# Test locally
curl http://localhost:8787/
# Deploy to Cloudflare
pywrangler deploy
Activates when the user asks about AI prompts, needs prompt templates, wants to search for prompts, or mentions prompts.chat. Use for discovering, retrieving, and improving prompts.
Search, retrieve, and install Agent Skills from the prompts.chat registry using MCP tools. Use when the user asks to find skills, browse skill catalogs, install a skill for Claude, or extend Claude's capabilities with reusable AI agent components.
Creating algorithmic art using p5.js with seeded randomness and interactive parameter exploration. Use this when users request creating art using code, generative art, algorithmic art, flow fields, or particle systems. Create original algorithmic art rather than copying existing artists' work to avoid copyright violations.