From antigravity-awesome-skills
Azure Tables SDK for Python (Storage and Cosmos DB). Use for NoSQL key-value storage, entity CRUD, and batch operations.
npx claudepluginhub mit-network/antigravity-awesome-skillsThis skill uses the workspace's default tool permissions.
NoSQL key-value store for structured data (Azure Storage Tables or Cosmos DB Table API).
Mandates invoking relevant skills via tools before any response in coding sessions. Covers access, priorities, and adaptations for Claude Code, Copilot CLI, Gemini CLI.
Share bugs, ideas, or general feedback.
NoSQL key-value store for structured data (Azure Storage Tables or Cosmos DB Table API).
pip install azure-data-tables azure-identity
# Azure Storage Tables
AZURE_STORAGE_ACCOUNT_URL=https://<account>.table.core.windows.net
# Cosmos DB Table API
COSMOS_TABLE_ENDPOINT=https://<account>.table.cosmos.azure.com
from azure.identity import DefaultAzureCredential
from azure.data.tables import TableServiceClient, TableClient
credential = DefaultAzureCredential()
endpoint = "https://<account>.table.core.windows.net"
# Service client (manage tables)
service_client = TableServiceClient(endpoint=endpoint, credential=credential)
# Table client (work with entities)
table_client = TableClient(endpoint=endpoint, table_name="mytable", credential=credential)
| Client | Purpose |
|---|---|
TableServiceClient | Create/delete tables, list tables |
TableClient | Entity CRUD, queries |
# Create table
service_client.create_table("mytable")
# Create if not exists
service_client.create_table_if_not_exists("mytable")
# Delete table
service_client.delete_table("mytable")
# List tables
for table in service_client.list_tables():
print(table.name)
# Get table client
table_client = service_client.get_table_client("mytable")
Important: Every entity requires PartitionKey and RowKey (together form unique ID).
entity = {
"PartitionKey": "sales",
"RowKey": "order-001",
"product": "Widget",
"quantity": 5,
"price": 9.99,
"shipped": False
}
# Create (fails if exists)
table_client.create_entity(entity=entity)
# Upsert (create or replace)
table_client.upsert_entity(entity=entity)
# Get by key (fastest)
entity = table_client.get_entity(
partition_key="sales",
row_key="order-001"
)
print(f"Product: {entity['product']}")
# Replace entire entity
entity["quantity"] = 10
table_client.update_entity(entity=entity, mode="replace")
# Merge (update specific fields only)
update = {
"PartitionKey": "sales",
"RowKey": "order-001",
"shipped": True
}
table_client.update_entity(entity=update, mode="merge")
table_client.delete_entity(
partition_key="sales",
row_key="order-001"
)
# Query by partition (efficient)
entities = table_client.query_entities(
query_filter="PartitionKey eq 'sales'"
)
for entity in entities:
print(entity)
# Filter by properties
entities = table_client.query_entities(
query_filter="PartitionKey eq 'sales' and quantity gt 3"
)
# With parameters (safer)
entities = table_client.query_entities(
query_filter="PartitionKey eq @pk and price lt @max_price",
parameters={"pk": "sales", "max_price": 50.0}
)
entities = table_client.query_entities(
query_filter="PartitionKey eq 'sales'",
select=["RowKey", "product", "price"]
)
# List all (cross-partition - use sparingly)
for entity in table_client.list_entities():
print(entity)
from azure.data.tables import TableTransactionError
# Batch operations (same partition only!)
operations = [
("create", {"PartitionKey": "batch", "RowKey": "1", "data": "first"}),
("create", {"PartitionKey": "batch", "RowKey": "2", "data": "second"}),
("upsert", {"PartitionKey": "batch", "RowKey": "3", "data": "third"}),
]
try:
table_client.submit_transaction(operations)
except TableTransactionError as e:
print(f"Transaction failed: {e}")
from azure.data.tables.aio import TableServiceClient, TableClient
from azure.identity.aio import DefaultAzureCredential
async def table_operations():
credential = DefaultAzureCredential()
async with TableClient(
endpoint="https://<account>.table.core.windows.net",
table_name="mytable",
credential=credential
) as client:
# Create
await client.create_entity(entity={
"PartitionKey": "async",
"RowKey": "1",
"data": "test"
})
# Query
async for entity in client.query_entities("PartitionKey eq 'async'"):
print(entity)
import asyncio
asyncio.run(table_operations())
| Python Type | Table Storage Type |
|---|---|
str | String |
int | Int64 |
float | Double |
bool | Boolean |
datetime | DateTime |
bytes | Binary |
UUID | Guid |
upsert_entity for idempotent writesThis skill is applicable to execute the workflow or actions described in the overview.