From perplexity-pack
Build event-driven architectures around Perplexity Sonar API with streaming, batch pipelines, and scheduled search monitoring. Trigger with phrases like "perplexity streaming", "perplexity events", "perplexity batch search", "perplexity news monitor", "perplexity SSE".
npx claudepluginhub flight505/skill-forge --plugin perplexity-packThis skill is limited to using the following tools:
Build event-driven architectures around Perplexity Sonar API. Perplexity does not have webhooks -- all interactions are request/response. Event patterns are built using streaming SSE, job queues for batch processing, and cron-triggered monitoring.
Guides Next.js Cache Components and Partial Prerendering (PPR): 'use cache' directives, cacheLife(), cacheTag(), revalidateTag() for caching, invalidation, static/dynamic optimization. Auto-activates on cacheComponents: true.
Guides building MCP servers enabling LLMs to interact with external services via tools. Covers best practices, TypeScript/Node (MCP SDK), Python (FastMCP).
Share bugs, ideas, or general feedback.
Build event-driven architectures around Perplexity Sonar API. Perplexity does not have webhooks -- all interactions are request/response. Event patterns are built using streaming SSE, job queues for batch processing, and cron-triggered monitoring.
| Pattern | Trigger | Use Case |
|---|---|---|
| Streaming SSE | Client request | Real-time search with progressive rendering |
| Batch queue | Job submission | Research automation, report generation |
| Scheduled search | Cron job | News monitoring, trend alerts, competitive intel |
| Citation pipeline | Post-processing | Source verification, link validation |
openai package installedPERPLEXITY_API_KEY setimport OpenAI from "openai";
import express from "express";
const perplexity = new OpenAI({
apiKey: process.env.PERPLEXITY_API_KEY!,
baseURL: "https://api.perplexity.ai",
});
const app = express();
app.use(express.json());
app.post("/api/search/stream", async (req, res) => {
const { query, model = "sonar" } = req.body;
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
try {
const stream = await perplexity.chat.completions.create({
model,
messages: [{ role: "user", content: query }],
stream: true,
max_tokens: 2048,
});
let fullText = "";
for await (const chunk of stream) {
const text = chunk.choices[0]?.delta?.content || "";
fullText += text;
res.write(`data: ${JSON.stringify({ type: "text", content: text })}\n\n`);
// Citations arrive in the final chunk
const citations = (chunk as any).citations;
if (citations) {
res.write(`data: ${JSON.stringify({ type: "citations", urls: citations })}\n\n`);
}
}
res.write(`data: ${JSON.stringify({ type: "done", totalLength: fullText.length })}\n\n`);
} catch (err: any) {
res.write(`data: ${JSON.stringify({ type: "error", message: err.message })}\n\n`);
}
res.end();
});
import { Queue, Worker } from "bullmq";
const searchQueue = new Queue("perplexity-research", {
connection: { host: "localhost", port: 6379 },
});
async function submitResearchBatch(
queries: string[],
callbackUrl: string,
model: string = "sonar-pro"
) {
const batchId = crypto.randomUUID();
for (const query of queries) {
await searchQueue.add("search", { batchId, query, callbackUrl, model }, {
attempts: 3,
backoff: { type: "exponential", delay: 2000 },
});
}
return { batchId, totalQueries: queries.length };
}
const worker = new Worker("perplexity-research", async (job) => {
const { query, callbackUrl, batchId, model } = job.data;
const response = await perplexity.chat.completions.create({
model,
messages: [{ role: "user", content: query }],
max_tokens: 2048,
});
const result = {
event: "perplexity.search.completed",
batchId,
query,
answer: response.choices[0].message.content,
citations: (response as any).citations || [],
model: response.model,
tokens: response.usage?.total_tokens,
};
// Deliver result via callback
await fetch(callbackUrl, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(result),
});
}, {
connection: { host: "localhost", port: 6379 },
concurrency: 3, // Stay within rate limits
limiter: { max: 40, duration: 60000 }, // 40 RPM safety margin
});
// Run via cron: every 6 hours
async function monitorTopics(
topics: string[],
webhookUrl: string
) {
for (const topic of topics) {
const response = await perplexity.chat.completions.create({
model: "sonar",
messages: [{
role: "system",
content: "Summarize the latest developments. Be concise. Include only new information.",
}, {
role: "user",
content: `Latest developments about "${topic}" in the past 24 hours`,
}],
search_recency_filter: "day",
max_tokens: 500,
} as any);
const answer = response.choices[0].message.content || "";
const citations = (response as any).citations || [];
// Only notify if there are actual developments
if (citations.length > 0 && answer.length > 100) {
await fetch(webhookUrl, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
event: "perplexity.monitor.update",
topic,
summary: answer,
citations,
timestamp: new Date().toISOString(),
}),
});
}
// Rate limit protection
await new Promise((r) => setTimeout(r, 2000));
}
}
// Browser client consuming the streaming endpoint
function consumeSearchStream(
query: string,
onText: (text: string) => void,
onCitations: (urls: string[]) => void,
onDone: () => void
) {
fetch("/api/search/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ query }),
}).then(async (response) => {
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const lines = decoder.decode(value).split("\n");
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const event = JSON.parse(line.slice(6));
if (event.type === "text") onText(event.content);
if (event.type === "citations") onCitations(event.urls);
if (event.type === "done") onDone();
}
}
});
}
| Issue | Cause | Solution |
|---|---|---|
| Stream stalls | Complex search taking too long | Set per-chunk timeout (10s) |
| 429 in batch | Too many concurrent workers | Reduce concurrency, add rate limiter |
| Empty monitor alerts | Topic too niche | Broaden topic or reduce recency filter |
| Callback fails | Webhook URL down | Retry with exponential backoff |
For deployment setup, see perplexity-deploy-integration.