Provides expertise on n8n workflow automation including API integration, nodes, triggers, expressions, and MCP setup for direct Claude integration.
How this skill is triggered — by the user, by Claude, or both
Slash command
/integrations:n8nThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
Expert skill for using n8n - powerful workflow automation platform with 400+ integrations.
Expert skill for using n8n - powerful workflow automation platform with 400+ integrations.
Two versions available:
import os
# API keys: ~/.claude/.credentials.master.env
# Every setting below is looked up in the process environment and is None when unset.

# 1. SERVER VERSION (self-hosted on your-server)
N8N_SERVER_URL = os.environ.get('N8N_SERVER_URL')  # http://YOUR_SERVER_IP:5678/api/v1
N8N_SERVER_API_KEY = os.environ.get('N8N_SERVER_API_KEY')

# 2. CLOUD VERSION (n8n.cloud - managed)
N8N_CLOUD_URL = os.environ.get('N8N_CLOUD_URL')  # https://your-name.app.n8n.cloud/api/v1
N8N_CLOUD_API_KEY = os.environ.get('N8N_CLOUD_API_KEY')

# 3. MCP SERVER (for Claude integration)
N8N_MCP_SERVER_URL = os.environ.get('N8N_MCP_SERVER_URL')  # https://your-name.app.n8n.cloud/mcp-server/http
N8N_MCP_ACCESS_TOKEN = os.environ.get('N8N_MCP_ACCESS_TOKEN')
n8n supports MCP (Model Context Protocol) for direct integration with Claude:
// Add to mcp.json for Claude Desktop/Code
{
"mcpServers": {
"n8n": {
"url": "https://your-name.app.n8n.cloud/mcp-server/http",
"transport": "http",
"headers": {
"Authorization": "Bearer <N8N_MCP_ACCESS_TOKEN>"
}
}
}
}
Important: MCP access must be enabled in workflow settings in n8n.
Best for:
Advantages:
Trigger Node → Processing Node → Action Node
↓ ↓ ↓
Webhook Transform Send Email
Schedule Filter Slack Message
App Event Merge Database Write
{
"nodes": [
{
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"position": [250, 300],
"parameters": {
"path": "my-webhook",
"httpMethod": "POST"
}
},
{
"name": "Transform",
"type": "n8n-nodes-base.set",
"position": [450, 300],
"parameters": {
"values": {
"string": [
{
"name": "message",
"value": "={{ $json.body.text }}"
}
]
}
}
}
],
"connections": {
"Webhook": {
"main": [
[{ "node": "Transform", "type": "main", "index": 0 }]
]
}
}
}
// Access current item data
{{ $json.fieldName }}
{{ $json.nested.field }}
{{ $json.array[0] }}
// Access data from other nodes
{{ $node["NodeName"].json.field }}
{{ $("NodeName").item.json.field }}
// Access all items from a node
{{ $node["NodeName"].data }}
// String manipulation
{{ $json.name.toUpperCase() }}
{{ $json.email.split('@')[0] }}
{{ `Hello ${$json.name}!` }}
// Date operations
{{ new Date().toISOString() }}
{{ DateTime.now().toFormat('yyyy-MM-dd') }}
{{ DateTime.fromISO($json.date).plus({ days: 7 }) }}
// Conditionals
{{ $json.status === 'active' ? 'Yes' : 'No' }}
{{ $json.amount > 100 ? 'large' : 'small' }}
// Array operations
{{ $json.items.length }}
{{ $json.items.map(i => i.name).join(', ') }}
{{ $json.items.filter(i => i.active) }}
// String
{{ $json.text.trim() }}
{{ $json.text.replace('old', 'new') }}
{{ $json.text.includes('keyword') }}
// Numbers
{{ Math.round($json.amount * 100) / 100 }}
{{ parseInt($json.value) }}
// JSON
{{ JSON.stringify($json.data) }}
{{ JSON.parse($json.jsonString) }}
// URL
{{ encodeURIComponent($json.query) }}
// Crypto
{{ $uuid() }}
{{ $hash('sha256', $json.data) }}
// Set new fields
{
"values": {
"string": [
{ "name": "fullName", "value": "={{ $json.firstName }} {{ $json.lastName }}" }
],
"number": [
{ "name": "total", "value": "={{ $json.price * $json.quantity }}" }
],
"boolean": [
{ "name": "isVIP", "value": "={{ $json.totalPurchases > 1000 }}" }
]
}
}
// Process each item
for (const item of $input.all()) {
item.json.processed = true;
item.json.timestamp = new Date().toISOString();
}
return $input.all();
// Create new items
return [
{ json: { name: 'Item 1', value: 100 } },
{ json: { name: 'Item 2', value: 200 } }
];
// Filter items
return $input.all().filter(item => item.json.status === 'active');
// Aggregate
const total = $input.all().reduce((sum, item) => sum + item.json.amount, 0);
return [{ json: { total } }];
// Merge modes:
// - Append: Combine all items
// - Merge by Index: Match items by position
// - Merge by Key: Match items by field value
// - Multiplex: Create all combinations
// Merge by Key configuration
{
"mode": "mergeByKey",
"propertyName1": "id",
"propertyName2": "userId",
"options": {
"clash_handling": "overwrite"
}
}
pip install requests
import requests
import os
# Select the deployment to talk to
USE_CLOUD = False  # True for cloud, False for server

# Only the chosen deployment's environment variables are read.
BASE_URL, API_KEY = (
    (os.getenv('N8N_CLOUD_URL'), os.getenv('N8N_CLOUD_API_KEY'))
    if USE_CLOUD
    else (os.getenv('N8N_SERVER_URL'), os.getenv('N8N_SERVER_API_KEY'))
)

# Headers sent with every API request below
headers = {
    "X-N8N-API-KEY": API_KEY,
    "Content-Type": "application/json",
}
def list_workflows(active_only: bool = False):
    """Get all workflows, following the API's cursor pagination.

    The n8n public API returns list results one page at a time and includes a
    ``nextCursor`` value while further pages remain, so a single request only
    yields the first page. This function keeps requesting pages until no
    cursor is returned.

    Args:
        active_only: When True, ask the API for active workflows only.

    Returns:
        A list of dicts, one per workflow, each holding the workflow's
        ``id``, ``name``, ``active`` and ``updatedAt`` values.
    """
    params = {}
    if active_only:
        params["active"] = "true"

    summaries = []
    while True:
        response = requests.get(
            f"{BASE_URL}/workflows",
            headers=headers,
            params=params,
        )
        page = response.json()
        summaries.extend(
            {
                "id": w["id"],
                "name": w["name"],
                "active": w["active"],
                "updatedAt": w["updatedAt"],
            }
            for w in page["data"]
        )

        # A missing or empty cursor means this was the last page.
        cursor = page.get("nextCursor")
        if not cursor:
            return summaries
        params["cursor"] = cursor
def get_workflow(workflow_id: str):
    """Fetch one workflow by its ID and return the decoded JSON response."""
    url = f"{BASE_URL}/workflows/{workflow_id}"
    return requests.get(url, headers=headers).json()
def create_workflow(name: str, nodes: list, connections: dict):
    """
    Create a new workflow and return the decoded JSON response.

    The request body always carries an empty ``settings`` object next to the
    supplied values.

    Args:
        name: Workflow name
        nodes: List of node definitions
        connections: Node connections map
    """
    body = dict(name=name, nodes=nodes, connections=connections, settings={})
    reply = requests.post(f"{BASE_URL}/workflows", headers=headers, json=body)
    return reply.json()
# Example: a webhook trigger wired to a "Respond to Webhook" node
webhook_node = dict(
    name="Webhook",
    type="n8n-nodes-base.webhook",
    typeVersion=1,
    position=[250, 300],
    parameters=dict(path="my-webhook", httpMethod="POST"),
)

response_node = dict(
    name="Respond",
    type="n8n-nodes-base.respondToWebhook",
    typeVersion=1,
    position=[450, 300],
    parameters=dict(
        respondWith="json",
        responseBody="={{ JSON.stringify({ success: true }) }}",
    ),
)

# The webhook's main output feeds input 0 of the respond node.
connections = {
    "Webhook": dict(main=[[dict(node="Respond", type="main", index=0)]]),
}

workflow = create_workflow(
    "My Webhook",
    [webhook_node, response_node],
    connections,
)
def activate_workflow(workflow_id: str, activate: bool = True):
    """Activate or deactivate a workflow.

    The n8n public API switches a workflow's state through the dedicated
    ``POST /workflows/{id}/activate`` and ``POST /workflows/{id}/deactivate``
    endpoints rather than by patching an ``active`` field, so the endpoint is
    chosen from the ``activate`` flag.

    Args:
        workflow_id: ID of the workflow to change.
        activate: True to activate the workflow, False to deactivate it.

    Returns:
        The decoded JSON response.
    """
    action = "activate" if activate else "deactivate"
    response = requests.post(
        f"{BASE_URL}/workflows/{workflow_id}/{action}",
        headers=headers,
    )
    return response.json()
def execute_workflow(workflow_id: str, data: dict = None):
    """
    Execute a workflow manually and return the decoded JSON response.

    Args:
        workflow_id: Workflow to execute
        data: Input data for workflow; sent under the "data" key only when
            it is non-empty, otherwise an empty JSON object is posted
    """
    # NOTE(review): confirm the target n8n instance exposes this endpoint.
    body = {"data": data} if data else {}
    reply = requests.post(
        f"{BASE_URL}/workflows/{workflow_id}/execute",
        headers=headers,
        json=body,
    )
    return reply.json()
# Usage: run workflow "123" with a small sample payload
result = execute_workflow("123", {"name": "John", "email": "john@example.com"})
def get_executions(workflow_id: str = None, status: str = None, limit: int = 20):
    """
    List workflow executions and return the "data" entry of the response.

    Args:
        workflow_id: Filter by workflow
        status: "success", "error", "waiting"
        limit: Max results
    """
    # Optional filters are only sent when they hold a truthy value.
    optional = {"workflowId": workflow_id, "status": status}
    params = {
        "limit": limit,
        **{key: value for key, value in optional.items() if value},
    }
    reply = requests.get(f"{BASE_URL}/executions", headers=headers, params=params)
    return reply.json()["data"]
def delete_workflow(workflow_id: str):
    """Delete a workflow; return True only when the API answers with HTTP 200."""
    url = f"{BASE_URL}/workflows/{workflow_id}"
    outcome = requests.delete(url, headers=headers)
    return outcome.status_code == 200
# Workflow definition with three nodes: webhook trigger, outbound HTTP request,
# and a node that responds to the webhook. No connections are declared here.
http_api_workflow = dict(
    name="API Wrapper",
    nodes=[
        dict(
            name="Webhook",
            type="n8n-nodes-base.webhook",
            position=[250, 300],
            parameters=dict(
                path="api",
                httpMethod="={{$parameter.httpMethod}}",
                responseMode="responseNode",
            ),
        ),
        dict(
            name="HTTP Request",
            type="n8n-nodes-base.httpRequest",
            position=[450, 300],
            parameters=dict(url="https://api.example.com/data", method="GET"),
        ),
        dict(
            name="Respond",
            type="n8n-nodes-base.respondToWebhook",
            position=[650, 300],
            parameters=dict(respondWith="json"),
        ),
    ],
)
# Workflow definition with four nodes: a cron-style schedule trigger, an HTTP
# fetch, a Code node that marks each item as processed, and an HTTP POST.
# No connections are declared here.
sync_workflow = dict(
    name="Daily Data Sync",
    nodes=[
        dict(
            name="Schedule",
            type="n8n-nodes-base.scheduleTrigger",
            position=[250, 300],
            parameters=dict(
                rule=dict(
                    interval=[dict(field="cronExpression", expression="0 9 * * *")],
                ),
            ),
        ),
        dict(
            name="Fetch Data",
            type="n8n-nodes-base.httpRequest",
            position=[450, 300],
            parameters=dict(url="https://api.source.com/data", method="GET"),
        ),
        dict(
            name="Transform",
            type="n8n-nodes-base.code",
            position=[650, 300],
            parameters=dict(
                jsCode="return items.map(item => ({ json: { ...item.json, processed: true } }));",
            ),
        ),
        dict(
            name="Save",
            type="n8n-nodes-base.httpRequest",
            position=[850, 300],
            parameters=dict(url="https://api.destination.com/import", method="POST"),
        ),
    ],
)
// Basic GET
{
"method": "GET",
"url": "https://api.example.com/users",
"authentication": "predefinedCredential",
"credential": "myApiCredential"
}
// POST with JSON body
{
"method": "POST",
"url": "https://api.example.com/users",
"sendBody": true,
"bodyContentType": "json",
"body": {
"name": "={{ $json.name }}",
"email": "={{ $json.email }}"
}
}
// With headers
{
"options": {
"headers": {
"X-Custom-Header": "value"
}
}
}
// Loop through pages
{
"options": {
"pagination": {
"type": "offset",
"paginationCompleteWhen": "receiveSpecificStatusCodes",
"statusCodes": "404"
}
}
}
// Catch errors from workflow
{
"errorTriggerType": "workflow"
}
// Access error info
{{ $json.error.message }}
{{ $json.error.node }}
{{ $json.execution.id }}
Start → HTTP Request → Success Path
↓
Error Handler → Slack Alert
// In Code node
const maxRetries = 3;
let retries = 0;
while (retries < maxRetries) {
try {
// Make request
return result;
} catch (error) {
retries++;
if (retries === maxRetries) throw error;
await new Promise(r => setTimeout(r, 1000 * retries));
}
}
Webhook → Validate → Transform → Database → Respond
// Webhook with custom response
{
"responseMode": "lastNode",
"responseData": "firstEntryJson"
}
Schedule → Fetch API → Transform → Upsert DB → Log
// Every hour sync
{
"rule": { "cronExpression": "0 * * * *" }
}
Form Submit → Create Ticket → Wait for Approval → Process
↓
Send Notification
Trigger → Extract (API) → Transform → Load (DB) → Report
↓
Error Handler → Alert
| Node | Description |
|---|---|
webhook | HTTP webhook trigger |
schedule | Cron/schedule trigger |
httpRequest | Make HTTP requests |
code | Custom JS/Python code |
set | Set/transform data |
if | Conditional logic |
switch | Multi-branch routing |
merge | Combine data streams |
splitInBatches | Batch processing |
function | Custom JS function |
// Webhook node configuration
{
"path": "my-webhook",
"httpMethod": "POST",
"responseMode": "onReceived",
"responseData": "allEntries"
}
// Access webhook data
{{ $json.body }} // POST body
{{ $json.query }} // Query params
{{ $json.headers }} // Headers
// Cron expressions
{
"rule": {
"cronExpression": "0 9 * * *" // Every day at 9 AM
}
}
// Common patterns:
// "*/5 * * * *" - Every 5 minutes
// "0 * * * *" - Every hour
// "0 9 * * 1-5" - Weekdays at 9 AM
// "0 0 1 * *" - First of month
// Gmail Trigger
{
"pollTimes": {
"item": [{ "mode": "everyMinute" }]
},
"filters": {
"readStatus": "unread"
}
}
// Slack Trigger
{
"channel": "#general",
"event": "message"
}
import os
def trigger_webhook(webhook_path: str, data: dict, use_cloud: bool = False):
    """POST ``data`` as JSON to an n8n webhook and return the decoded JSON reply.

    The cloud URL is used when ``use_cloud`` is true, otherwise the
    self-hosted server URL; both hosts are placeholders written into the code.
    """
    webhook_url = (
        f"https://your-name.app.n8n.cloud/webhook/{webhook_path}"  # Cloud version
        if use_cloud
        else f"http://YOUR_SERVER_IP:5678/webhook/{webhook_path}"  # Server version
    )
    return requests.post(webhook_url, json=data).json()
# Usage: use_cloud is left at its default (False), so this posts to the server webhook URL
result = trigger_webhook("my-webhook", {"message": "Hello from Python!"})
| Category | Nodes |
|---|---|
| AI | OpenAI, Claude, Gemini, Ollama |
| Communication | Telegram, Slack, Discord, Email |
| CRM | HubSpot, Salesforce, Pipedrive |
| Databases | PostgreSQL, MySQL, MongoDB, Redis |
| Storage | Google Drive, Dropbox, S3 |
| Dev | GitHub, GitLab, Jira, Linear |
| Marketing | Mailchimp, Sendgrid, ActiveCampaign |
| Productivity | Notion, Airtable, Google Sheets |
| Feature | Server | Cloud |
|---|---|---|
| Control | Full | Limited |
| Updates | Manual | Auto |
| Scaling | Manual | Auto |
| Cost | Server costs | Subscription |
| Uptime | Your responsibility | 99.9% SLA |
| Custom nodes | Yes | Limited |
| Endpoint | Method | Purpose |
|---|---|---|
/workflows | GET | List workflows |
/workflows | POST | Create workflow |
/workflows/{id} | GET | Get workflow |
/workflows/{id} | PATCH | Update workflow |
/workflows/{id} | DELETE | Delete workflow |
/workflows/{id}/execute | POST | Execute workflow |
/executions | GET | List executions |
/executions/{id} | GET | Get execution |
/credentials | GET | List credentials |
npx claudepluginhub jhamidun/claude-code-config-pack --plugin integrationsSets up isolated workspaces using native worktree tools or git worktree fallback. Use before starting feature work to protect the current branch.