Help us improve
Share bugs, ideas, or general feedback.
From faos-data-engineer
<!-- AUTO-GENERATED by export-plugins.py — DO NOT EDIT -->
npx claudepluginhub frank-luongt/faos-skills-marketplace --plugin faos-data-engineer
How this skill is triggered — by the user, by Claude, or both
Slash command
/faos-data-engineer:azure-cosmos-py
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
<!-- AUTO-GENERATED by export-plugins.py — DO NOT EDIT -->
Uses Azure Cosmos DB Python SDK for NoSQL: setup clients, manage databases/containers, document CRUD with partition keys, parameterized queries.
Implements Azure Cosmos DB NoSQL API in Python for document CRUD, parameterized queries, container and database management, partition keys, and global distribution.
Perform document CRUD, queries, bulk operations, and container management in Azure Cosmos DB using @azure/cosmos TypeScript/JavaScript SDK.
Share bugs, ideas, or general feedback.
Client library for Azure Cosmos DB NoSQL API — globally distributed, multi-model database.
pip install azure-cosmos azure-identity
COSMOS_ENDPOINT=https://<account>.documents.azure.com:443/
COSMOS_DATABASE=mydb
COSMOS_CONTAINER=mycontainer
from azure.identity import DefaultAzureCredential
from azure.cosmos import CosmosClient
# Credential object handed to the client below
credential = DefaultAzureCredential()
# Account endpoint; replace <account> with your Cosmos DB account name
endpoint = "https://<account>.documents.azure.com:443/"
# Account-level client built from the endpoint and the credential
client = CosmosClient(url=endpoint, credential=credential)
| Client | Purpose | Get From |
|---|---|---|
| CosmosClient | Account-level operations | Direct instantiation |
| DatabaseProxy | Database operations | client.get_database_client() |
| ContainerProxy | Container/item operations | database.get_container_client() |
# PartitionKey must be imported before it is used in the container definition
from azure.cosmos import PartitionKey

# Get or create the database
database = client.create_database_if_not_exists(id="mydb")

# Get or create a container partitioned on /category
container = database.create_container_if_not_exists(
    id="mycontainer",
    partition_key=PartitionKey(path="/category"),
)

# Get proxies for a database/container that already exist
database = client.get_database_client("mydb")
container = database.get_container_client("mycontainer")
# --- Create ---
# "id" is required and must be unique within its partition;
# "category" holds the partition key value for this container.
item = {
    "id": "item-001",
    "category": "electronics",
    "name": "Laptop",
    "price": 999.99,
    "tags": ["computer", "portable"],
}
created = container.create_item(body=item)
print(f"Created: {created['id']}")

# --- Read ---
# A point read needs both the item id and its partition key value.
item = container.read_item(item="item-001", partition_key="electronics")
print(f"Name: {item['name']}")

# --- Update ---
# Fetch the current document, change fields locally, then replace it.
item = container.read_item(item="item-001", partition_key="electronics")
item.update({"price": 899.99, "on_sale": True})
updated = container.replace_item(item=item["id"], body=item)

# --- Upsert ---
# Creates the document when it is missing, replaces it when it exists.
item = {
    "id": "item-002",
    "category": "electronics",
    "name": "Tablet",
    "price": 499.99,
}
result = container.upsert_item(body=item)

# --- Delete ---
container.delete_item(item="item-001", partition_key="electronics")
# Query within a single partition (efficient)
query = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
    query=query,
    parameters=[{"name": "@max_price", "value": 500}],
    partition_key="electronics",
)
for item in items:
    print(f"{item['name']}: ${item['price']}")

# Cross-partition query (more expensive, use sparingly)
query = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
    query=query,
    parameters=[{"name": "@max_price", "value": 500}],
    enable_cross_partition_query=True,
)
for item in items:
    print(item)

# Projection: return only the fields that are needed
query = "SELECT c.id, c.name, c.price FROM c WHERE c.category = @category"
items = container.query_items(
    query=query,
    parameters=[{"name": "@category", "value": "electronics"}],
    partition_key="electronics",
)
# Read every item in the container (spans all partitions)
items = container.read_all_items() # Cross-partition
# Or restrict the read to one partition by passing its partition key
items = container.query_items(
query="SELECT * FROM c",
partition_key="electronics"
)
Critical: Always include partition key for efficient operations.
from azure.cosmos import PartitionKey
# Single partition key
container = database.create_container_if_not_exists(
    id="orders",
    partition_key=PartitionKey(path="/customer_id"),
)

# Hierarchical partition key (preview): a list of paths must be declared
# with kind="MultiHash"; the default kind ("Hash") is for a single path.
container = database.create_container_if_not_exists(
    id="events",
    partition_key=PartitionKey(path=["/tenant_id", "/user_id"], kind="MultiHash"),
)
# Create container with provisioned throughput
container = database.create_container_if_not_exists(
    id="mycontainer",
    partition_key=PartitionKey(path="/pk"),
    offer_throughput=400,  # RU/s
)

# Read current throughput; get_throughput() replaces the deprecated read_offer()
offer = container.get_throughput()
print(f"Throughput: {offer.offer_throughput} RU/s")

# Update throughput
container.replace_throughput(throughput=1000)
from azure.cosmos.aio import CosmosClient
from azure.identity.aio import DefaultAzureCredential
async def cosmos_operations():
    """Create, point-read, and query items using the async Cosmos client.

    The credential and the client are both opened with ``async with`` so
    each is closed when the block exits, including on error. Reads the
    account URL from ``endpoint``, which must be defined in the enclosing
    scope.
    """
    async with DefaultAzureCredential() as credential:
        async with CosmosClient(endpoint, credential=credential) as client:
            database = client.get_database_client("mydb")
            container = database.get_container_client("mycontainer")

            # Create
            await container.create_item(body={"id": "1", "pk": "test"})

            # Read (point read: id plus partition key)
            item = await container.read_item(item="1", partition_key="test")

            # Query: results are consumed with ``async for``
            async for doc in container.query_items(
                query="SELECT * FROM c",
                partition_key="test",
            ):
                print(doc)
import asyncio
asyncio.run(cosmos_operations())
from azure.cosmos.exceptions import CosmosHttpResponseError
# Branch on the HTTP status code carried by the SDK exception.
try:
    item = container.read_item(item="nonexistent", partition_key="pk")
except CosmosHttpResponseError as e:
    if e.status_code == 404:
        # The id / partition key pair does not exist
        print("Item not found")
    elif e.status_code == 429:
        # Throttled: the response header reports how long to wait
        print(f"Rate limited. Retry after: {e.headers.get('x-ms-retry-after-ms')}ms")
    else:
        # Anything else is unexpected here; let it propagate
        raise
- Use upsert_item for idempotent writes
- Use read_item instead of query for single document retrieval

| File | Contents |
|---|---|
| references/partitioning.md | Partition key strategies, hierarchical keys, hot partition detection and mitigation |
| references/query-patterns.md | Query optimization, aggregations, pagination, transactions, change feed |
| scripts/setup_cosmos_container.py | CLI tool for creating containers with partitioning, throughput, and indexing |