npx claudepluginhub itsmostafa/aws-agent-skills --plugin aws-agent-skills

This skill uses the workspace's default tool permissions.
Amazon DynamoDB is a fully managed NoSQL database service providing fast, predictable performance at any scale. It supports key-value and document data structures.
| Key Type | Description |
|---|---|
| Partition Key (PK) | Required. Determines data distribution |
| Sort Key (SK) | Optional. Enables range queries within partition |
| Composite Key | PK + SK combination |
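
The composite key pattern can be sketched with a few small helpers. This is a minimal illustration, assuming the `USER#<id>` / `PROFILE` / `ORDER#<id>` naming used by the examples in this document; the function names are hypothetical and are not part of boto3.

```python
# Hypothetical helpers (not part of boto3) that build composite keys in the
# 'ENTITY#id' style used by the examples in this document.

def user_pk(user_id):
    """Partition key shared by every item that belongs to one user."""
    return f"USER#{user_id}"


def profile_key(user_id):
    """Full primary key (PK + SK) of the user's single profile item."""
    return {"PK": user_pk(user_id), "SK": "PROFILE"}


def order_key(user_id, order_id):
    """Full primary key of one order stored in the same partition as the user."""
    return {"PK": user_pk(user_id), "SK": f"ORDER#{order_id}"}


print(profile_key(123))        # {'PK': 'USER#123', 'SK': 'PROFILE'}
print(order_key(123, "A-1"))   # {'PK': 'USER#123', 'SK': 'ORDER#A-1'}
```

Because all of a user's items share one partition key, a query on `PK` with a `begins_with('ORDER#')` condition on `SK` can return that user's orders without touching the profile item.
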
| Index Type | Description |
|---|---|
| GSI (Global Secondary Index) | Different PK/SK, separate throughput, eventually consistent |
| LSI (Local Secondary Index) | Same PK, different SK, shares table throughput, strongly consistent option |
| Mode | Use Case |
|---|---|
| On-Demand | Unpredictable traffic, pay-per-request |
| Provisioned | Predictable traffic, lower cost, can use auto-scaling |
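
For provisioned mode, a rough capacity estimate can help decide what to provision. The sketch below assumes the standard unit definitions (one read capacity unit covers one strongly consistent read per second of an item up to 4 KB, an eventually consistent read costs half, and one write capacity unit covers one write per second of an item up to 1 KB). The function names are hypothetical, and transactional requests, which cost more, are not covered.

```python
import math


def read_capacity_units(item_size_bytes, reads_per_second, strongly_consistent=True):
    """Estimate RCUs: item size is rounded up to the next 4 KB per read."""
    units_per_read = math.ceil(item_size_bytes / 4096)
    total = units_per_read * reads_per_second
    if not strongly_consistent:
        total = total / 2  # eventually consistent reads cost half
    return math.ceil(total)


def write_capacity_units(item_size_bytes, writes_per_second):
    """Estimate WCUs: item size is rounded up to the next 1 KB per write."""
    units_per_write = math.ceil(item_size_bytes / 1024)
    return units_per_write * writes_per_second


# A 6 KB item read 10 times per second, and a 2,500-byte item written 5 times per second
print(read_capacity_units(6144, 10))                             # 20
print(read_capacity_units(6144, 10, strongly_consistent=False))  # 10
print(write_capacity_units(2500, 5))                             # 15
```
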
AWS CLI:
aws dynamodb create-table \
  --table-name Users \
  --attribute-definitions \
    AttributeName=PK,AttributeType=S \
    AttributeName=SK,AttributeType=S \
  --key-schema \
    AttributeName=PK,KeyType=HASH \
    AttributeName=SK,KeyType=RANGE \
  --billing-mode PAY_PER_REQUEST
boto3:
import boto3

dynamodb = boto3.resource('dynamodb')

table = dynamodb.create_table(
    TableName='Users',
    KeySchema=[
        {'AttributeName': 'PK', 'KeyType': 'HASH'},
        {'AttributeName': 'SK', 'KeyType': 'RANGE'}
    ],
    AttributeDefinitions=[
        {'AttributeName': 'PK', 'AttributeType': 'S'},
        {'AttributeName': 'SK', 'AttributeType': 'S'}
    ],
    BillingMode='PAY_PER_REQUEST'
)

table.wait_until_exists()
import boto3
from boto3.dynamodb.conditions import Key, Attr
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Users')
# Put item
table.put_item(
    Item={
        'PK': 'USER#123',
        'SK': 'PROFILE',
        'name': 'John Doe',
        'email': 'john@example.com',
        'created_at': '2024-01-15T10:30:00Z'
    }
)

# Get item
response = table.get_item(
    Key={'PK': 'USER#123', 'SK': 'PROFILE'}
)
item = response.get('Item')

# Update item
table.update_item(
    Key={'PK': 'USER#123', 'SK': 'PROFILE'},
    UpdateExpression='SET #name = :name, updated_at = :updated',
    ExpressionAttributeNames={'#name': 'name'},
    ExpressionAttributeValues={
        ':name': 'John Smith',
        ':updated': '2024-01-16T10:30:00Z'
    }
)

# Delete item
table.delete_item(
    Key={'PK': 'USER#123', 'SK': 'PROFILE'}
)
# Query by partition key
response = table.query(
    KeyConditionExpression=Key('PK').eq('USER#123')
)

# Query with sort key condition
response = table.query(
    KeyConditionExpression=Key('PK').eq('USER#123') & Key('SK').begins_with('ORDER#')
)

# Query with filter
response = table.query(
    KeyConditionExpression=Key('PK').eq('USER#123'),
    FilterExpression=Attr('status').eq('active')
)

# Query with projection
response = table.query(
    KeyConditionExpression=Key('PK').eq('USER#123'),
    ProjectionExpression='PK, SK, #name, email',
    ExpressionAttributeNames={'#name': 'name'}
)
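# Paginated query (low-level client, so values use DynamoDB's typed format;
# the resource's meta.client would serialize {'S': ...} as a map instead)
client = boto3.client('dynamodb')
paginator = client.get_paginator('query')
for page in paginator.paginate(
    TableName='Users',
    KeyConditionExpression='PK = :pk',
    ExpressionAttributeValues={':pk': {'S': 'USER#123'}}
):
    for item in page['Items']:
        print(item)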
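
Pagination can also be handled by hand with the `Table` resource, since a Query response includes `LastEvaluatedKey` whenever more results remain. The helper below is a minimal sketch; `query_all` is a hypothetical name, and `table` is assumed to be a boto3 `Table` (or anything with a compatible `query` method).

```python
def query_all(table, **query_kwargs):
    """Collect every item from a Query by following LastEvaluatedKey."""
    items = []
    while True:
        response = table.query(**query_kwargs)
        items.extend(response.get("Items", []))
        last_key = response.get("LastEvaluatedKey")
        if not last_key:
            # No key returned means this was the final page
            return items
        # Resume the next request where the previous page stopped
        query_kwargs["ExclusiveStartKey"] = last_key
```

With the `Users` table from the earlier examples, a call might look like `query_all(table, KeyConditionExpression=Key('PK').eq('USER#123'))`. Loading every page into memory is only reasonable for partitions of modest size.
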
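# Batch write: batch_writer buffers the puts, sends them in BatchWriteItem
# requests of up to 25 items each, and resends any unprocessed items
with table.batch_writer() as batch:
    for i in range(100):
        batch.put_item(Item={
            'PK': f'USER#{i}',
            'SK': 'PROFILE',
            'name': f'User {i}'
        })

# Batch get (up to 100 keys per request)
dynamodb = boto3.resource('dynamodb')
response = dynamodb.batch_get_item(
    RequestItems={
        'Users': {
            'Keys': [
                {'PK': 'USER#1', 'SK': 'PROFILE'},
                {'PK': 'USER#2', 'SK': 'PROFILE'}
            ]
        }
    }
)
items = response['Responses']['Users']
# Keys DynamoDB could not process are returned here and must be requested again
unprocessed = response.get('UnprocessedKeys', {})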
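
Unlike `batch_writer`, `batch_get_item` does not split large key lists or retry on its own: one call accepts at most 100 keys and may hand some back in `UnprocessedKeys`. The sketch below shows one way to handle both. The helper names, the retry limit, and the sleep durations are assumptions chosen for illustration; `dynamodb` is expected to be a boto3 DynamoDB service resource.

```python
import time


def chunked(keys, size=100):
    """Yield successive slices of at most `size` keys."""
    for start in range(0, len(keys), size):
        yield keys[start:start + size]


def batch_get_all(dynamodb, table_name, keys, max_rounds=5):
    """Fetch all keys in chunks of 100, re-requesting unprocessed keys."""
    items = []
    for chunk in chunked(keys):
        request = {table_name: {"Keys": chunk}}
        for attempt in range(max_rounds):
            response = dynamodb.batch_get_item(RequestItems=request)
            items.extend(response.get("Responses", {}).get(table_name, []))
            # UnprocessedKeys has the same shape as RequestItems
            request = response.get("UnprocessedKeys") or {}
            if not request:
                break
            # Back off briefly before asking again (illustrative values)
            time.sleep(min(0.05 * 2 ** attempt, 1.0))
        else:
            raise RuntimeError("keys still unprocessed after retries")
    return items
```
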
aws dynamodb update-table \
  --table-name Users \
  --attribute-definitions AttributeName=email,AttributeType=S \
  --global-secondary-index-updates '[
    {
      "Create": {
        "IndexName": "email-index",
        "KeySchema": [{"AttributeName": "email", "KeyType": "HASH"}],
        "Projection": {"ProjectionType": "ALL"}
      }
    }
  ]'
from botocore.exceptions import ClientError

# Only put if item doesn't exist
try:
    table.put_item(
        Item={'PK': 'USER#123', 'SK': 'PROFILE', 'name': 'John'},
        ConditionExpression='attribute_not_exists(PK)'
    )
except ClientError as e:
    if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
        print("Item already exists")
    else:
        raise  # surface unrelated errors instead of swallowing them

# Optimistic locking with version
table.update_item(
    Key={'PK': 'USER#123', 'SK': 'PROFILE'},
    UpdateExpression='SET #name = :name, version = version + :inc',
    ConditionExpression='version = :current_version',
    ExpressionAttributeNames={'#name': 'name'},
    ExpressionAttributeValues={
        ':name': 'New Name',
        ':inc': 1,
        ':current_version': 5
    }
)
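
The optimistic locking update above hard-codes the expected version as 5. In practice the version is usually read first and the write retried when another writer got there in between. The following is a minimal sketch of that loop; the helper names and retry count are hypothetical, and the error is recognised by the shape of its `response` attribute (as on botocore's `ClientError`) so the sketch has no botocore import.

```python
def is_conditional_failure(error):
    """True if the error carries the ConditionalCheckFailedException code."""
    response = getattr(error, "response", None) or {}
    return response.get("Error", {}).get("Code") == "ConditionalCheckFailedException"


def rename_with_version_check(table, key, new_name, max_attempts=3):
    """Read the current version, then update only if it is unchanged."""
    for _ in range(max_attempts):
        item = table.get_item(Key=key, ConsistentRead=True).get("Item")
        if item is None:
            raise KeyError(f"no item with key {key}")
        current = item["version"]
        try:
            table.update_item(
                Key=key,
                UpdateExpression="SET #name = :name, version = version + :inc",
                ConditionExpression="version = :current_version",
                ExpressionAttributeNames={"#name": "name"},
                ExpressionAttributeValues={
                    ":name": new_name,
                    ":inc": 1,
                    ":current_version": current,
                },
            )
            return current + 1  # the version now stored on the item
        except Exception as error:
            if not is_conditional_failure(error):
                raise
            # Another writer bumped the version first: re-read and try again
    raise RuntimeError("gave up after repeated version conflicts")
```
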
| Command | Description |
|---|---|
| `aws dynamodb create-table` | Create table |
| `aws dynamodb describe-table` | Get table info |
| `aws dynamodb update-table` | Modify table/indexes |
| `aws dynamodb delete-table` | Delete table |
| `aws dynamodb list-tables` | List all tables |
| Command | Description |
|---|---|
| `aws dynamodb put-item` | Create/replace item |
| `aws dynamodb get-item` | Read single item |
| `aws dynamodb update-item` | Update item attributes |
| `aws dynamodb delete-item` | Delete item |
| `aws dynamodb query` | Query by key |
| `aws dynamodb scan` | Full table scan |
| Command | Description |
|---|---|
| `aws dynamodb batch-write-item` | Batch write (25 max) |
| `aws dynamodb batch-get-item` | Batch read (100 max) |
| `aws dynamodb transact-write-items` | Transaction write |
| `aws dynamodb transact-get-items` | Transaction read |
Symptom: ProvisionedThroughputExceededException
Causes:
Solutions:
# Let botocore retry throttled requests with exponential backoff;
# 'adaptive' mode also adds client-side rate limiting
import boto3
from botocore.config import Config

config = Config(
    retries={
        'max_attempts': 10,
        'mode': 'adaptive'
    }
)
dynamodb = boto3.resource('dynamodb', config=config)
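
The retry configuration above leaves backoff to botocore. When retries are handled in application code instead (for example around `UnprocessedKeys`), the delay schedule could look like the sketch below, which uses capped exponential backoff with full jitter. The function name and the default base, cap, and attempt count are assumptions for illustration, not values taken from botocore.

```python
import random


def backoff_delays(max_attempts=5, base=0.05, cap=5.0, rng=random.random):
    """Yield one sleep duration (seconds) per retry attempt.

    Each delay is drawn uniformly from [0, min(cap, base * 2**attempt)),
    so the upper bound doubles per attempt until it reaches the cap.
    """
    for attempt in range(max_attempts):
        ceiling = min(cap, base * 2 ** attempt)
        yield rng() * ceiling


for delay in backoff_delays():
    print(f"would sleep {delay:.3f}s before retrying")
```

The jitter spreads retries from many clients over time so they do not all hit the same partition again at the same moment.
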
Debug:
# Check consumed read capacity for the table over the last hour
# (the metric is reported per table, not per partition; date -d requires GNU date)
aws cloudwatch get-metric-statistics \
  --namespace AWS/DynamoDB \
  --metric-name ConsumedReadCapacityUnits \
  --dimensions Name=TableName,Value=Users \
  --start-time $(date -d '1 hour ago' -u +%Y-%m-%dT%H:%M:%SZ) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%SZ) \
  --period 60 \
  --statistics Sum
Solutions:
Debug checklist:
Issue: Scans are slow and expensive
Solutions:
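# Parallel scan
import concurrent.futures

import boto3

TOTAL_SEGMENTS = 4

def scan_segment(segment):
    # boto3 resources are not thread safe, so each worker creates its own
    table = boto3.Session().resource('dynamodb').Table('Users')
    # Read every page of this segment by following LastEvaluatedKey
    items = []
    kwargs = {'Segment': segment, 'TotalSegments': TOTAL_SEGMENTS}
    while True:
        page = table.scan(**kwargs)
        items.extend(page['Items'])
        if 'LastEvaluatedKey' not in page:
            return items
        kwargs['ExclusiveStartKey'] = page['LastEvaluatedKey']

with concurrent.futures.ThreadPoolExecutor(max_workers=TOTAL_SEGMENTS) as executor:
    results = list(executor.map(scan_segment, range(TOTAL_SEGMENTS)))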