Autonomous social media management agent specializing in content strategy, multi-platform scheduling, engagement automation, and analytics-driven optimization across LinkedIn, Twitter/X, Facebook, and Instagram.
Automates social media content strategy, multi-platform scheduling, engagement monitoring, and performance analytics across LinkedIn, Twitter/X, Facebook, and Instagram.
/plugin marketplace add Lobbi-Docs/claude/plugin install exec-automator@claude-orchestrationclaude-sonnet-4-5-20250929The Social Media Manager agent provides comprehensive automation for social media operations, from content strategy and editorial calendar management to multi-platform publishing, engagement monitoring, and performance analytics. This agent integrates with major social platforms and management tools to deliver data-driven social media execution.
Strategic Planning:
Editorial Calendar Management:
Content Ideation:
Supported Platforms:
LinkedIn:
Twitter/X:
Facebook:
Instagram:
Content Development:
Visual Content:
SEO & Discoverability:
Automated Scheduling:
Publishing Workflows:
Integration Tools:
Monitoring & Response:
Community Building:
Automated Responses:
Social Listening:
Performance Analytics:
Reporting:
Influencer Collaboration:
User-Generated Content:
Paid Social Integration:
Promotional Campaigns:
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Optional
import datetime
class SocialMediaState(TypedDict):
content: str
platforms: List[str]
scheduled_time: Optional[datetime.datetime]
assets: List[str]
hashtags: List[str]
approval_status: str
published_posts: List[dict]
engagement_data: dict
sentiment_score: float
def create_social_publishing_workflow():
workflow = StateGraph(SocialMediaState)
# Define nodes
workflow.add_node("content_creation", generate_content_node)
workflow.add_node("hashtag_research", optimize_hashtags_node)
workflow.add_node("visual_creation", create_visuals_node)
workflow.add_node("platform_optimization", customize_for_platforms_node)
workflow.add_node("scheduling", schedule_posts_node)
workflow.add_node("approval", approval_workflow_node)
workflow.add_node("publishing", publish_posts_node)
workflow.add_node("monitoring", monitor_engagement_node)
workflow.add_node("response_automation", auto_respond_node)
workflow.add_node("analytics", collect_analytics_node)
# Define edges
workflow.set_entry_point("content_creation")
workflow.add_edge("content_creation", "hashtag_research")
workflow.add_edge("hashtag_research", "visual_creation")
workflow.add_edge("visual_creation", "platform_optimization")
workflow.add_edge("platform_optimization", "scheduling")
workflow.add_edge("scheduling", "approval")
# Conditional approval
workflow.add_conditional_edges(
"approval",
lambda state: "publish" if state["approval_status"] == "approved" else "revise",
{
"publish": "publishing",
"revise": "content_creation"
}
)
workflow.add_edge("publishing", "monitoring")
workflow.add_edge("monitoring", "response_automation")
workflow.add_edge("response_automation", "analytics")
workflow.add_edge("analytics", END)
return workflow.compile()
def generate_content_node(state: SocialMediaState) -> SocialMediaState:
"""Generate platform-optimized content"""
# AI-powered content generation based on strategy
# - Audience targeting
# - Brand voice consistency
# - Call-to-action optimization
return state
def optimize_hashtags_node(state: SocialMediaState) -> SocialMediaState:
"""Research and optimize hashtags"""
# - Trending hashtag analysis
# - Platform-specific hashtag limits
# - Branded hashtag inclusion
# - Competition and reach balance
return state
def create_visuals_node(state: SocialMediaState) -> SocialMediaState:
"""Create or optimize visual assets"""
# - Canva API integration
# - Image resizing for platform specs
# - Brand template application
# - Alt text generation
return state
def customize_for_platforms_node(state: SocialMediaState) -> SocialMediaState:
"""Customize content for each platform"""
# LinkedIn: Professional tone, longer-form
# Twitter: Concise, trending hashtags
# Instagram: Visual-first, story highlights
# Facebook: Community-focused, longer captions
return state
def schedule_posts_node(state: SocialMediaState) -> SocialMediaState:
"""Schedule posts for optimal times"""
# - Audience activity analysis
# - Timezone consideration
# - Platform algorithm optimization
# - Queue management
return state
def approval_workflow_node(state: SocialMediaState) -> SocialMediaState:
"""Route content through approval process"""
# - Stakeholder notification
# - Review and feedback collection
# - Version control
# - Compliance checking
return state
def publish_posts_node(state: SocialMediaState) -> SocialMediaState:
"""Publish to selected platforms"""
# Hootsuite/Buffer API integration
# Platform-native API fallback
# Error handling and retry logic
return state
def monitor_engagement_node(state: SocialMediaState) -> SocialMediaState:
"""Monitor post engagement in real-time"""
# - Engagement rate tracking
# - Sentiment analysis on comments
# - Alert on unusual activity
return state
def auto_respond_node(state: SocialMediaState) -> SocialMediaState:
"""Automated engagement and response"""
# - FAQ detection and response
# - Sentiment-based routing
# - Escalation to human agents
return state
def collect_analytics_node(state: SocialMediaState) -> SocialMediaState:
"""Collect and analyze performance data"""
# - Platform analytics aggregation
# - Performance scoring
# - Insight generation
# - Report generation
return state
class EngagementState(TypedDict):
platform: str
mentions: List[dict]
comments: List[dict]
dms: List[dict]
sentiment_scores: dict
response_queue: List[dict]
escalations: List[dict]
auto_responses: List[dict]
def create_engagement_workflow():
workflow = StateGraph(EngagementState)
workflow.add_node("listen", social_listening_node)
workflow.add_node("classify", classify_mentions_node)
workflow.add_node("sentiment", analyze_sentiment_node)
workflow.add_node("prioritize", prioritize_responses_node)
workflow.add_node("auto_respond", automated_response_node)
workflow.add_node("escalate", escalate_to_human_node)
workflow.add_node("track", track_resolution_node)
workflow.set_entry_point("listen")
workflow.add_edge("listen", "classify")
workflow.add_edge("classify", "sentiment")
workflow.add_edge("sentiment", "prioritize")
workflow.add_conditional_edges(
"prioritize",
lambda state: route_response(state),
{
"auto": "auto_respond",
"escalate": "escalate"
}
)
workflow.add_edge("auto_respond", "track")
workflow.add_edge("escalate", "track")
workflow.add_edge("track", END)
return workflow.compile()
def route_response(state: EngagementState) -> str:
"""Route based on sentiment and complexity"""
if state["sentiment_scores"]["negative"] > 0.7:
return "escalate"
elif state["sentiment_scores"]["positive"] > 0.6:
return "auto"
else:
return "auto"
import requests
from typing import List, Dict
class HootsuiteIntegration:
"""Hootsuite social media management integration"""
def __init__(self, api_key: str, base_url: str = "https://platform.hootsuite.com"):
self.api_key = api_key
self.base_url = base_url
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def schedule_post(self, content: str, platforms: List[str],
scheduled_time: str, media: List[str] = None) -> Dict:
"""Schedule a post across multiple platforms"""
endpoint = f"{self.base_url}/v1/messages"
payload = {
"text": content,
"socialProfileIds": self._get_profile_ids(platforms),
"scheduledSendTime": scheduled_time,
"mediaUrls": media or []
}
response = requests.post(endpoint, headers=self.headers, json=payload)
return response.json()
def get_analytics(self, profile_id: str, start_date: str, end_date: str) -> Dict:
"""Retrieve analytics for a social profile"""
endpoint = f"{self.base_url}/v1/socialProfiles/{profile_id}/analytics"
params = {
"startDate": start_date,
"endDate": end_date,
"metrics": "engagement,reach,impressions,clicks"
}
response = requests.get(endpoint, headers=self.headers, params=params)
return response.json()
def monitor_mentions(self, keywords: List[str]) -> List[Dict]:
"""Monitor social mentions for keywords"""
endpoint = f"{self.base_url}/v1/streams"
payload = {
"keywords": keywords,
"streamType": "mentions"
}
response = requests.post(endpoint, headers=self.headers, json=payload)
return response.json()
class BufferIntegration:
"""Buffer social media scheduling integration"""
def __init__(self, access_token: str):
self.access_token = access_token
self.base_url = "https://api.bufferapp.com/1"
def create_update(self, profile_ids: List[str], text: str,
media: Dict = None, scheduled_at: str = None) -> Dict:
"""Create a new post/update"""
endpoint = f"{self.base_url}/updates/create.json"
data = {
"access_token": self.access_token,
"profile_ids[]": profile_ids,
"text": text,
"media": media,
"scheduled_at": scheduled_at
}
response = requests.post(endpoint, data=data)
return response.json()
def get_profile_analytics(self, profile_id: str) -> Dict:
"""Get analytics for a specific profile"""
endpoint = f"{self.base_url}/profiles/{profile_id}/analytics.json"
params = {"access_token": self.access_token}
response = requests.get(endpoint, params=params)
return response.json()
class SproutSocialIntegration:
"""Sprout Social enterprise social media management"""
def __init__(self, api_key: str, api_secret: str):
self.api_key = api_key
self.api_secret = api_secret
self.base_url = "https://api.sproutsocial.com/v1"
def publish_message(self, customer_profile_id: str, text: str,
media_urls: List[str] = None) -> Dict:
"""Publish a message to a social network"""
endpoint = f"{self.base_url}/messages"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"customerProfileId": customer_profile_id,
"text": text,
"mediaUrls": media_urls or []
}
response = requests.post(endpoint, headers=headers, json=payload)
return response.json()
def get_listening_queries(self, query_id: str = None) -> Dict:
"""Get social listening query results"""
endpoint = f"{self.base_url}/listening/queries"
if query_id:
endpoint += f"/{query_id}"
headers = {"Authorization": f"Bearer {self.api_key}"}
response = requests.get(endpoint, headers=headers)
return response.json()
class CanvaIntegration:
"""Canva design automation integration"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.canva.com/v1"
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def create_from_template(self, template_id: str,
customizations: Dict) -> Dict:
"""Create design from template with customizations"""
endpoint = f"{self.base_url}/designs"
payload = {
"templateId": template_id,
"customizations": customizations
}
response = requests.post(endpoint, headers=self.headers, json=payload)
return response.json()
def export_design(self, design_id: str, format: str = "png") -> Dict:
"""Export design in specified format"""
endpoint = f"{self.base_url}/designs/{design_id}/export"
payload = {
"format": format,
"quality": "high"
}
response = requests.post(endpoint, headers=self.headers, json=payload)
return response.json()
| Task Category | Automation Score | Complexity | Time Saved | Notes |
|---|---|---|---|---|
| Content Scheduling | 95% | Low | 90% | Fully automatable with approval workflows |
| Hashtag Research | 85% | Medium | 80% | AI-powered with periodic human review |
| Visual Asset Creation | 70% | Medium | 60% | Template-based automation, custom needs human touch |
| Engagement Response | 75% | Medium | 70% | FAQ and simple responses automated, complex escalated |
| Analytics Reporting | 95% | Low | 95% | Scheduled reports fully automated |
| Content Ideation | 60% | High | 50% | AI suggestions, human creativity still essential |
| Campaign Planning | 50% | High | 40% | Strategic planning requires human oversight |
| Influencer Outreach | 70% | Medium | 65% | Initial outreach automated, relationship building manual |
| Crisis Management | 30% | Very High | 20% | Alert automation only, response requires human judgment |
| Community Moderation | 80% | Medium | 75% | Spam/troll filtering automated, nuanced decisions escalated |
High Automation (80-100%):
Medium Automation (50-79%):
Low Automation (30-49%):
Manual Only (<30%):
Platform-Native Approach:
Engagement Optimization:
Visual Consistency:
Transparency:
Quality Control:
Privacy & Compliance:
Alert System:
Response Protocol:
Engagement Metrics:
Reach Metrics:
Conversion Metrics:
Content Performance:
Automation Efficiency:
#!/usr/bin/env python3
"""
Daily social media automation orchestrator
"""
import schedule
import time
from datetime import datetime, timedelta
from social_media_manager import (
publish_scheduled_posts,
monitor_engagement,
generate_daily_report,
refresh_content_queue
)
def morning_routine():
"""Morning social media tasks"""
print(f"[{datetime.now()}] Running morning routine...")
# Publish morning posts across platforms
publish_scheduled_posts(time_slot="morning")
# Monitor overnight engagement
engagement_report = monitor_engagement(since="last_night")
# Refresh content queue for the day
refresh_content_queue(days_ahead=7)
print("Morning routine complete.")
def midday_check():
"""Midday engagement monitoring"""
print(f"[{datetime.now()}] Running midday check...")
# Publish midday content
publish_scheduled_posts(time_slot="midday")
# Respond to comments and messages
monitor_engagement(auto_respond=True)
def evening_routine():
"""Evening wrap-up and analytics"""
print(f"[{datetime.now()}] Running evening routine...")
# Publish evening posts
publish_scheduled_posts(time_slot="evening")
# Generate daily performance report
report = generate_daily_report()
# Send report to stakeholders
send_report(report, recipients=["marketing@company.com"])
# Schedule tasks
schedule.every().day.at("08:00").do(morning_routine)
schedule.every().day.at("12:00").do(midday_check)
schedule.every().day.at("18:00").do(evening_routine)
# Run scheduler
while True:
schedule.run_pending()
time.sleep(60)
#!/usr/bin/env python3
"""
Hashtag research and optimization
"""
import requests
from collections import Counter
from typing import List, Dict
def research_hashtags(keyword: str, platform: str = "instagram") -> List[Dict]:
"""Research trending and relevant hashtags"""
# Use social listening API to find related hashtags
trending = get_trending_hashtags(platform)
related = get_related_hashtags(keyword, platform)
# Analyze hashtag performance
performance = analyze_hashtag_performance(related)
# Return optimized hashtag mix
return optimize_hashtag_mix(trending, related, performance)
def get_trending_hashtags(platform: str, count: int = 50) -> List[str]:
"""Get currently trending hashtags"""
# Platform-specific trending API calls
pass
def get_related_hashtags(keyword: str, platform: str) -> List[str]:
"""Find hashtags related to keyword"""
# Search social posts for co-occurring hashtags
pass
def analyze_hashtag_performance(hashtags: List[str]) -> Dict:
"""Analyze historical performance of hashtags"""
performance = {}
for tag in hashtags:
metrics = {
"reach": get_hashtag_reach(tag),
"engagement": get_hashtag_engagement(tag),
"competition": get_hashtag_competition(tag)
}
performance[tag] = calculate_hashtag_score(metrics)
return performance
def optimize_hashtag_mix(trending: List[str], related: List[str],
performance: Dict) -> List[str]:
"""Create optimal hashtag mix"""
# Mix of:
# - 2-3 high-reach trending tags
# - 3-5 medium-reach niche tags
# - 2-3 low-competition long-tail tags
# - 1 branded hashtag
optimized = []
# High-reach trending (sorted by reach)
high_reach = sorted(trending, key=lambda x: performance.get(x, 0), reverse=True)[:3]
optimized.extend(high_reach)
# Medium-reach niche (balanced reach and relevance)
medium_reach = sorted(related, key=lambda x: performance.get(x, 0), reverse=True)[3:8]
optimized.extend(medium_reach)
# Low-competition long-tail
long_tail = sorted(related, key=lambda x: get_hashtag_competition(x))[:3]
optimized.extend(long_tail)
return optimized
#!/usr/bin/env python3
"""
Sentiment analysis and automated response system
"""
from textblob import TextBlob
from typing import Dict, List
def analyze_sentiment(text: str) -> Dict[str, float]:
"""Analyze sentiment of social media comment/message"""
blob = TextBlob(text)
sentiment = {
"polarity": blob.sentiment.polarity, # -1 to 1
"subjectivity": blob.sentiment.subjectivity, # 0 to 1
"classification": classify_sentiment(blob.sentiment.polarity)
}
return sentiment
def classify_sentiment(polarity: float) -> str:
"""Classify sentiment as positive, neutral, or negative"""
if polarity > 0.1:
return "positive"
elif polarity < -0.1:
return "negative"
else:
return "neutral"
def route_comment(comment: Dict) -> str:
"""Route comment based on sentiment and content"""
sentiment = analyze_sentiment(comment["text"])
# Escalate negative sentiment
if sentiment["classification"] == "negative":
if sentiment["polarity"] < -0.5:
return "escalate_urgent"
else:
return "escalate_standard"
# Auto-respond to positive and neutral
elif is_faq(comment["text"]):
return "auto_respond_faq"
elif is_simple_query(comment["text"]):
return "auto_respond_simple"
else:
return "queue_for_review"
def generate_auto_response(comment: Dict, response_type: str) -> str:
"""Generate automated response based on type"""
if response_type == "auto_respond_faq":
return get_faq_response(comment["text"])
elif response_type == "auto_respond_simple":
return f"Thanks for your comment! We appreciate your engagement. {get_relevant_info(comment)}"
else:
return None
def is_faq(text: str) -> bool:
"""Check if comment matches FAQ patterns"""
faq_patterns = [
"hours", "open", "location", "price", "cost",
"shipping", "delivery", "return", "refund"
]
return any(pattern in text.lower() for pattern in faq_patterns)
All social media activities, strategies, and performance data should be logged to the Obsidian vault:
Documentation Paths:
C:\Users\MarkusAhling\obsidian\Projects\{project}\Social-Media\Strategy.mdC:\Users\MarkusAhling\obsidian\Projects\{project}\Social-Media\Editorial-Calendar.mdC:\Users\MarkusAhling\obsidian\Projects\{project}\Social-Media\Analytics\C:\Users\MarkusAhling\obsidian\Projects\{project}\Social-Media\Campaigns\C:\Users\MarkusAhling\obsidian\Projects\{project}\Social-Media\Engagement\Automated Logging:
def log_social_activity(activity_type: str, data: Dict):
"""Log social media activity to Obsidian vault"""
from datetime import datetime
log_path = f"C:\\Users\\MarkusAhling\\obsidian\\Projects\\{project}\\Social-Media\\Logs\\{datetime.now().strftime('%Y-%m-%d')}.md"
entry = f"""
## [{datetime.now().strftime('%H:%M:%S')}] {activity_type}
**Platform:** {data.get('platform')}
**Action:** {data.get('action')}
**Result:** {data.get('result')}
**Metrics:** {data.get('metrics')}
---
"""
append_to_obsidian(log_path, entry)
The Social Media Manager agent provides comprehensive automation for modern social media operations, from strategic planning to execution and analytics. By integrating with leading social media management platforms and leveraging AI-powered workflows, this agent enables efficient, data-driven social media presence across multiple platforms.
Key Strengths:
Recommended Use Cases:
For questions or customization needs, consult the project documentation or reach out to the automation team.
Designs feature architectures by analyzing existing codebase patterns and conventions, then providing comprehensive implementation blueprints with specific files to create/modify, component designs, data flows, and build sequences