From faos-data-engineer
<!-- AUTO-GENERATED by export-plugins.py — DO NOT EDIT -->
npx claudepluginhub frank-luongt/faos-skills-marketplace --plugin faos-data-engineerThis skill uses the workspace's default tool permissions.
Provides UI/UX resources: 50+ styles, color palettes, font pairings, guidelines, charts for web/mobile across React, Next.js, Vue, Svelte, Tailwind, React Native, Flutter. Aids planning, building, reviewing interfaces.
Fetches up-to-date documentation from Context7 for libraries and frameworks like React, Next.js, Prisma. Use for setup questions, API references, and code examples.
Guides Payload CMS config (payload.config.ts), collections, fields, hooks, access control, APIs. Debugs validation errors, security, relationships, queries, transactions, hook behavior.
Client library for Azure Cosmos DB NoSQL API — globally distributed, multi-model database.
pip install azure-cosmos azure-identity
COSMOS_ENDPOINT=https://<account>.documents.azure.com:443/
COSMOS_DATABASE=mydb
COSMOS_CONTAINER=mycontainer
from azure.identity import DefaultAzureCredential
from azure.cosmos import CosmosClient
credential = DefaultAzureCredential()
endpoint = "https://<account>.documents.azure.com:443/"
client = CosmosClient(url=endpoint, credential=credential)
| Client | Purpose | Get From |
|---|---|---|
CosmosClient | Account-level operations | Direct instantiation |
DatabaseProxy | Database operations | client.get_database_client() |
ContainerProxy | Container/item operations | database.get_container_client() |
# Get or create database
database = client.create_database_if_not_exists(id="mydb")
# Get or create container with partition key
container = database.create_container_if_not_exists(
id="mycontainer",
partition_key=PartitionKey(path="/category")
)
# Get existing
database = client.get_database_client("mydb")
container = database.get_container_client("mycontainer")
item = {
"id": "item-001", # Required: unique within partition
"category": "electronics", # Partition key value
"name": "Laptop",
"price": 999.99,
"tags": ["computer", "portable"]
}
created = container.create_item(body=item)
print(f"Created: {created['id']}")
# Read requires id AND partition key
item = container.read_item(
item="item-001",
partition_key="electronics"
)
print(f"Name: {item['name']}")
item = container.read_item(item="item-001", partition_key="electronics")
item["price"] = 899.99
item["on_sale"] = True
updated = container.replace_item(item=item["id"], body=item)
# Create if not exists, replace if exists
item = {
"id": "item-002",
"category": "electronics",
"name": "Tablet",
"price": 499.99
}
result = container.upsert_item(body=item)
container.delete_item(
item="item-001",
partition_key="electronics"
)
# Query within a partition (efficient)
query = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
query=query,
parameters=[{"name": "@max_price", "value": 500}],
partition_key="electronics"
)
for item in items:
print(f"{item['name']}: ${item['price']}")
# Cross-partition (more expensive, use sparingly)
query = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
query=query,
parameters=[{"name": "@max_price", "value": 500}],
enable_cross_partition_query=True
)
for item in items:
print(item)
query = "SELECT c.id, c.name, c.price FROM c WHERE c.category = @category"
items = container.query_items(
query=query,
parameters=[{"name": "@category", "value": "electronics"}],
partition_key="electronics"
)
# Read all in a partition
items = container.read_all_items() # Cross-partition
# Or with partition key
items = container.query_items(
query="SELECT * FROM c",
partition_key="electronics"
)
Critical: Always include partition key for efficient operations.
from azure.cosmos import PartitionKey
# Single partition key
container = database.create_container_if_not_exists(
id="orders",
partition_key=PartitionKey(path="/customer_id")
)
# Hierarchical partition key (preview)
container = database.create_container_if_not_exists(
id="events",
partition_key=PartitionKey(path=["/tenant_id", "/user_id"])
)
# Create container with provisioned throughput
container = database.create_container_if_not_exists(
id="mycontainer",
partition_key=PartitionKey(path="/pk"),
offer_throughput=400 # RU/s
)
# Read current throughput
offer = container.read_offer()
print(f"Throughput: {offer.offer_throughput} RU/s")
# Update throughput
container.replace_throughput(throughput=1000)
from azure.cosmos.aio import CosmosClient
from azure.identity.aio import DefaultAzureCredential
async def cosmos_operations():
credential = DefaultAzureCredential()
async with CosmosClient(endpoint, credential=credential) as client:
database = client.get_database_client("mydb")
container = database.get_container_client("mycontainer")
# Create
await container.create_item(body={"id": "1", "pk": "test"})
# Read
item = await container.read_item(item="1", partition_key="test")
# Query
async for item in container.query_items(
query="SELECT * FROM c",
partition_key="test"
):
print(item)
import asyncio
asyncio.run(cosmos_operations())
from azure.cosmos.exceptions import CosmosHttpResponseError
try:
item = container.read_item(item="nonexistent", partition_key="pk")
except CosmosHttpResponseError as e:
if e.status_code == 404:
print("Item not found")
elif e.status_code == 429:
print(f"Rate limited. Retry after: {e.headers.get('x-ms-retry-after-ms')}ms")
else:
raise
upsert_item for idempotent writesread_item instead of query for single document retrieval| File | Contents |
|---|---|
| references/partitioning.md | Partition key strategies, hierarchical keys, hot partition detection and mitigation |
| references/query-patterns.md | Query optimization, aggregations, pagination, transactions, change feed |
| scripts/setup_cosmos_container.py | CLI tool for creating containers with partitioning, throughput, and indexing |