Filters and validates Bluesky Jetstream firehose events via WebSocket streams. Processes commits for posts/likes/follows, identities, and account updates in real-time.
From asi-skillsnpx claudepluginhub plurigrid/asiThis skill uses the workspace's default tool permissions.
Guides Next.js Cache Components and Partial Prerendering (PPR) with cacheComponents enabled. Implements 'use cache', cacheLife(), cacheTag(), revalidateTag(), static/dynamic optimization, and cache debugging.
Migrates code, prompts, and API calls from Claude Sonnet 4.0/4.5 or Opus 4.1 to Opus 4.5, updating model strings on Anthropic, AWS, GCP, Azure platforms.
Analyzes BMad project state from catalog CSV, configs, artifacts, and query to recommend next skills or answer questions. Useful for help requests, 'what next', or starting BMad.
GF(3) Trit: -1 (MINUS - validator/filter on incoming data stream) Role: Constrain and validate Bluesky firehose events before processing
Jetstream is Bluesky's simplified JSON firehose - a WebSocket streaming API that provides real-time access to all public activity on the Bluesky network. Unlike the full atproto firehose (which uses CBOR/CAR binary encoding), Jetstream delivers plain JSON, making it accessible for rapid prototyping.
Endpoints:
wss://jetstream2.us-east.bsky.network/subscribewss://jetstream1.us-west.bsky.network/subscribeSource: github.com/bluesky-social/jetstream
| Parameter | Type | Description |
|---|---|---|
wantedCollections | string[] | Filter by NSID (repeatable) |
wantedDids | string[] | Filter by specific DIDs (repeatable) |
compress | boolean | Enable zstd compression |
cursor | integer | Resume from timestamp (microseconds since epoch) |
Example URL:
wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like
Repository commit events (most common):
{
"did": "did:plc:abc123...",
"time_us": 1703123456789012,
"kind": "commit",
"commit": {
"rev": "3k...",
"operation": "create",
"collection": "app.bsky.feed.post",
"rkey": "3k...",
"record": {
"$type": "app.bsky.feed.post",
"text": "Hello world!",
"createdAt": "2024-12-21T10:00:00.000Z"
},
"cid": "bafyrei..."
}
}
Identity updates:
{
"did": "did:plc:abc123...",
"time_us": 1703123456789012,
"kind": "identity",
"identity": {
"did": "did:plc:abc123...",
"handle": "alice.bsky.social",
"seq": 12345
}
}
Account status changes:
{
"did": "did:plc:abc123...",
"time_us": 1703123456789012,
"kind": "account",
"account": {
"active": true,
"did": "did:plc:abc123...",
"seq": 12345
}
}
| NSID | Description |
|---|---|
app.bsky.feed.post | Posts/skeets |
app.bsky.feed.like | Likes |
app.bsky.feed.repost | Reposts |
app.bsky.graph.follow | Follows |
app.bsky.graph.block | Blocks |
app.bsky.graph.list | Lists |
app.bsky.graph.listitem | List memberships |
app.bsky.actor.profile | Profile updates |
app.bsky.feed.threadgate | Thread gates |
app.bsky.feed.postgate | Post gates |
const ws = new WebSocket(
'wss://jetstream2.us-east.bsky.network/subscribe?' +
'wantedCollections=app.bsky.feed.post'
);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.kind === 'commit' && data.commit.operation === 'create') {
console.log(`[${data.did}]: ${data.commit.record.text}`);
}
};
(ns bluesky.jetstream
(:require [clojure.data.json :as json]
[org.httpkit.client :as http]))
(def GAMMA 0x9E3779B9)
(defn splitmix32 [x]
(let [z (bit-and (+ x GAMMA) 0xFFFFFFFF)
z (bit-and (* (bit-xor z (unsigned-bit-shift-right z 15)) 0x85EBCA6B) 0xFFFFFFFF)
z (bit-and (* (bit-xor z (unsigned-bit-shift-right z 13)) 0xC2B2AE35) 0xFFFFFFFF)]
(bit-xor z (unsigned-bit-shift-right z 16))))
(defn cid->trit [cid]
(let [hash (reduce #(bit-xor (unchecked-multiply %1 31) (int %2)) 0 cid)
hue (mod (splitmix32 hash) 360)]
(cond
(or (< hue 60) (>= hue 300)) +1 ; PLUS (warm)
(< hue 180) 0 ; ERGODIC (neutral)
:else -1))) ; MINUS (cold)
(defn process-event [event]
(when (and (= (:kind event) "commit")
(= (get-in event [:commit :operation]) "create"))
(let [cid (get-in event [:commit :cid])
text (get-in event [:commit :record :text])
trit (cid->trit cid)]
{:did (:did event)
:text text
:cid cid
:trit trit
:trit-label (case trit 1 "PLUS" 0 "ERGODIC" -1 "MINUS")})))
;; Feed Jetstream events into memory substrate
(defn ingest-to-substrate [event memory-client]
(when-let [processed (process-event event)]
;; Compress into control artifact
(compress-trace memory-client
[{:role :user
:content (format "Bluesky post from %s: %s"
(:did processed) (:text processed))}]
{:tags ["bluesky" (:trit-label processed)]
:source "jetstream"
:cid (:cid processed)})))
Posts from Jetstream get the same decay epistemology:
Each ingested post gets a trit assignment based on CID hash:
Σ(post trits) ≡ 0 (mod 3)
Over large samples, the distribution naturally balances due to SplitMix64's uniform properties.
time_us to resume after disconnecttime_us to wall clock for lag detectionCREATE TABLE IF NOT EXISTS bluesky_firehose (
seq_id BIGINT PRIMARY KEY,
did VARCHAR NOT NULL,
collection VARCHAR NOT NULL,
rkey VARCHAR,
cid VARCHAR,
operation VARCHAR, -- create | update | delete
record_json JSON,
text VARCHAR,
created_at TIMESTAMP,
trit INTEGER, -- -1, 0, +1
color_hex VARCHAR, -- Gay.jl deterministic color
cursor_us BIGINT, -- For resumption
ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_did (did),
INDEX idx_collection (collection),
INDEX idx_trit (trit),
INDEX idx_cursor (cursor_us)
);
-- View for GF(3) balance check
CREATE VIEW bluesky_gf3_balance AS
SELECT
DATE_TRUNC('hour', ingested_at) as hour,
SUM(trit) as trit_sum,
MOD(SUM(trit), 3) as gf3_remainder,
COUNT(*) as event_count
FROM bluesky_firehose
GROUP BY 1
ORDER BY 1 DESC;
The triadic processing of Jetstream events mirrors Savitch's reachability algorithm:
NSPACE(S(n)) ⊆ DSPACE(S(n)²)
Firehose Event Processing:
├── MINUS (-1): Validate/filter incoming event
├── ERGODIC (0): Route to appropriate handler
└── PLUS (+1): Generate processed artifact
Each branch: O(S(n)) space
Recursion depth: O(log n) for branching decisions
Total: O(S(n)²) deterministic space simulating nondeterministic choices
# Start Babashka consumer
bb scripts/bluesky_jetstream.clj --collections app.bsky.feed.post --limit 1000
# Open HTML viewer
open scripts/bluesky_viewer.html
# Query stored events
duckdb ~/.topos/ducklake.duckdb "SELECT * FROM bluesky_firehose ORDER BY cursor_us DESC LIMIT 10"
# Check GF(3) balance
duckdb ~/.topos/ducklake.duckdb "SELECT * FROM bluesky_gf3_balance LIMIT 24"
duckdb-temporal-versioning (+1): Store firehose with time-travelgay-mcp (+1): Deterministic coloring for CIDsmemory-substrate (0): Ingest into Agent-o-rama memorySkill Name: bluesky-jetstream Type: Real-time data stream consumer Trit: -1 (MINUS - validator/filter) GF(3) Triad: bluesky-jetstream (-1) ⊗ duckdb-temporal (0) ⊗ gay-mcp (+1) = 0 ✓
This skill connects to the K-Dense-AI/claude-scientific-skills ecosystem:
general: 734 citations in bib.duckdbThis skill maps to Cat# = Comod(P) as a bicomodule in the equipment structure:
Trit: 0 (ERGODIC)
Home: Prof
Poly Op: ⊗
Kan Role: Adj
Color: #26D826
The skill participates in triads satisfying:
(-1) + (0) + (+1) ≡ 0 (mod 3)
This ensures compositional coherence in the Cat# equipment structure.