From harness-claude
Implements GraphQL subscriptions over WebSockets for real-time updates like chat messages, notifications, live scores, and collaborative editing. Covers schema definition, graphql-ws setup, Apollo Server integration, and PubSub with Redis.
npx claudepluginhub intense-visions/harness-engineering --plugin harness-claudeThis skill uses the workspace's default tool permissions.
> Implement real-time data streaming with GraphQL subscriptions over WebSocket connections
Implements real-time GraphQL subscriptions with Absinthe in Phoenix apps. Covers endpoint/socket setup, PubSub config, subscription definitions, triggers, and publishing from mutations.
Configures GraphQL subscription setups with step-by-step guidance, production-ready code generation, best practices, and validation for API development tasks.
Streams real-time events like notifications, chat messages, and live updates to clients over WebSocket using tRPC subscriptions and observables. Replaces polling for lower latency and server-push collaborative features.
Share bugs, ideas, or general feedback.
Implement real-time data streaming with GraphQL subscriptions over WebSocket connections
type Subscription {
messageAdded(channelId: ID!): Message!
orderStatusChanged(orderId: ID!): Order!
}
type Message {
id: ID!
content: String!
author: User!
createdAt: DateTime!
}
graphql-ws for the WebSocket transport. The older subscriptions-transport-ws is unmaintained. graphql-ws implements the GraphQL over WebSocket protocol correctly with proper connection lifecycle handling.npm install graphql-ws ws
import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { makeExecutableSchema } from '@graphql-tools/schema';
const schema = makeExecutableSchema({ typeDefs, resolvers });
const httpServer = createServer(app);
const wsServer = new WebSocketServer({
server: httpServer,
path: '/graphql',
});
const serverCleanup = useServer(
{
schema,
context: async (ctx) => {
const token = ctx.connectionParams?.authToken;
return { currentUser: await authenticate(token as string) };
},
},
wsServer
);
const server = new ApolloServer({
schema,
plugins: [
{
async serverWillStart() {
return {
async drainServer() {
await serverCleanup.dispose();
},
};
},
},
],
});
import { PubSub } from 'graphql-subscriptions';
// Development only — in-memory, single-process
const pubsub = new PubSub();
// Production — use Redis-backed PubSub
import { RedisPubSub } from 'graphql-redis-subscriptions';
const pubsub = new RedisPubSub({
connection: { host: 'redis', port: 6379 },
});
subscribe and optional resolve. The subscribe function returns an AsyncIterator. The resolve function transforms the published payload before sending to the client.const resolvers = {
Subscription: {
messageAdded: {
subscribe: (_parent, { channelId }, { pubsub }) => {
return pubsub.asyncIterator(`MESSAGE_ADDED.${channelId}`);
},
resolve: (payload) => payload.messageAdded,
},
},
Mutation: {
sendMessage: async (_parent, { input }, { pubsub, dataSources }) => {
const message = await dataSources.messages.create(input);
await pubsub.publish(`MESSAGE_ADDED.${input.channelId}`, {
messageAdded: message,
});
return message;
},
},
};
withFilter. Only push events that match the subscriber's criteria — do not send all events and let the client discard irrelevant ones.import { withFilter } from 'graphql-subscriptions';
const resolvers = {
Subscription: {
orderStatusChanged: {
subscribe: withFilter(
() => pubsub.asyncIterator('ORDER_STATUS_CHANGED'),
(payload, variables) => payload.orderStatusChanged.id === variables.orderId
),
},
},
};
Authenticate on the WebSocket connection, not per message. Extract the auth token from connectionParams during the initial WebSocket handshake. Reject unauthenticated connections in the onConnect handler.
Handle client-side subscriptions with useSubscription or subscribeToMore.
// Standalone subscription
const { data } = useSubscription(MESSAGE_ADDED, {
variables: { channelId },
});
// Augment an existing query with live updates
const { subscribeToMore, data } = useQuery(GET_MESSAGES, { variables: { channelId } });
useEffect(() => {
return subscribeToMore({
document: MESSAGE_ADDED,
variables: { channelId },
updateQuery: (prev, { subscriptionData }) => ({
messages: [...prev.messages, subscriptionData.data.messageAdded],
}),
});
}, [channelId, subscribeToMore]);
In-memory PubSub limitations: The default PubSub from graphql-subscriptions works only within a single Node.js process. In a multi-instance deployment, subscribers on one instance will not receive events published by another. Use Redis, Kafka, or Google PubSub for production.
Connection lifecycle: graphql-ws supports onConnect (authentication, rate limiting), onDisconnect (cleanup), and onSubscribe (per-subscription validation). Use these hooks for access control.
Scaling considerations:
Alternatives to subscriptions:
@defer and @stream directives for incremental delivery of query results (experimental)cache-and-network for simple cases with moderate freshness needshttps://the-guild.dev/graphql/ws