From majestic-rails
Implements AnyCable patterns for reliable real-time features in Rails apps, including LLM streaming with message delivery guarantees, presence tracking, and Action Cable migration.
npx claudepluginhub majesticlabs-dev/majestic-marketplace --plugin majestic-rails
Slash command: /majestic-rails:anycable-coder
Action Cable provides "at-most once" delivery—messages can be lost on reconnection. For LLM streaming where every chunk matters, this is insufficient.
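AnyCable provides at-least-once delivery, guaranteed message ordering, configurable history replay on reconnect, and built-in presence tracking.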
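To make the delivery difference concrete, here is a minimal plain-Ruby sketch (no Rails or AnyCable involved) contrasting fire-and-forget delivery with offset-based replay. `FireAndForgetStream` and `ReplayableStream` are hypothetical names used only for illustration; AnyCable's real implementation lives in its server and differs in detail.

```ruby
# Hypothetical illustration only: not AnyCable's implementation.

# At-most-once: a message is handed to whoever is connected right now.
class FireAndForgetStream
  def initialize
    @subscribers = []
  end

  def subscribe(inbox)
    @subscribers << inbox
  end

  def unsubscribe(inbox)
    @subscribers.delete(inbox)
  end

  def broadcast(message)
    @subscribers.each { |inbox| inbox << message }
  end
end

# Replayable: every message gets an offset and is kept in a history,
# so a client can ask for everything after the last offset it saw.
class ReplayableStream
  def initialize
    @history = []
  end

  # Stores the message and returns its 1-based offset.
  def broadcast(message)
    @history << message
    @history.size
  end

  def since(offset)
    @history.drop(offset)
  end
end

lossy_inbox = []
lossy = FireAndForgetStream.new
lossy.subscribe(lossy_inbox)
lossy.broadcast("Hel")
lossy.unsubscribe(lossy_inbox) # connection drops
lossy.broadcast("lo, ")        # nobody is listening: the chunk is gone
lossy.subscribe(lossy_inbox)   # client reconnects
lossy.broadcast("world")

reliable_inbox = []
reliable = ReplayableStream.new
last_seen = reliable.broadcast("Hel")
reliable_inbox << "Hel"
reliable.broadcast("lo, ")     # sent while the client is offline
reliable.broadcast("world")
reliable_inbox.concat(reliable.since(last_seen)) # replay on reconnect

puts lossy_inbox.join    # Helworld
puts reliable_inbox.join # Hello, world
```

The only state the client needs to carry across a reconnect is the last offset it saw.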
```bash
# Server
bundle add anycable-rails
bin/rails g anycable:setup

# Client (replace @rails/actioncable)
npm install @anycable/web
```
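```ruby
class LlmStreamChannel < ApplicationCable::Channel
  def subscribed
    stream_for current_user
  end

  def generate(data)
    prompt = data["prompt"]

    # `llm_client` is a placeholder for your streaming LLM client;
    # it is expected to yield the response one chunk at a time.
    llm_client.stream(prompt) do |chunk|
      LlmStreamChannel.broadcast_to(
        current_user,
        { type: "chunk", content: chunk }
      )
    end

    # Tell the client that no more chunks will follow
    LlmStreamChannel.broadcast_to(
      current_user,
      { type: "complete" }
    )
  end
end
```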
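At-least-once delivery means a consumer may, in principle, see the same message more than once, for example when history is replayed after a reconnect. One defensive option, sketched below in plain Ruby, is to tag each chunk with a sequence number and have the consumer skip anything it has already applied. The `seq` field and the `ChunkAssembler` class are assumptions for illustration; the channel above does not send a `seq`.

```ruby
# Hypothetical consumer-side helper; `seq` is an assumed payload field.
class ChunkAssembler
  attr_reader :text

  def initialize
    @text = +""
    @next_seq = 0
    @pending = {} # chunks that arrived ahead of their turn, keyed by seq
  end

  # Accepts chunks in any order, ignores duplicates, and appends content
  # to `text` strictly in sequence order.
  def push(seq:, content:)
    return if seq < @next_seq || @pending.key?(seq) # duplicate

    @pending[seq] = content
    while (ready = @pending.delete(@next_seq))
      @text << ready
      @next_seq += 1
    end
  end
end

assembler = ChunkAssembler.new
assembler.push(seq: 0, content: "Web")
assembler.push(seq: 2, content: " are") # early: held back
assembler.push(seq: 0, content: "Web")  # duplicate: ignored
assembler.push(seq: 1, content: "Sockets")
puts assembler.text # WebSockets are
```

Where the transport already guarantees ordering, the reorder buffer is only a safety net and the duplicate check does the real work.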
```ruby
class ChatChannel < ApplicationCable::Channel
  include AnyCable::Rails::Channel::Presence

  def subscribed
    stream_from "chat_#{params[:room_id]}"
    presence.join(current_user.id, name: current_user.name)
  end

  def unsubscribed
    presence.leave
  end
end
```
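Conceptually, presence tracking comes down to remembering who has joined and emitting join/leave events when that set changes. The plain-Ruby sketch below illustrates the idea, including the common wrinkle of one user having several tabs open. `PresenceRegistry` is a hypothetical class, not part of AnyCable, and AnyCable's own bookkeeping may differ.

```ruby
# Hypothetical in-memory model of presence; not AnyCable's API.
class PresenceRegistry
  attr_reader :events

  def initialize
    @sessions = Hash.new(0) # user id => number of open sessions
    @info = {}
    @events = []
  end

  def join(id, info = {})
    @sessions[id] += 1
    return unless @sessions[id] == 1 # only the first session announces

    @info[id] = info
    @events << { type: "join", id: id, info: info }
  end

  def leave(id)
    return unless @sessions.key?(id)

    @sessions[id] -= 1
    return if @sessions[id].positive? # other sessions are still open

    @sessions.delete(id)
    @info.delete(id)
    @events << { type: "leave", id: id }
  end

  # Snapshot of who is currently present.
  def info
    @info.dup
  end
end

registry = PresenceRegistry.new
registry.join(1, { name: "Ada" })
registry.join(1, { name: "Ada" }) # second tab: no new event
registry.join(2, { name: "Lin" })
registry.leave(1)                 # one tab still open: no event
registry.leave(1)                 # last tab closed: leave event

p registry.events.map { |event| [event[:type], event[:id]] }
p registry.info.keys
```

Counting sessions per user keeps the event stream quiet when someone merely opens or closes an extra tab.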
```js
// Before (Action Cable)
// import { createConsumer } from "@rails/actioncable"

// After (AnyCable) - same API!
import { createConsumer } from "@anycable/web"

export default createConsumer()
```
```js
import { createCable } from "@anycable/web"

const cable = createCable()

// Class-based channel
import { Channel } from "@anycable/web"

class LlmStreamChannel extends Channel {
  static identifier = "LlmStreamChannel"

  async generate(prompt) {
    return this.perform("generate", { prompt })
  }
}

// Subscribe and handle chunks
const channel = new LlmStreamChannel()
cable.subscribe(channel)
await channel.ensureSubscribed()

channel.on("message", (msg) => {
  if (msg.type === "chunk") {
    appendToResponse(msg.content)
  } else if (msg.type === "complete") {
    finishResponse()
  }
})

channel.generate("Explain WebSockets")
```
```js
// Subscribe directly to a stream without channel class
const cable = createCable()
const stream = cable.streamFrom("llm_response/user_123")

stream.on("message", (msg) => console.log(msg))
```
```js
// The param key must match what the server reads (params[:room_id])
const chatChannel = cable.subscribeTo("ChatChannel", { room_id: "42" })

// Join presence
chatChannel.presence.join(user.id, { name: user.name })

// Listen for presence events
chatChannel.presence.on("presence", (event) => {
  if (event.type === "join") {
    console.log("User joined:", event.id, event.info)
  } else if (event.type === "leave") {
    console.log("User left:", event.id)
  }
})

// Get current presence
const users = await chatChannel.presence.info()

// Leave presence
chatChannel.presence.leave()
```
```ruby
# app/channels/assistant_channel.rb
class AssistantChannel < ApplicationCable::Channel
  def subscribed
    stream_for current_user
  end

  def ask(data)
    conversation_id = data["conversation_id"]
    message = data["message"]

    # Broadcast start
    broadcast_event("start", conversation_id:)

    # Stream LLM response.
    # Placeholder call: assumes `llm.chat` yields plain-text chunks;
    # adapt it to your client's actual streaming API.
    response = +"" # unfrozen buffer, safe under frozen_string_literal
    llm.chat(message) do |chunk|
      response << chunk
      broadcast_event("chunk", conversation_id:, content: chunk)
    end

    # Save and broadcast completion
    Message.create!(conversation_id:, role: "assistant", content: response)
    broadcast_event("complete", conversation_id:)
  rescue => e
    broadcast_event("error", conversation_id:, message: e.message)
  end

  private

  def broadcast_event(type, **payload)
    AssistantChannel.broadcast_to(current_user, { type:, **payload })
  end

  def llm
    @llm ||= OpenAI::Client.new
  end
end
```
```js
// app/javascript/channels/assistant_channel.js
import { Channel } from "@anycable/web"

export default class AssistantChannel extends Channel {
  static identifier = "AssistantChannel"

  constructor() {
    super()
    this.responseBuffer = ""
  }

  async ask(conversationId, message) {
    this.responseBuffer = ""
    return this.perform("ask", { conversation_id: conversationId, message })
  }

  // Override to handle message types
  receive(message) {
    switch (message.type) {
      case "start":
        this.onStart?.(message.conversation_id)
        break
      case "chunk":
        this.responseBuffer += message.content
        this.onChunk?.(message.content, this.responseBuffer)
        break
      case "complete":
        this.onComplete?.(this.responseBuffer, message.conversation_id)
        break
      case "error":
        this.onError?.(message.message)
        break
    }
  }
}
```
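The `start` / `chunk` / `complete` / `error` events form a small protocol, and every event carries a `conversation_id`. The client above keeps a single buffer, so if two conversations could stream at once it may be worth folding events per conversation instead. The plain-Ruby sketch below shows one way to do that, which can also be handy in tests. `ConversationLog` is a hypothetical helper, and it assumes events arrive as hashes with symbol keys (after JSON parsing they would be string keys).

```ruby
# Hypothetical helper: folds channel events into one record per conversation.
class ConversationLog
  Entry = Struct.new(:status, :text, :error)

  def initialize
    @entries = Hash.new { |hash, id| hash[id] = Entry.new(:idle, +"", nil) }
  end

  # Applies a single event and returns the updated entry.
  def apply(event)
    entry = @entries[event[:conversation_id]]
    case event[:type]
    when "start"
      entry.status = :streaming
      entry.text = +""
    when "chunk"
      entry.text << event[:content]
    when "complete"
      entry.status = :complete
    when "error"
      entry.status = :failed
      entry.error = event[:message]
    end
    entry
  end

  def [](conversation_id)
    @entries[conversation_id]
  end
end

log = ConversationLog.new
[
  { type: "start", conversation_id: 1 },
  { type: "start", conversation_id: 2 },
  { type: "chunk", conversation_id: 1, content: "Hello" },
  { type: "chunk", conversation_id: 2, content: "Bonjour" },
  { type: "chunk", conversation_id: 1, content: " there" },
  { type: "complete", conversation_id: 1 },
  { type: "error", conversation_id: 2, message: "timeout" }
].each { |event| log.apply(event) }

puts log[1].text   # Hello there
puts log[2].status # failed
```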
```yaml
# config/anycable.yml
production:
  broadcast_adapter: nats
  redis_url: <%= ENV.fetch("REDIS_URL") %>
  # Enable reliable streams
  streams_history_size: 100
  streams_history_ttl: 300
```
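Assuming the two history settings above cap the number of retained messages per stream and their age in seconds, the effect on what can be replayed after a reconnect can be pictured with the plain-Ruby sketch below. `BoundedHistory` and its injectable `clock` are hypothetical and only model the idea; they are not AnyCable code.

```ruby
# Hypothetical model of a size- and age-bounded history; not AnyCable code.
class BoundedHistory
  def initialize(size:, ttl:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @size = size
    @ttl = ttl
    @clock = clock
    @entries = [] # [offset, stored_at, message], oldest first
    @offset = 0
  end

  # Stores a message, evicts the oldest beyond the size limit,
  # and returns the new message's offset.
  def push(message)
    @offset += 1
    @entries << [@offset, @clock.call, message]
    @entries.shift while @entries.size > @size
    @offset
  end

  # Messages newer than `offset` that have not expired.
  def since(offset)
    cutoff = @clock.call - @ttl
    @entries
      .select { |entry_offset, stored_at, _| entry_offset > offset && stored_at >= cutoff }
      .map(&:last)
  end
end

now = 0
history = BoundedHistory.new(size: 3, ttl: 300, clock: -> { now })
history.push("a") # offset 1, stored at t=0
now = 200
history.push("b") # offset 2
history.push("c") # offset 3
history.push("d") # offset 4; "a" is evicted by the size limit

p history.since(1) # ["b", "c", "d"]
now = 501
p history.since(1) # [] because every entry is now older than 300 seconds
```

A client that stays offline longer than the TTL, or misses more messages than the size limit, cannot be fully caught up from history alone.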
```erb
<!-- app/views/layouts/application.html.erb -->
<%= action_cable_meta_tag %>
```

```js
// Auto-detects from meta tag, or specify explicitly
import { createCable } from "@anycable/web"

createCable("wss://cable.example.com/cable")
```
```
web: bundle exec puma -C config/puma.rb
anycable: bundle exec anycable
ws: anycable-go
```
```yaml
services:
  web:
    command: bundle exec puma
  anycable:
    command: bundle exec anycable
  ws:
    image: anycable/anycable-go:1.6
    environment:
      ANYCABLE_RPC_HOST: anycable:50051
      ANYCABLE_REDIS_URL: redis://redis:6379
```
| Feature | Action Cable | AnyCable |
|---|---|---|
| Delivery guarantee | At-most once | At-least once |
| Message ordering | Not guaranteed | Guaranteed |
| History on reconnect | No | Yes (configurable) |
| Presence tracking | Manual | Built-in |
| Performance | Ruby threads | Go server |
| LLM streaming | Unreliable | Reliable |

| Anti-Pattern | Problem | Solution |
|---|---|---|
| Action Cable for LLM streaming | Lost chunks on reconnect | Use AnyCable |
| Ignoring message ordering | Garbled responses | AnyCable handles automatically |
| Manual reconnection logic | Complex, error-prone | Use AnyCable client |
| No presence tracking | Unknown user state | Use built-in presence API |

When implementing real-time features with AnyCable: