From azure-sdk-python
Uses Azure Cosmos DB Python SDK for NoSQL: setup clients, manage databases/containers, document CRUD with partition keys, parameterized queries.
npx claudepluginhub microsoft/skills --plugin azure-sdk-python
This skill uses the workspace's default tool permissions.
Client library for Azure Cosmos DB NoSQL API — globally distributed, multi-model database.
Implements Azure Cosmos DB NoSQL API in Python for document CRUD, parameterized queries, container and database management, partition keys, and global distribution.
Perform document CRUD, queries, bulk operations, and container management in Azure Cosmos DB using @azure/cosmos TypeScript/JavaScript SDK.
Guides Azure Cosmos DB partition key design, consistency levels, change feed, and TypeScript SDK patterns for scalable NoSQL data modeling and queries.
Share bugs, ideas, or general feedback.
Client library for Azure Cosmos DB NoSQL API — globally distributed, multi-model database.
pip install azure-cosmos azure-identity
COSMOS_ENDPOINT=https://<account>.documents.azure.com:443/
COSMOS_DATABASE=mydb
COSMOS_CONTAINER=mycontainer
# Build a credential and open the account-level client.
from azure.cosmos import CosmosClient
from azure.identity import DefaultAzureCredential

# Replace <account> with the name of the Cosmos DB account.
endpoint = "https://<account>.documents.azure.com:443/"
credential = DefaultAzureCredential()
client = CosmosClient(
    url=endpoint,
    credential=credential,
)
| Client | Purpose | Get From |
|---|---|---|
| CosmosClient | Account-level operations | Direct instantiation |
| DatabaseProxy | Database operations | client.get_database_client() |
| ContainerProxy | Container/item operations | database.get_container_client() |
# PartitionKey must be imported before it is used to define the container.
from azure.cosmos import PartitionKey

# Get or create database
database = client.create_database_if_not_exists(id="mydb")

# Get or create container with partition key
container = database.create_container_if_not_exists(
    id="mycontainer",
    partition_key=PartitionKey(path="/category"),
)

# Get existing database and container by name
database = client.get_database_client("mydb")
container = database.get_container_client("mycontainer")
# "id" is required and must be unique within its partition;
# "category" holds the partition key value for this container.
item = {
    "id": "item-001",
    "category": "electronics",
    "name": "Laptop",
    "price": 999.99,
    "tags": ["computer", "portable"],
}

# Insert the document and report the id of the stored item.
created = container.create_item(body=item)
print(f"Created: {created['id']}")
# A read is addressed by both the item id and its partition key value.
item = container.read_item(item="item-001", partition_key="electronics")
print(f"Name: {item['name']}")
# Fetch the current document, edit it locally, then write the whole document back.
item = container.read_item(
    item="item-001",
    partition_key="electronics",
)
item["price"] = 899.99
item["on_sale"] = True
updated = container.replace_item(
    item=item["id"],
    body=item,
)
# Upsert: creates the document if it does not exist, replaces it if it does.
item = {"id": "item-002", "category": "electronics", "name": "Tablet", "price": 499.99}
result = container.upsert_item(body=item)
# Delete is addressed by the item id together with its partition key value.
container.delete_item(item="item-001", partition_key="electronics")
# Query within a partition (efficient): partition_key is passed with the query.
# The price limit is bound as a parameter instead of being built into the SQL text.
query = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
    query=query,
    parameters=[{"name": "@max_price", "value": 500}],
    partition_key="electronics",
)
for item in items:
    print(f"{item['name']}: ${item['price']}")
# Cross-partition (more expensive, use sparingly): no partition_key is given,
# so the query must be explicitly allowed to span partitions.
query = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
    query=query,
    parameters=[{"name": "@max_price", "value": 500}],
    enable_cross_partition_query=True,
)
for item in items:
    print(item)
# Project only the id, name and price fields; the category filter is bound
# through the @category parameter.
query = "SELECT c.id, c.name, c.price FROM c WHERE c.category = @category"
items = container.query_items(
    query=query,
    parameters=[
        {"name": "@category", "value": "electronics"},
    ],
    partition_key="electronics",
)
# Read all items in the container (cross-partition)
items = container.read_all_items()

# Or read all items in a single partition by passing its partition key
items = container.query_items(
    query="SELECT * FROM c",
    partition_key="electronics",
)
Critical: Always include partition key for efficient operations.
from azure.cosmos import PartitionKey

# Single partition key: one path into the document
container = database.create_container_if_not_exists(
    id="orders",
    partition_key=PartitionKey(path="/customer_id"),
)

# Hierarchical partition key (preview): a list of paths
container = database.create_container_if_not_exists(
    id="events",
    partition_key=PartitionKey(
        path=["/tenant_id", "/user_id"],
    ),
)
# Create container with provisioned throughput
container = database.create_container_if_not_exists(
    id="mycontainer",
    partition_key=PartitionKey(path="/pk"),
    offer_throughput=400,  # RU/s
)

# Read current throughput.
# get_throughput() is the current name; read_offer() is its deprecated alias.
offer = container.get_throughput()
print(f"Throughput: {offer.offer_throughput} RU/s")

# Update throughput
container.replace_throughput(throughput=1000)
import asyncio

from azure.cosmos.aio import CosmosClient
from azure.identity.aio import DefaultAzureCredential


async def cosmos_operations():
    """Create, read, and query items with the async Cosmos client.

    The credential and the client are both opened with ``async with`` so
    that each is closed when the function finishes, including on error.
    Uses the ``endpoint`` value defined during client setup.
    """
    async with DefaultAzureCredential() as credential:
        async with CosmosClient(endpoint, credential=credential) as client:
            database = client.get_database_client("mydb")
            container = database.get_container_client("mycontainer")

            # Create
            await container.create_item(body={"id": "1", "pk": "test"})

            # Read
            item = await container.read_item(item="1", partition_key="test")

            # Query: results are consumed with "async for"
            async for item in container.query_items(
                query="SELECT * FROM c",
                partition_key="test",
            ):
                print(item)


asyncio.run(cosmos_operations())
from azure.cosmos.exceptions import CosmosHttpResponseError

# Branch on the HTTP status code carried by the exception.
try:
    item = container.read_item(item="nonexistent", partition_key="pk")
except CosmosHttpResponseError as e:
    if e.status_code == 404:
        print("Item not found")
    elif e.status_code == 429:
        # The retry delay is read from the x-ms-retry-after-ms response header.
        print(f"Rate limited. Retry after: {e.headers.get('x-ms-retry-after-ms')}ms")
    else:
        # Anything else is unexpected here; let it propagate.
        raise
- upsert_item for idempotent writes
- read_item instead of query for single document retrieval

| File | Contents |
|---|---|
| references/partitioning.md | Partition key strategies, hierarchical keys, hot partition detection and mitigation |
| references/query-patterns.md | Query optimization, aggregations, pagination, transactions, change feed |
| scripts/setup_cosmos_container.py | CLI tool for creating containers with partitioning, throughput, and indexing |