From example-skills
Integrate with social media platform APIs for automated posting, scheduling, analytics retrieval, and content syndication. Covers OAuth flows, rate limiting, and multi-platform strategies. Triggers on social media API integration, automated posting, or platform API requests.
npx claudepluginhub organvm-iv-taxis/a-i--skills --plugin document-skillsThis skill uses the workspace's default tool permissions.
Build reliable integrations with social platform APIs for automated content distribution.
Compares coding agents like Claude Code and Aider on custom YAML-defined codebase tasks using git worktrees, measuring pass rate, cost, time, and consistency.
Designs and optimizes AI agent action spaces, tool definitions, observation formats, error recovery, and context for higher task completion rates.
Designs, implements, and audits WCAG 2.2 AA accessible UIs for Web (ARIA/HTML5), iOS (SwiftUI traits), and Android (Compose semantics). Audits code for compliance gaps.
Build reliable integrations with social platform APIs for automated content distribution.
| Platform | API Style | Auth | Rate Limits | Key Constraint |
|---|---|---|---|---|
| Bluesky | AT Protocol | App password / OAuth | 3000/5min | Decentralized, open protocol |
| Mastodon | REST | OAuth 2.0 | 300/5min per IP | Instance-specific endpoints |
| REST | OAuth 2.0 | 100/day posts | Strict content policies | |
| Dev.to | REST | API Key | 30/30s | Article-focused |
| Medium | REST | OAuth 2.0 + Bearer | 100/day | Import API only |
| RSS | Pull-based | None | N/A | Read-only syndication |
from authlib.integrations.httpx_client import AsyncOAuth2Client
class SocialOAuth:
def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
self.client = AsyncOAuth2Client(
client_id=client_id,
client_secret=client_secret,
redirect_uri=redirect_uri,
)
def get_auth_url(self, scope: str) -> str:
url, state = self.client.create_authorization_url(
"https://platform.example.com/oauth/authorize",
scope=scope,
)
return url
async def exchange_code(self, code: str) -> dict:
token = await self.client.fetch_token( # allow-secret
"https://platform.example.com/oauth/token",
code=code,
)
return token
import httpx
class DevToClient:
def __init__(self, api_key: str): # allow-secret
self.client = httpx.AsyncClient(
base_url="https://dev.to/api",
headers={"api-key": api_key, "Accept": "application/json"},
)
async def create_article(self, title: str, body: str, tags: list[str], published: bool = False):
return await self.client.post("/articles", json={
"article": {
"title": title,
"body_markdown": body,
"tags": tags,
"published": published,
}
})
from dataclasses import dataclass
from abc import ABC, abstractmethod
@dataclass
class Post:
title: str
body: str
url: str | None = None
tags: list[str] | None = None
image_url: str | None = None
class PlatformAdapter(ABC):
@abstractmethod
async def publish(self, post: Post) -> str:
"""Publish and return the post URL."""
@abstractmethod
def format_content(self, post: Post) -> dict:
"""Format post for platform constraints."""
class BlueskyAdapter(PlatformAdapter):
async def publish(self, post: Post) -> str:
content = self.format_content(post)
# AT Protocol posting
response = await self.client.post(
"com.atproto.repo.createRecord",
json=content,
)
return response["uri"]
def format_content(self, post: Post) -> dict:
text = post.body[:300] # 300 char limit
if post.url:
text = f"{text}\n\n{post.url}"
return {"collection": "app.bsky.feed.post", "record": {"text": text}}
class MastodonAdapter(PlatformAdapter):
async def publish(self, post: Post) -> str:
content = self.format_content(post)
response = await self.client.post("/api/v1/statuses", json=content)
return response.json()["url"]
def format_content(self, post: Post) -> dict:
text = post.body[:500] # 500 char default
if post.url:
text = f"{text}\n\n{post.url}"
return {"status": text, "visibility": "public"}
class ContentDistributor:
def __init__(self, adapters: dict[str, PlatformAdapter]):
self.adapters = adapters
async def distribute(self, post: Post, platforms: list[str] | None = None) -> dict[str, str]:
targets = platforms or list(self.adapters.keys())
results = {}
for platform in targets:
adapter = self.adapters[platform]
try:
url = await adapter.publish(post)
results[platform] = url
except Exception as e:
results[platform] = f"ERROR: {e}"
return results
import asyncio
import time
class RateLimiter:
def __init__(self, max_requests: int, window_seconds: float):
self.max_requests = max_requests
self.window = window_seconds
self.requests: list[float] = []
self.lock = asyncio.Lock()
async def acquire(self):
async with self.lock:
now = time.time()
self.requests = [t for t in self.requests if now - t < self.window]
if len(self.requests) >= self.max_requests:
wait_time = self.requests[0] + self.window - now
await asyncio.sleep(wait_time)
self.requests.append(time.time())
async def api_call_with_retry(func, *args, max_retries: int = 3):
for attempt in range(max_retries):
try:
return await func(*args)
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
retry_after = int(e.response.headers.get("Retry-After", 60))
await asyncio.sleep(retry_after)
else:
raise
raise Exception("Max retries exceeded")
from datetime import datetime, timedelta
class PostScheduler:
def __init__(self, distributor: ContentDistributor):
self.distributor = distributor
self.queue: list[tuple[datetime, Post, list[str]]] = []
def schedule(self, post: Post, platforms: list[str], publish_at: datetime):
self.queue.append((publish_at, post, platforms))
self.queue.sort(key=lambda x: x[0])
async def run(self):
while self.queue:
publish_at, post, platforms = self.queue[0]
now = datetime.now()
if now >= publish_at:
self.queue.pop(0)
await self.distributor.distribute(post, platforms)
else:
await asyncio.sleep((publish_at - now).total_seconds())
@dataclass
class PostMetrics:
views: int
likes: int
shares: int
comments: int
clicks: int
async def aggregate_metrics(post_urls: dict[str, str]) -> dict[str, PostMetrics]:
metrics = {}
for platform, url in post_urls.items():
adapter = adapters[platform]
metrics[platform] = await adapter.get_metrics(url)
return metrics