From atum-stack-backend
Real-time messaging pattern library for backend services — native WebSocket (ws library for Node, FastAPI WebSocket for Python, Spring Boot STOMP for Java), Socket.IO (fallback for legacy clients + auto-reconnect + rooms + namespaces), Server-Sent Events (SSE for one-way server-to-client streams), WebRTC (peer-to-peer audio/video/data channels via simple-peer or livekit-server-sdk), managed real-time services (Pusher, Ably, PubNub, Supabase Realtime, LiveKit for video), and Cloudflare Durable Objects for stateful edge WebSocket. Covers connection lifecycle (connect, heartbeat, reconnect with exponential backoff), auth patterns (JWT in query param vs subprotocol, cookie-based), message routing (rooms, channels, presence), scaling (horizontal scaling with Redis pub/sub or NATS adapter), load balancing sticky sessions, and monitoring connection counts. Use when building chat, live notifications, collaborative editors, multiplayer games, live dashboards, or real-time updates in any backend.
```bash
npx claudepluginhub arnwaldn/atum-plugins-collection --plugin atum-stack-backend
```

This skill uses the workspace's default tool permissions.
This skill covers every way to get **real-time** communication between a backend and a web/mobile client.
```text
What kind of communication?
├── Server → client only (notifications, live scores)
│   └── Server-Sent Events (SSE) — simplest, firewall-friendly
├── Bidirectional (chat, collaboration)
│   ├── < 1,000 concurrent connections → native WebSocket (ws, Socket.IO)
│   ├── Legacy-client fallback needed → Socket.IO
│   ├── Multi-node scale → WebSocket + Redis pub/sub adapter
│   └── > 10k connections → managed service (Pusher, Ably, Supabase Realtime)
└── Peer-to-peer audio/video
    └── WebRTC (signaling over WebSocket)
```
```bash
npm install ws
```

```typescript
import { WebSocketServer, WebSocket } from 'ws'
import { createServer } from 'node:http'
import { verify } from 'jsonwebtoken'

const httpServer = createServer()
const wss = new WebSocketServer({ server: httpServer })

interface Client extends WebSocket {
  userId?: string
  rooms: Set<string>
  isAlive: boolean
}

const clients = new Map<string, Client>()

wss.on('connection', (ws: Client, req) => {
  const token = new URL(req.url!, 'http://localhost').searchParams.get('token')
  if (!token) return ws.close(1008, 'Missing token')
  try {
    const payload = verify(token, process.env.JWT_SECRET!) as { userId: string }
    ws.userId = payload.userId
    ws.rooms = new Set()
    ws.isAlive = true
    clients.set(payload.userId, ws)
  } catch {
    return ws.close(1008, 'Invalid token')
  }

  ws.on('pong', () => { ws.isAlive = true })

  ws.on('message', async (raw) => {
    try {
      const msg = JSON.parse(raw.toString())
      switch (msg.type) {
        case 'join':
          ws.rooms.add(msg.room)
          break
        case 'leave':
          ws.rooms.delete(msg.room)
          break
        case 'message':
          await broadcastToRoom(msg.room, { from: ws.userId, text: msg.text })
          break
      }
    } catch (err) {
      ws.send(JSON.stringify({ error: 'invalid message' }))
    }
  })

  ws.on('close', () => {
    clients.delete(ws.userId!)
  })
})

// Heartbeat — detects dead clients
setInterval(() => {
  wss.clients.forEach((ws) => {
    const client = ws as Client
    if (!client.isAlive) return client.terminate()
    client.isAlive = false
    client.ping()
  })
}, 30000)

async function broadcastToRoom(room: string, payload: any) {
  const json = JSON.stringify({ type: 'message', room, ...payload })
  for (const client of clients.values()) {
    if (client.readyState === WebSocket.OPEN && client.rooms.has(room)) {
      client.send(json)
    }
  }
}

httpServer.listen(3001)
```
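The native ws server leaves reconnection entirely to the client. A minimal browser-side sketch with exponential backoff — the `backoffDelay` helper, URL, and delay bounds are illustrative, not part of the ws API:

```typescript
// Exponential backoff with a cap, kept pure so it is easy to test:
// 1s, 2s, 4s, … capped at 30s.
function backoffDelay(attempt: number, base = 1000, cap = 30_000): number {
  return Math.min(base * 2 ** attempt, cap)
}

function connect(url: string, token: string, attempt = 0): void {
  const ws = new WebSocket(`${url}?token=${token}`)

  ws.onopen = () => { attempt = 0 } // reset backoff after a successful connect
  ws.onmessage = (e) => console.log(JSON.parse(e.data))
  ws.onclose = () => {
    // Schedule a retry; the attempt counter grows the delay.
    setTimeout(() => connect(url, token, attempt + 1), backoffDelay(attempt))
  }
}
```

Resetting the counter in `onopen` matters: without it, one flaky hour would push every later reconnect to the 30s cap.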
```bash
npm install socket.io @socket.io/redis-adapter redis
```

```typescript
import { Server } from 'socket.io'
import { createServer } from 'node:http'
import { createAdapter } from '@socket.io/redis-adapter'
import { createClient } from 'redis'
import { verify } from 'jsonwebtoken'

const httpServer = createServer()
const io = new Server(httpServer, {
  cors: { origin: process.env.FRONTEND_URL },
})

// Multi-node scaling with Redis
const pubClient = createClient({ url: process.env.REDIS_URL })
const subClient = pubClient.duplicate()
await Promise.all([pubClient.connect(), subClient.connect()])
io.adapter(createAdapter(pubClient, subClient))

// Auth middleware
io.use((socket, next) => {
  const token = socket.handshake.auth.token
  try {
    const payload = verify(token, process.env.JWT_SECRET!) as { userId: string }
    socket.data.userId = payload.userId
    next()
  } catch {
    next(new Error('Unauthorized'))
  }
})

io.on('connection', (socket) => {
  console.log(`User ${socket.data.userId} connected`)

  socket.on('join', (room: string) => {
    socket.join(room)
    io.to(room).emit('user-joined', socket.data.userId)
  })

  socket.on('message', (data: { room: string; text: string }) => {
    io.to(data.room).emit('message', {
      from: socket.data.userId,
      text: data.text,
      at: new Date(),
    })
  })

  socket.on('disconnect', () => {
    console.log(`User ${socket.data.userId} disconnected`)
  })
})

httpServer.listen(3001)
```
```typescript
import { io } from 'socket.io-client'

// `jwt` comes from your auth flow
const socket = io('https://api.example.com', {
  auth: { token: jwt },
  reconnection: true,
  reconnectionDelay: 1000,
  reconnectionDelayMax: 5000,
  reconnectionAttempts: Infinity,
})

socket.on('connect', () => console.log('Connected'))
socket.on('message', (msg) => console.log(msg))

socket.emit('join', 'room1')
socket.emit('message', { room: 'room1', text: 'Hello' })
```
```typescript
// Next.js App Router route
export async function GET(request: Request) {
  const stream = new ReadableStream({
    start(controller) {
      const encoder = new TextEncoder()
      const send = (data: any) => {
        controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`))
      }
      const interval = setInterval(() => {
        send({ time: new Date().toISOString() })
      }, 1000)
      request.signal.addEventListener('abort', () => {
        clearInterval(interval)
        controller.close()
      })
    },
  })
  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  })
}
```
```typescript
const source = new EventSource('/api/events')

source.onmessage = (e) => {
  const data = JSON.parse(e.data)
  console.log(data)
}

source.onerror = () => {
  // onerror also fires on transient drops, where EventSource reconnects
  // by itself — calling close() here permanently stops those retries.
  source.close()
}
```
SSE advantages:
- Plain HTTP — works through proxies and firewalls, no protocol upgrade
- Automatic reconnection built into `EventSource`, with `Last-Event-ID` resume
- Trivial to implement server-side for one-way streams

Drawbacks:
- One-way only (server → client)
- Text payloads only (UTF-8)
- Browsers cap concurrent HTTP/1.1 connections per origin (~6), so serve SSE over HTTP/2
```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.rooms: Dict[str, Set[WebSocket]] = {}

    async def connect(self, ws: WebSocket, room: str):
        await ws.accept()
        if room not in self.rooms:
            self.rooms[room] = set()
        self.rooms[room].add(ws)

    def disconnect(self, ws: WebSocket, room: str):
        if room in self.rooms:
            self.rooms[room].discard(ws)

    async def broadcast(self, room: str, message: dict):
        if room not in self.rooms:
            return
        # Iterate over a copy: sending to a dead socket raises, and the
        # set may be mutated by disconnects while we loop.
        for ws in list(self.rooms[room]):
            try:
                await ws.send_json(message)
            except Exception:
                self.rooms[room].discard(ws)

manager = ConnectionManager()

@app.websocket("/ws/{room}")
async def websocket_endpoint(ws: WebSocket, room: str):
    await manager.connect(ws, room)
    try:
        while True:
            data = await ws.receive_json()
            await manager.broadcast(room, data)
    except WebSocketDisconnect:
        manager.disconnect(ws, room)
```
```typescript
import { createClient } from '@supabase/supabase-js'

const supabase = createClient(url, anonKey)

// Subscribe to table changes (Postgres CDC)
const channel = supabase
  .channel('room-1')
  .on('postgres_changes', { event: '*', schema: 'public', table: 'messages' }, (payload) => {
    console.log('Change:', payload)
  })
  .on('presence', { event: 'sync' }, () => {
    const users = channel.presenceState()
    console.log('Users in room:', users)
  })
  .subscribe()
```

Supabase Realtime supports: Postgres CDC, Broadcast (pub/sub), and Presence (online-user tracking).
```typescript
// Server
import Pusher from 'pusher'

const pusher = new Pusher({
  appId: process.env.PUSHER_APP_ID!,
  key: process.env.PUSHER_KEY!,
  secret: process.env.PUSHER_SECRET!,
  cluster: 'eu',
})

await pusher.trigger('room-1', 'new-message', {
  text: 'Hello',
  user: 'Arnaud',
})
```

```typescript
// Client
import PusherClient from 'pusher-js'

const pusher = new PusherClient(key, { cluster: 'eu' })
const channel = pusher.subscribe('room-1')
channel.bind('new-message', (data: any) => console.log(data))
```
For real-time video (video conferencing, streaming):
```bash
npm install livekit-server-sdk
```

```typescript
import { AccessToken } from 'livekit-server-sdk'

const token = new AccessToken(process.env.LIVEKIT_API_KEY!, process.env.LIVEKIT_SECRET!, {
  identity: userId,
})
token.addGrant({ room: 'room-1', roomJoin: true, canPublish: true, canSubscribe: true })
const jwt = token.toJwt()
// Send this to the client so it can connect to the LiveKit server
```
A single-node WebSocket server does not scale past roughly 10-50k concurrent connections. Beyond that:

Each node exchanges messages through a Redis adapter: when a message arrives on node A, it is published to Redis and consumed by every other node.
```typescript
// Socket.IO + Redis adapter (shown above)
import { createAdapter } from '@socket.io/redis-adapter'

io.adapter(createAdapter(pubClient, subClient))
```
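For native ws (without Socket.IO) the same fan-out can be hand-rolled. The sketch below models it with an in-memory bus standing in for Redis — the `PubSub` interface and channel name are illustrative; a real implementation would use the redis client's `publish`/`subscribe`:

```typescript
// Minimal pub/sub surface (redis, ioredis, NATS all offer equivalents).
interface PubSub {
  publish(channel: string, message: string): void
  subscribe(channel: string, onMessage: (message: string) => void): void
}

interface Envelope { room: string; from: string; text: string }

// Each node relays everything it receives from the bus to its local sockets,
// and publishes local messages so every other node relays them too.
function connectNode(bus: PubSub, deliverLocal: (e: Envelope) => void) {
  bus.subscribe('ws:rooms', (raw) => deliverLocal(JSON.parse(raw)))
  return (e: Envelope) => bus.publish('ws:rooms', JSON.stringify(e))
}

// In-memory bus to demonstrate the fan-out across two "nodes":
const handlers: Array<(m: string) => void> = []
const bus: PubSub = {
  publish: (_c, m) => handlers.forEach((h) => h(m)),
  subscribe: (_c, h) => handlers.push(h),
}

const nodeA: Envelope[] = []
const nodeB: Envelope[] = []
const publishFromA = connectNode(bus, (e) => nodeA.push(e))
connectNode(bus, (e) => nodeB.push(e))

publishFromA({ room: 'room1', from: 'u1', text: 'hi' })
// Both nodes received the envelope and can deliver it to their own sockets.
```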
Behind a load balancer, configure sticky sessions (session affinity) so the same client always lands on the same node. Otherwise each reconnect may hit a different node that knows nothing about its state.
```nginx
# Nginx
upstream ws_backend {
  ip_hash;  # sticky by client IP
  server ws1.internal:3001;
  server ws2.internal:3001;
}
```
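`ip_hash` degrades when many clients sit behind one NAT (they all hash to the same node). Hashing on a session cookie spreads them out — a sketch, with the cookie name `session` as an assumption:

```nginx
upstream ws_backend {
  hash $cookie_session consistent;  # sticky by session cookie, not by IP
  server ws1.internal:3001;
  server ws2.internal:3001;
}
```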
For globally distributed WebSocket without a dedicated backend:
```typescript
export class ChatRoom {
  state: DurableObjectState
  sessions: WebSocket[] = []

  constructor(state: DurableObjectState) {
    this.state = state
  }

  async fetch(request: Request) {
    if (request.headers.get('Upgrade') === 'websocket') {
      const pair = new WebSocketPair()
      const [client, server] = Object.values(pair)
      server.accept()
      this.sessions.push(server)

      server.addEventListener('message', (event) => {
        this.sessions.forEach((s) => s.send(event.data))
      })
      server.addEventListener('close', () => {
        this.sessions = this.sessions.filter((s) => s !== server)
      })
      return new Response(null, { status: 101, webSocket: client })
    }
    return new Response('WebSocket only', { status: 426 })
  }
}
```

Each room = one Durable Object = one single-threaded instance handling all of that room's connections. See the deploy-cloudflare skill (in atum-workflows).
```typescript
const ws = new WebSocket(`wss://api.example.com/ws?token=${jwt}`)
```

⚠️ The token shows up in HTTP access logs. Fine for short-lived tokens, not for long-lived sessions.
```typescript
const ws = new WebSocket('wss://api.example.com/ws', ['Bearer', jwt])
```

Better — the token travels in a header instead of the URL. The server reads and verifies `Sec-WebSocket-Protocol`.
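Server-side, the token arrives as part of a comma-separated `Sec-WebSocket-Protocol` value (`Bearer, <jwt>` for the client call above). A parsing sketch — the helper name is illustrative, and with the ws library you would also echo an accepted subprotocol back via the `handleProtocols` option:

```typescript
// Pull the token out of a "Bearer, <token>" Sec-WebSocket-Protocol header.
function tokenFromProtocolHeader(header: string | undefined): string | null {
  if (!header) return null
  const parts = header.split(',').map((p) => p.trim())
  const i = parts.indexOf('Bearer')
  return i !== -1 && parts[i + 1] ? parts[i + 1] : null
}

// Usage inside the connection handler from the native ws example:
//   const token = tokenFromProtocolHeader(req.headers['sec-websocket-protocol'])
//   if (!token) return ws.close(1008, 'Missing token')
```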
```typescript
// The cookie is sent automatically on the upgrade request (same domain)
const ws = new WebSocket('wss://api.example.com/ws')
```

The most secure option, but it requires correct cookie (SameSite/Secure) and cross-origin configuration.
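On the server, the session cookie is read from the HTTP upgrade request. A small parser sketch — the cookie name `session` is an assumption, and a maintained cookie parser is preferable in production:

```typescript
// Extract one cookie value from a raw Cookie header ("a=1; session=xyz").
function cookieValue(header: string | undefined, name: string): string | null {
  if (!header) return null
  for (const pair of header.split(';')) {
    const [key, ...rest] = pair.trim().split('=')
    if (key === name) return rest.join('=') // '=' may appear inside the value
  }
  return null
}

// wss.on('connection', (ws, req) => {
//   const session = cookieValue(req.headers.cookie, 'session')
//   if (!session) return ws.close(1008, 'Unauthenticated')
// })
```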
Essential metrics to watch:
- active connections per node
- message throughput, in and out
- reconnect rate (a spike usually means a deploy or a network issue)
- message delivery latency
- memory per connection
```typescript
import { register, Gauge, Counter } from 'prom-client'

const activeConnections = new Gauge({ name: 'ws_connections_active', help: 'Active WS connections' })
const messagesTotal = new Counter({ name: 'ws_messages_total', help: 'Total messages', labelNames: ['direction'] })

wss.on('connection', (ws) => {
  activeConnections.inc()
  ws.on('close', () => activeConnections.dec())
  ws.on('message', () => messagesTotal.inc({ direction: 'in' }))
})
```