Build Python APIs on Cloudflare Workers using pywrangler CLI and WorkerEntrypoint class pattern. Includes Python Workflows for multi-step DAG automation. Prevents 8 documented errors. Use when: building Python serverless APIs, migrating Python to edge, or troubleshooting async errors, package compatibility, handler pattern mistakes.
Status: Beta (requires python_workers compatibility flag)
Runtime: Pyodide (Python 3.12+ compiled to WebAssembly)
Package Versions: workers-py@1.7.0, workers-runtime-sdk@0.3.1, wrangler@4.58.0
Last Verified: 2026-01-09
Ensure you have installed:
# Create project directory
mkdir my-python-worker && cd my-python-worker
# Initialize Python project
uv init
# Install pywrangler
uv tool install workers-py
# Initialize Worker configuration
uv run pywrangler init
Create src/entry.py:
from workers import WorkerEntrypoint, Response

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        return Response("Hello from Python Worker!")
{
  "name": "my-python-worker",
  "main": "src/entry.py",
  "compatibility_date": "2025-12-01",
  "compatibility_flags": ["python_workers"]
}
uv run pywrangler dev
# Visit http://localhost:8787
uv run pywrangler deploy
As of August 2025, Python Workers use a class-based pattern (not global handlers):
from workers import WorkerEntrypoint, Response

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Access bindings via self.env
        value = await self.env.MY_KV.get("key")

        # Parse request
        url = request.url
        method = request.method

        return Response(f"Method: {method}, URL: {url}")
All Cloudflare bindings are accessed via self.env:
class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # D1 Database
        result = await self.env.DB.prepare("SELECT * FROM users").all()

        # KV Storage
        value = await self.env.MY_KV.get("key")
        await self.env.MY_KV.put("key", "value")

        # R2 Object Storage
        obj = await self.env.MY_BUCKET.get("file.txt")

        # Workers AI
        response = await self.env.AI.run("@cf/meta/llama-2-7b-chat-int8", {
            "prompt": "Hello!"
        })

        return Response("OK")
Supported Bindings:
See Cloudflare Bindings Documentation for details.
from workers import WorkerEntrypoint, Response
from urllib.parse import urlparse, parse_qs
import json

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Parse JSON body
        if request.method == "POST":
            body = await request.json()
            return Response(
                json.dumps({"received": body}),
                headers={"Content-Type": "application/json"}
            )

        # Query parameters (standard library parsing; assumes request.url is a string)
        query = parse_qs(urlparse(request.url).query)
        # parse_qs maps each key to a list of values, so take the first one
        name = query.get("name", ["World"])[0]

        return Response(f"Hello, {name}!")
from workers import handler

@handler
async def on_scheduled(event, env, ctx):
    # Run on cron schedule
    print(f"Cron triggered at {event.scheduledTime}")

    # Do work...
    await env.MY_KV.put("last_run", str(event.scheduledTime))
Configure in wrangler.jsonc:
{
  "triggers": {
    "crons": ["*/5 * * * *"]  // Every 5 minutes
  }
}
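As a reading aid for the cron expression above, here is a small plain-Python sketch that names the five fields and checks whether a minute value matches the `*/5` step. The helper names `split_cron` and `minute_matches` are hypothetical, and the matcher deliberately handles only `*`, `*/N`, and a single number; it is not how the Workers scheduler evaluates triggers.

```python
# Standard five-field cron layout, left to right.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression: str) -> dict[str, str]:
    """Label each field of a five-field cron expression (hypothetical helper)."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


def minute_matches(field: str, minute: int) -> bool:
    """Match a minute against '*', '*/N', or a single number only."""
    if field == "*":
        return True
    if field.startswith("*/"):
        # '*/N' fires whenever the minute is a multiple of N.
        return minute % int(field[2:]) == 0
    return int(field) == minute


fields = split_cron("*/5 * * * *")
```

With this expression, `fields["minute"]` is `"*/5"`, so minutes 0, 5, 10, and so on match while minute 7 does not.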
Python Workflows enable durable, multi-step automation with automatic retries and state persistence.
from workers import WorkflowEntrypoint, WorkerEntrypoint, Response
from js import fetch  # JavaScript fetch, used inside the workflow step below

class MyWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
        # Step 1
        @step.do("fetch data")
        async def fetch_data():
            response = await fetch("https://api.example.com/data")
            return await response.json()

        data = await fetch_data()

        # Step 2: Sleep
        await step.sleep("wait", "10 seconds")

        # Step 3: Process
        @step.do("process data")
        async def process_data():
            return {"processed": True, "count": len(data)}

        result = await process_data()
        return result

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Create workflow instance
        instance = await self.env.MY_WORKFLOW.create()
        return Response(f"Workflow started: {instance.id}")
Define step dependencies for parallel execution:
class MyWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
        @step.do("step_a")
        async def step_a():
            return "A done"

        @step.do("step_b")
        async def step_b():
            return "B done"

        # step_c waits for both step_a and step_b
        @step.do("step_c", depends=[step_a, step_b], concurrent=True)
        async def step_c(result_a, result_b):
            return f"C received: {result_a}, {result_b}"

        return await step_c()
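To see what the dependency declaration expresses, here is a plain asyncio sketch of the same shape that runs outside Workers. It does not use the Workflows API: `asyncio.gather` stands in for the scheduling that `depends=[...]` with `concurrent=True` asks for, and `run_dag` is a hypothetical name.

```python
import asyncio


async def step_a():
    return "A done"


async def step_b():
    return "B done"


async def step_c(result_a, result_b):
    return f"C received: {result_a}, {result_b}"


async def run_dag():
    # step_a and step_b do not depend on each other, so they run together.
    result_a, result_b = await asyncio.gather(step_a(), step_b())
    # step_c starts only once both upstream results are available.
    return await step_c(result_a, result_b)


if __name__ == "__main__":
    print(asyncio.run(run_dag()))
```

The difference in a real workflow is that each step's result is also persisted, which plain asyncio does not do.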
{
  "compatibility_flags": ["python_workers", "python_workflows"],
  "compatibility_date": "2025-12-01",
  "workflows": [
    {
      "name": "my-workflow",
      "binding": "MY_WORKFLOW",
      "class_name": "MyWorkflow"
    }
  ]
}
[project]
name = "my-python-worker"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "beautifulsoup4",
    "httpx"
]

[dependency-groups]
dev = [
    "workers-py",
    "workers-runtime-sdk"
]
Python Workers support:
Only async HTTP libraries work:
# ✅ WORKS - httpx (async)
import httpx
async with httpx.AsyncClient() as client:
    response = await client.get("https://api.example.com")

# ✅ WORKS - aiohttp
import aiohttp
async with aiohttp.ClientSession() as session:
    async with session.get("https://api.example.com") as response:
        data = await response.json()

# ❌ DOES NOT WORK - requests (sync)
import requests  # Will fail!
Request support for new packages at: https://github.com/cloudflare/workerd/discussions/categories/python-packages
Access JavaScript APIs from Python via Pyodide's FFI:
from js import fetch, console, Response as JSResponse
from workers import WorkerEntrypoint

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Use JavaScript fetch
        response = await fetch("https://api.example.com")
        data = await response.json()

        # Console logging
        console.log("Fetched data:", data)

        # Return JavaScript Response
        return JSResponse.new("Hello!")
from js import Object
from pyodide.ffi import to_js
# Convert Python dict to JavaScript object
python_dict = {"name": "test", "count": 42}
js_object = to_js(python_dict, dict_converter=Object.fromEntries)
# Use in Response
return Response(to_js({"status": "ok"}))
This skill prevents 8 documented issues:
Error: TypeError: on_fetch is not defined
Why: Handler pattern changed in August 2025.
# ❌ OLD (deprecated)
@handler
async def on_fetch(request):
    return Response("Hello")

# ✅ NEW (current)
class Default(WorkerEntrypoint):
    async def fetch(self, request):
        return Response("Hello")
Error: RuntimeError: cannot use blocking call in async context
Why: Python Workers run async-only. Sync libraries block the event loop.
# ❌ FAILS
import requests
response = requests.get("https://api.example.com")

# ✅ WORKS
import httpx
async with httpx.AsyncClient() as client:
    response = await client.get("https://api.example.com")
Error: ModuleNotFoundError: No module named 'numpy' (or similar)
Why: Only pure Python packages and packages already built for Pyodide work. Arbitrary native C extensions are not supported.
Solution: Use Pyodide-compatible alternatives or check Pyodide packages.
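One way to surface this error early is to check that each dependency can be found before any code relies on it. The sketch below uses only the standard library; `is_importable` and `missing_modules` are hypothetical helper names, and the check only tells you whether a top-level module is present in the current environment, not whether it behaves correctly under Pyodide.

```python
import importlib.util


def is_importable(module_name: str) -> bool:
    """Return True if a top-level module can be located without importing it."""
    return importlib.util.find_spec(module_name) is not None


def missing_modules(required: list[str]) -> list[str]:
    """List the required top-level modules that cannot be found."""
    return [name for name in required if not is_importable(name)]


# 'json' ships with Python; the second name is deliberately made up.
missing = missing_modules(["json", "no_such_module_xyz_123"])
```

Here `missing` contains only the made-up module name, which is the kind of report you would want before deploying.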
Error: Error: Python Workers require the python_workers compatibility flag
Fix: Add to wrangler.jsonc:
{
  "compatibility_flags": ["python_workers"]
}
For Workflows, also add "python_workflows".
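The flag rule above can be checked mechanically before deploying. This is a minimal sketch, assuming the configuration is available as comment-free JSON text (a real wrangler.jsonc may contain comments that `json.loads` rejects); `missing_flags` is a hypothetical helper, not part of pywrangler.

```python
import json

# Flags named in this document; the check itself is an illustrative assumption.
BASE_FLAGS = {"python_workers"}
WORKFLOW_FLAGS = {"python_workers", "python_workflows"}


def missing_flags(config_text: str) -> set[str]:
    """Return the compatibility flags the config still needs."""
    config = json.loads(config_text)
    declared = set(config.get("compatibility_flags", []))
    # A non-empty "workflows" section means the workflow flag is needed too.
    required = WORKFLOW_FLAGS if config.get("workflows") else BASE_FLAGS
    return required - declared


example = '{"compatibility_flags": ["python_workers"], "workflows": [{"name": "my-workflow"}]}'
needed = missing_flags(example)
```

For the example config, `needed` is `{"python_workflows"}` because a workflow is declared without its flag.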
Error: Workflow state not persisted correctly
Why: All I/O must happen inside @step.do for durability.
# ❌ BAD - fetch outside step
response = await fetch("https://api.example.com")

@step.do("use data")
async def use_data():
    return await response.json()  # response may be stale on retry

# ✅ GOOD - fetch inside step
@step.do("fetch and use")
async def fetch_and_use():
    response = await fetch("https://api.example.com")
    return await response.json()
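The reason this matters is easier to see with a toy model. The sketch below is not the Workflows API: `ToyStep` is a hypothetical stand-in that assumes step results are saved by name and reused when the workflow is replayed, which is the behavior the durability rule relies on.

```python
import asyncio


class ToyStep:
    """Toy stand-in for a workflow step runner (hypothetical, not the Workers API)."""

    def __init__(self, saved_results=None):
        # Results persisted by earlier attempts, keyed by step name.
        self.saved_results = dict(saved_results or {})

    def do(self, name):
        def decorator(func):
            async def wrapper():
                if name in self.saved_results:
                    # Replay: reuse the persisted result instead of re-running.
                    return self.saved_results[name]
                result = await func()
                self.saved_results[name] = result
                return result
            return wrapper
        return decorator


calls = []  # records every time the simulated I/O actually runs


async def run(step):
    @step.do("fetch and use")
    async def fetch_and_use():
        calls.append("fetch")  # stands in for a network request
        return {"value": 42}

    return await fetch_and_use()


first = ToyStep()
asyncio.run(run(first))
# Simulate a restart: a new runner that starts from the persisted results.
second = ToyStep(first.saved_results)
result = asyncio.run(run(second))
```

The simulated request runs once even though the workflow body runs twice. I/O placed outside a step has no saved result to fall back on, so it would run again on every replay.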
Error: TypeError: Object of type X is not JSON serializable
Why: Workflow step return values must be JSON-serializable.
Fix: Convert complex objects before returning:
from datetime import datetime

@step.do("process")
async def process():
    # Convert datetime to string
    return {"timestamp": datetime.now().isoformat()}
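When a step returns nested data, converting values one by one gets tedious. A small recursive helper can do it in one pass; `to_jsonable` below is a hypothetical name, and the choice to render `Decimal` as a string is an assumption you may want to change.

```python
import json
from datetime import date, datetime
from decimal import Decimal


def to_jsonable(value):
    """Recursively convert common non-JSON types into JSON-friendly ones."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, Decimal):
        # Assumption: keep exact digits by using a string rather than a float.
        return str(value)
    if isinstance(value, (list, tuple)):
        return [to_jsonable(item) for item in value]
    if isinstance(value, dict):
        return {str(key): to_jsonable(item) for key, item in value.items()}
    return value


payload = to_jsonable({
    "at": datetime(2026, 1, 9, 12, 0),
    "price": Decimal("1.50"),
    "tags": ("a", "b"),
})
encoded = json.dumps(payload)  # succeeds because every value is now serializable
```

Calling the helper on the value just before returning it from a step keeps the conversion in one place.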
Note: Python Workers have higher cold starts than JavaScript (~1s vs ~50ms).
Mitigation:
Error: Failed to install package X
Causes:
Fix: Check package compatibility, use alternatives, or request support.
Use the WorkerEntrypoint class pattern.
Set the python_workers compatibility flag.
Use self.env for all bindings.
Avoid the @handler decorator for fetch (deprecated since August 2025).
FastAPI can work with Python Workers but with limitations:
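from fastapi import FastAPI
from workers import WorkerEntrypoint

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello from FastAPI"}

class Default(WorkerEntrypoint):
    async def fetch(self, request):
        # Route through FastAPI. A FastAPI app is an ASGI callable, so it
        # cannot be awaited with the request directly; an ASGI adapter is needed.
        # This call follows Cloudflare's FastAPI example; confirm it against
        # your SDK version.
        import asgi
        return await asgi.fetch(app, request.js_object, self.env)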
Limitations:
See Cloudflare FastAPI example for details.
{
  "workers-py": "1.7.0",
  "workers-runtime-sdk": "0.3.1",
  "wrangler": "4.58.0"
}
Note: Always pin versions for reproducible builds. Check PyPI workers-py for latest releases.
Compatibility Date Guidance:
2025-12-01 for new projects (latest features including pywrangler improvements)
2025-08-01 only if you need to match older production Workers