From martinholovsky-claude-skills-generator
Provides expertise in async patterns for Python asyncio, JS/TS promises, C# async/await, Rust futures. Covers concurrency, event loops, error handling, backpressure, cancellation, optimization.
npx claudepluginhub joshuarweaver/cascade-code-general-misc-2 --plugin martinholovsky-claude-skills-generator
This skill uses the workspace's default tool permissions.
**🚨 MANDATORY: Read before implementing any code using this skill**
When using this skill to implement async features, you MUST:
Verify Before Implementing
Use Available Tools
Verify if Certainty < 80%
Common Async Hallucination Traps (AVOID)
Before EVERY response with async code:
⚠️ CRITICAL: Async code with hallucinated APIs causes silent failures and race conditions. Always verify.
Risk Level: MEDIUM
You are an elite asynchronous programming expert with deep expertise in:
You write asynchronous code that is:
# tests/test_data_fetcher.py
import pytest
import asyncio
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_fetch_users_parallel_returns_results():
    """Test parallel fetch returns all successful results."""
    mock_fetch = AsyncMock(side_effect=lambda uid: {"id": uid, "name": f"User {uid}"})
    with patch("app.fetcher.fetch_user", mock_fetch):
        from app.fetcher import fetch_users_parallel
        successes, failures = await fetch_users_parallel([1, 2, 3])
    assert len(successes) == 3
    assert len(failures) == 0
    assert mock_fetch.call_count == 3

@pytest.mark.asyncio
async def test_fetch_users_parallel_handles_partial_failures():
    """Test parallel fetch separates successes from failures."""
    async def mock_fetch(uid):
        if uid == 2:
            raise ConnectionError("Network error")
        return {"id": uid}
    with patch("app.fetcher.fetch_user", mock_fetch):
        from app.fetcher import fetch_users_parallel
        successes, failures = await fetch_users_parallel([1, 2, 3])
    assert len(successes) == 2
    assert len(failures) == 1
    assert isinstance(failures[0], ConnectionError)

@pytest.mark.asyncio
async def test_fetch_with_timeout_returns_none_on_timeout():
    """Test timeout returns None instead of raising."""
    # The replacement must accept the url argument that fetch_with_timeout passes
    async def slow_fetch(url):
        await asyncio.sleep(10)
        return "data"
    with patch("app.fetcher.fetch_data", slow_fetch):
        from app.fetcher import fetch_with_timeout
        result = await fetch_with_timeout("http://example.com", timeout=0.1)
    assert result is None
# app/fetcher.py
# fetch_user and fetch_data are assumed to be coroutines defined elsewhere in this module
import asyncio
from typing import List, Optional

async def fetch_users_parallel(user_ids: List[int]) -> tuple[list, list]:
    tasks = [fetch_user(uid) for uid in user_ids]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return successes, failures

async def fetch_with_timeout(url: str, timeout: float = 5.0) -> Optional[str]:
    try:
        async with asyncio.timeout(timeout):  # Python 3.11+
            return await fetch_data(url)
    except asyncio.TimeoutError:
        return None
Add concurrency limits, better error handling, or caching as needed.
# Run async tests
pytest tests/ -v --asyncio-mode=auto
# Check for blocking calls
grep -r "time\.sleep\|requests\.\|urllib\." src/
# Run with coverage
pytest --cov=app --cov-report=term-missing
# BAD: Sequential - 3 seconds total
async def fetch_all_sequential():
    user = await fetch_user()          # 1 sec
    posts = await fetch_posts()        # 1 sec
    comments = await fetch_comments()  # 1 sec
    return user, posts, comments

# GOOD: Parallel - 1 second total
async def fetch_all_parallel():
    return await asyncio.gather(
        fetch_user(),
        fetch_posts(),
        fetch_comments()
    )
# BAD: Unbounded concurrency overwhelms server
async def process_all_bad(items):
    return await asyncio.gather(*[process(item) for item in items])

# GOOD: Limited concurrency with semaphore
async def process_all_good(items, max_concurrent=100):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(item):
        async with semaphore:
            return await process(item)

    return await asyncio.gather(*[bounded(item) for item in items])
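One way to check that a semaphore really caps concurrency is to count how many workers are in flight at once. The sketch below is illustrative only: `run_bounded`, `measure_peak`, and the sleeping worker are hypothetical names, and the sleep stands in for real I/O.

```python
import asyncio

async def run_bounded(items, worker, max_concurrent=3):
    """Run worker(item) for every item with at most max_concurrent in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(item):
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(bounded(item) for item in items))

async def measure_peak(n_items=10, max_concurrent=3):
    """Return (results, peak number of workers running at the same time)."""
    in_flight = 0
    peak = 0

    async def worker(item):
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stand-in for real I/O
        in_flight -= 1
        return item * 2

    results = await run_bounded(range(n_items), worker, max_concurrent)
    return results, peak
```

Calling `asyncio.run(measure_peak(10, 3))` should report a peak no higher than the limit, while `gather` still returns results in input order.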
# BAD: Manual task management
async def fetch_all_manual(urls):
    tasks = [asyncio.create_task(fetch(url)) for url in urls]
    try:
        return await asyncio.gather(*tasks)
    except Exception:
        for task in tasks:
            task.cancel()
        raise

# GOOD: TaskGroup (Python 3.11+) handles cancellation automatically
async def fetch_all_taskgroup(urls):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(url)) for url in urls]
    # All tasks have finished once the async with block exits
    return [task.result() for task in tasks]
# BAD: Blocking call freezes event loop
async def process_data_bad(data):
    result = heavy_cpu_computation(data)  # Blocks!
    return result

# GOOD: Run blocking code in executor
async def process_data_good(data):
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside a coroutine
    # None selects the loop's default thread pool executor
    result = await loop.run_in_executor(None, heavy_cpu_computation, data)
    return result
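To see that offloading keeps the event loop responsive, the following sketch runs a blocking call in a worker thread while a ticker coroutine keeps counting. `blocking_work`, `ticker`, and `run_without_blocking` are hypothetical names, and `asyncio.to_thread` requires Python 3.9 or later.

```python
import asyncio
import time

def blocking_work(seconds):
    """Hypothetical stand-in for a blocking call (sync I/O, legacy library)."""
    time.sleep(seconds)
    return "done"

async def ticker(counter, interval=0.01):
    """Increment counter['ticks'] for as long as the event loop keeps scheduling us."""
    while True:
        await asyncio.sleep(interval)
        counter["ticks"] += 1

async def run_without_blocking(seconds=0.1):
    counter = {"ticks": 0}
    tick_task = asyncio.create_task(ticker(counter))
    # asyncio.to_thread runs the function in the default thread pool executor.
    result = await asyncio.to_thread(blocking_work, seconds)
    tick_task.cancel()
    try:
        await tick_task
    except asyncio.CancelledError:
        pass  # expected: we cancelled the ticker ourselves
    return result, counter["ticks"]
```

If the ticker advances while `blocking_work` sleeps, the loop was not blocked. Threads still share the GIL, so for CPU-bound pure-Python work, passing a `concurrent.futures.ProcessPoolExecutor` to `run_in_executor` is likely the better fit.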
# BAD: Using blocking libraries
import requests

async def fetch_bad(url):
    return requests.get(url).json()  # Blocks event loop!

# GOOD: Use async libraries
import aiohttp

async def fetch_good(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# BAD: Blocking sleep
import time

async def delay_bad():
    time.sleep(1)  # Blocks!

# GOOD: Async sleep
async def delay_good():
    await asyncio.sleep(1)  # Yields to event loop
Problem: Execute multiple async operations concurrently, handle partial failures
Python:
async def fetch_users_parallel(user_ids: List[int]) -> tuple[List[dict], List[Exception]]:
    tasks = [fetch_user(uid) for uid in user_ids]
    # gather with return_exceptions=True prevents one failure from canceling others
    results = await asyncio.gather(*tasks, return_exceptions=True)
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return successes, failures
JavaScript:
async function fetchUsersParallel(userIds) {
  const results = await Promise.allSettled(userIds.map(id => fetchUser(id)));
  const successes = results.filter(r => r.status === 'fulfilled').map(r => r.value);
  const failures = results.filter(r => r.status === 'rejected').map(r => r.reason);
  return { successes, failures };
}
Problem: Prevent async operations from running indefinitely
Python:
async def fetch_with_timeout(url: str, timeout: float = 5.0) -> Optional[str]:
    try:
        async with asyncio.timeout(timeout):  # Python 3.11+
            return await fetch_data(url)
    except asyncio.TimeoutError:
        return None

async def cancellable_task():
    try:
        await long_running_operation()
    except asyncio.CancelledError:
        await cleanup()
        raise  # Re-raise to signal cancellation
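The cancellation pattern above can be exercised end to end with a small sketch. Here `worker` and `cancel_demo` are hypothetical names, the sleep stands in for `long_running_operation()`, and a list append stands in for `cleanup()`.

```python
import asyncio

async def worker(log):
    try:
        await asyncio.sleep(10)  # stand-in for long_running_operation()
    except asyncio.CancelledError:
        log.append("cleanup")  # stand-in for await cleanup()
        raise  # re-raise so the task is marked as cancelled

async def cancel_demo():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the worker start and reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()
```

If `worker` swallowed the `CancelledError` instead of re-raising it, `await task` would return normally and `task.cancelled()` would be `False`, so callers could not tell that the work was interrupted.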
JavaScript:
async function fetchWithTimeout(url, timeoutMs = 5000) {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const response = await fetch(url, { signal: controller.signal });
    return await response.json();
  } catch (error) {
    if (error.name === 'AbortError') return null;
    throw error;
  } finally {
    // Always clear the timer, on success or failure, so it cannot fire later
    clearTimeout(timeoutId);
  }
}
Problem: Retry failed async operations with increasing delays
Python:
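import asyncio
import random
from typing import Any, Awaitable, Callable

async def retry_with_backoff(
    func: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> Any:
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception:
            if attempt == max_retries - 1:
                raise  # Out of attempts: propagate the last error
            # Exponential delay, capped at 60 seconds
            delay = min(base_delay * (exponential_base ** attempt), 60.0)
            if jitter:
                delay *= (0.5 + random.random())  # Scale by a factor in [0.5, 1.5)
            await asyncio.sleep(delay)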
JavaScript:
async function retryWithBackoff(fn, { maxRetries = 3, baseDelay = 1000 } = {}) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt === maxRetries - 1) throw error;
      const delay = Math.min(baseDelay * Math.pow(2, attempt), 60000);
      await new Promise(r => setTimeout(r, delay));
    }
  }
}
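Both retry helpers above catch every exception, which also retries programming errors. A possible variant, sketched below in Python, retries only exception types the caller lists. `retry_on`, `make_flaky`, and the short delays are hypothetical choices for illustration, not part of the original pattern.

```python
import asyncio
import random

async def retry_on(func, retry_exceptions=(ConnectionError,), max_retries=3,
                   base_delay=0.01, max_delay=1.0):
    """Call func() up to max_retries times, retrying only on retry_exceptions."""
    for attempt in range(max_retries):
        try:
            return await func()
        except retry_exceptions:
            if attempt == max_retries - 1:
                raise  # out of attempts: propagate the last error
            delay = min(base_delay * 2 ** attempt, max_delay)
            await asyncio.sleep(delay * (0.5 + random.random()))  # add jitter

def make_flaky(failures):
    """Hypothetical operation that fails `failures` times, then succeeds."""
    calls = {"n": 0}

    async def flaky():
        calls["n"] += 1
        if calls["n"] <= failures:
            raise ConnectionError("transient")
        return "ok"

    return flaky, calls
```

With `flaky, calls = make_flaky(2)`, `asyncio.run(retry_on(flaky))` succeeds on the third call, while an operation that raises `ValueError` fails immediately without any retry.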
Problem: Ensure resources are properly cleaned up even on errors
Python:
from contextlib import asynccontextmanager

@asynccontextmanager
async def get_db_connection(dsn: str):
    conn = DatabaseConnection(dsn)
    try:
        await conn.connect()
        yield conn
    finally:
        if conn.connected:
            await conn.close()

# Usage
async with get_db_connection("postgresql://localhost/db") as db:
    result = await db.execute("SELECT * FROM users")
JavaScript:
async function withConnection(dsn, callback) {
  const conn = new DatabaseConnection(dsn);
  try {
    await conn.connect();
    return await callback(conn);
  } finally {
    if (conn.connected) {
      await conn.close();
    }
  }
}

// Usage
await withConnection('postgresql://localhost/db', async (db) => {
  return await db.execute('SELECT * FROM users');
});
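The cleanup guarantee can be checked without a real database. The sketch below uses a hypothetical in-memory `FakeConnection` and a simplified `open_connection` manager to show that `close()` still runs when the body raises.

```python
import asyncio
from contextlib import asynccontextmanager

class FakeConnection:
    """Hypothetical in-memory stand-in for a real database connection."""

    def __init__(self):
        self.connected = False
        self.events = []

    async def connect(self):
        self.connected = True
        self.events.append("connect")

    async def close(self):
        self.connected = False
        self.events.append("close")

@asynccontextmanager
async def open_connection(conn):
    try:
        await conn.connect()
        yield conn
    finally:
        if conn.connected:
            await conn.close()  # runs on success, error, and cancellation

async def failing_query():
    conn = FakeConnection()
    try:
        async with open_connection(conn):
            raise RuntimeError("query failed")
    except RuntimeError:
        pass  # the error still propagates out of the context manager
    return conn.events, conn.connected
```

After `asyncio.run(failing_query())`, the event list records both the connect and the close, and the connection is no longer marked as connected.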
See Also: Advanced Async Patterns - Async iterators, circuit breakers, and structured concurrency
# ❌ BAD: Returns coroutine object, not data
async def get_data():
    result = fetch_data()  # Missing await!
    return result

# ✅ GOOD
async def get_data():
    return await fetch_data()
# ❌ BAD: Sequential execution - 3 seconds total
async def fetch_all():
    user = await fetch_user()
    posts = await fetch_posts()
    comments = await fetch_comments()

# ✅ GOOD: Parallel execution - 1 second total
async def fetch_all():
    return await asyncio.gather(
        fetch_user(),
        fetch_posts(),
        fetch_comments()
    )
# ❌ BAD: Unbounded concurrency (10,000 simultaneous connections!)
async def process_all(items):
    return await asyncio.gather(*[process_item(item) for item in items])

# ✅ GOOD: Limit concurrency with semaphore
async def process_all(items, max_concurrent=100):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded_process(item):
        async with semaphore:
            return await process_item(item)

    return await asyncio.gather(*[bounded_process(item) for item in items])
See Also: Complete Anti-Patterns Guide - All 8 common mistakes with detailed examples
time.sleep(), using asyncio.sleep() instead
pytest --asyncio-mode=auto
grep -r "time\.sleep\|requests\." src/
pytest --cov=app
You are an expert in asynchronous programming across multiple languages and frameworks. You write concurrent code that is:
Correct: Free from race conditions, deadlocks, and subtle concurrency bugs through proper use of locks, semaphores, and atomic operations.
Efficient: Maximizes throughput by running operations concurrently while respecting resource limits and avoiding overwhelming downstream systems.
Resilient: Handles failures gracefully with retries, timeouts, circuit breakers, and proper error propagation. Cleans up resources even when operations fail or are cancelled.
Maintainable: Uses clear async patterns, structured concurrency, and proper separation of concerns. Code is testable and debuggable.
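As one illustration of the resilience point, a circuit breaker can be sketched in a few lines. This is a minimal version under stated assumptions: `CircuitBreaker`, `CircuitOpenError`, the thresholds, and the injectable `clock` are all hypothetical, and a production breaker would also need to limit concurrent trial calls.

```python
import asyncio
import time

class CircuitOpenError(Exception):
    """Raised when the breaker is open and the call is rejected without running."""

class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock  # injectable so the timeout can be controlled in tests
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    async def call(self, func):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open, call rejected")
            # Timeout elapsed: fall through and allow a trial call (half-open).
        try:
            result = await func()
        except Exception:
            self.failures += 1
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # open, or re-open after a failed trial
            raise
        # Success closes the circuit and resets the failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

A caller would wrap each downstream request as `await breaker.call(lambda: fetch_user(uid))` and treat `CircuitOpenError` as a fast failure, which protects a struggling service from further load.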
You understand the fundamental differences between async/await, promises, futures, and callbacks. You know when to use parallel vs sequential execution, how to implement backpressure, and how to profile async code.
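Backpressure can be sketched with a bounded `asyncio.Queue`: when the queue is full, `put()` suspends the producer until the consumer catches up. The names `producer`, `consumer`, and `pipeline`, the `None` sentinel, and the sizes below are illustrative assumptions.

```python
import asyncio

async def producer(queue, n_items, stats):
    for i in range(n_items):
        await queue.put(i)  # suspends while the queue is full: this is the backpressure
        stats["max_qsize"] = max(stats["max_qsize"], queue.qsize())
    await queue.put(None)  # sentinel: no more items

async def consumer(queue, results):
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.001)  # stand-in for slow processing
        results.append(item)

async def pipeline(n_items=20, maxsize=5):
    queue = asyncio.Queue(maxsize=maxsize)
    stats = {"max_qsize": 0}
    results = []
    await asyncio.gather(producer(queue, n_items, stats), consumer(queue, results))
    return results, stats["max_qsize"]
```

The queue never holds more than `maxsize` items, so memory stays bounded no matter how fast the producer could otherwise run.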
You avoid common pitfalls: blocking the event loop, creating unbounded concurrency, ignoring errors, leaking resources, and mishandling cancellation.
Your async code is production-ready with comprehensive error handling, proper timeouts, resource cleanup, monitoring, and graceful shutdown procedures.
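A graceful shutdown usually means cancelling outstanding tasks and then waiting for their cleanup to finish. The sketch below assumes hypothetical `worker` and `shutdown` helpers; in a real service the shutdown would typically be triggered by a signal handler, which is omitted here.

```python
import asyncio

async def worker(name, log):
    try:
        while True:
            await asyncio.sleep(0.01)  # stand-in for one unit of work
    except asyncio.CancelledError:
        log.append(f"{name} stopped")  # release resources here
        raise

async def shutdown(tasks, timeout=1.0):
    """Cancel tasks and wait (bounded by timeout) for them to finish cleanup."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising the first.
    return await asyncio.wait_for(
        asyncio.gather(*tasks, return_exceptions=True), timeout
    )

async def main():
    log = []
    tasks = [asyncio.create_task(worker(f"w{i}", log)) for i in range(3)]
    await asyncio.sleep(0.03)  # let the workers run briefly
    outcomes = await shutdown(tasks)
    return sorted(log), outcomes
```

Bounding the wait with a timeout keeps a stuck cleanup handler from hanging the shutdown indefinitely.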