From python-development
Master Python asyncio, concurrent programming, and async/await patterns for high-performance applications. TRIGGER WHEN: building async APIs, concurrent systems, or I/O-bound applications requiring non-blocking operations. DO NOT TRIGGER WHEN: the task is outside the specific scope of this component.
npx claudepluginhub acaprino/alfio-claude-plugins --plugin python-developmentThis skill uses the workspace's default tool permissions.
Implement asynchronous Python applications using asyncio, concurrent programming patterns, and async/await for building high-performance, non-blocking systems.
Searches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Searches prompts.chat for AI prompt templates by keyword or category, retrieves by ID with variable handling, and improves prompts via AI. Use for discovering or enhancing prompts.
Guides MCP server integration in Claude Code plugins via .mcp.json or plugin.json configs for stdio, SSE, HTTP types, enabling external services as tools.
Implement asynchronous Python applications using asyncio, concurrent programming patterns, and async/await for building high-performance, non-blocking systems.
Functions defined with async def that can be paused and resumed.
async def my_coroutine():
result = await some_async_operation()
return result
Scheduled coroutines that run concurrently on the event loop.
Low-level objects representing eventual results of async operations.
Resources that support async with for proper cleanup.
Objects that support async for for iterating over async data sources.
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
asyncio.run(main())
import asyncio
async def fetch_data(url: str) -> dict:
await asyncio.sleep(1) # Simulate I/O
return {"url": url, "data": "result"}
async def main():
result = await fetch_data("https://api.example.com")
print(result)
asyncio.run(main())
import asyncio
from typing import List
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.5)
return {"id": user_id, "name": f"User {user_id}"}
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
tasks = [fetch_user(uid) for uid in user_ids]
return await asyncio.gather(*tasks)
asyncio.run(fetch_all_users([1, 2, 3, 4, 5]))
import asyncio
async def background_task(name: str, delay: int):
await asyncio.sleep(delay)
return f"Result from {name}"
async def main():
task1 = asyncio.create_task(background_task("Task 1", 2))
task2 = asyncio.create_task(background_task("Task 2", 1))
# Do other work while tasks run
await asyncio.sleep(0.5)
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())
import asyncio
from typing import Optional
async def safe_operation(item_id: int) -> Optional[dict]:
try:
return await risky_operation(item_id)
except ValueError as e:
print(f"Error: {e}")
return None
async def process_items(item_ids):
tasks = [safe_operation(iid) for iid in item_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = [r for r in results if r is not None and not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
return successful
import asyncio
async def with_timeout():
try:
result = await asyncio.wait_for(slow_operation(5), timeout=2.0)
except asyncio.TimeoutError:
print("Operation timed out")
import asyncio
from typing import List
async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
async with semaphore:
await asyncio.sleep(0.5) # Simulate API call
return {"url": url, "status": 200}
async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
semaphore = asyncio.Semaphore(max_concurrent)
tasks = [api_call(url, semaphore) for url in urls]
return await asyncio.gather(*tasks)
# Wrong - returns coroutine object
result = async_function()
# Correct
result = await async_function()
# Wrong - blocks event loop
import time
async def bad():
time.sleep(1) # Blocks!
# Correct
async def good():
await asyncio.sleep(1) # Non-blocking
async def cancelable_task():
try:
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
# Perform cleanup
raise # Re-raise to propagate
# Wrong
def sync_function():
result = await async_function() # SyntaxError!
# Correct
def sync_function():
result = asyncio.run(async_function())
import asyncio
import pytest
@pytest.mark.asyncio
async def test_async_function():
result = await fetch_data("https://api.example.com")
assert result is not None
@pytest.mark.asyncio
async def test_with_timeout():
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_operation(5), timeout=1.0)
references/async-patterns.md - async context managers, async iterators/generators, producer-consumer pattern, async locks and synchronization, web scraping with aiohttp, async database operations, WebSocket server implementation, connection pools, batch operations, running blocking operations in executors