From effect-ts
This skill should be used when the user asks about "Effect Stream", "Stream.from", "Stream.map", "Stream.filter", "Stream.run", "streaming data", "async iteration", "Sink", "Channel", "Stream.concat", "Stream.merge", "backpressure", "Stream.fromIterable", "chunked processing", "real-time data", or needs to understand how Effect handles streaming data processing.
npx claudepluginhub andrueandersoncs/claude-skill-effect-ts --plugin effect-ts
Effect Streams provide:
Stream<A, E, R>
// Produces values of type A
// May fail with error E
// Requires environment R
import { Stream } from "effect";
const numbers = Stream.make(1, 2, 3, 4, 5);
const fromArray = Stream.fromIterable([1, 2, 3]);
const empty = Stream.empty;
const single = Stream.succeed(42);
const infinite = Stream.iterate(1, (n) => n + 1);
const fromEffect = Stream.fromEffect(fetchData());
const polling = Stream.repeatEffect(checkStatus());
const scheduled = Stream.repeatEffectWithSchedule(checkStatus(), Schedule.spaced("5 seconds"));
// From async iterable
const fromAsyncIterable = Stream.fromAsyncIterable(asyncGenerator(), (error) => new StreamError({ cause: error }));
// From callback/event emitter
const fromCallback = Stream.async<number, never>((emit) => {
const handler = (value: number) => emit.single(value);
eventEmitter.on("data", handler);
return Effect.sync(() => eventEmitter.off("data", handler));
});
// From queue
const fromQueue = Stream.fromQueue(queue);
const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]));
const range = Stream.range(1, 100);
const repeated = Stream.succeed("ping").pipe(Stream.forever, Stream.take(5));
const doubled = numbers.pipe(Stream.map((n) => n * 2));
const enriched = users.pipe(Stream.mapEffect((user) => fetchProfile(user.id)));
const parallel = items.pipe(Stream.mapEffect(process, { concurrency: 10 }));
const evens = numbers.pipe(Stream.filter((n) => n % 2 === 0));
const valid = items.pipe(Stream.filterEffect((item) => validate(item)));
const expanded = numbers.pipe(Stream.flatMap((n) => Stream.make(n, n * 10, n * 100)));
// 1, 10, 100, 2, 20, 200, ...
const first5 = numbers.pipe(Stream.take(5));
const skip5 = numbers.pipe(Stream.drop(5));
const firstWhile = numbers.pipe(Stream.takeWhile((n) => n < 10));
const dropWhile = numbers.pipe(Stream.dropWhile((n) => n < 10));
const combined = Stream.concat(stream1, stream2);
// or, equivalently, in pipeable form:
const combined2 = stream1.pipe(Stream.concat(stream2));
// Interleave elements from both
const merged = Stream.merge(stream1, stream2);
// Merge multiple
const allMerged = Stream.mergeAll([s1, s2, s3], { concurrency: 3 });
const zipped = Stream.zip(names, ages);
// Stream<[string, number]>
// With function
const people = Stream.zipWith(names, ages, (name, age) => ({ name, age }));
const interleaved = Stream.interleave(stream1, stream2);
// a1, b1, a2, b2, ...
const array = yield* Stream.runCollect(numbers);
const first = yield* Stream.runHead(numbers);
const sum = yield* Stream.runFold(numbers, 0, (acc, n) => acc + n);
yield* numbers.pipe(Stream.runForEach((n) => Effect.log(`Got: ${n}`)));
yield* numbers.pipe(Stream.runDrain);
import { Sink } from "effect";
const sum = yield* numbers.pipe(Stream.run(Sink.sum));
const array = yield* numbers.pipe(Stream.run(Sink.collectAll()));
Streams process elements in chunks for efficiency:
const chunked = numbers.pipe(Stream.grouped(10));
const processed = numbers.pipe(Stream.mapChunks((chunk) => Chunk.map(chunk, (n) => n * 2)));
const rechunked = numbers.pipe(Stream.rechunk(100));
const safe = stream.pipe(Stream.catchAll((error) => Stream.succeed(fallbackValue)));
const handled = stream.pipe(Stream.catchTag("NetworkError", (error) => Stream.succeed(cachedValue)));
const resilient = stream.pipe(Stream.retry(Schedule.exponential("1 second")));
const withFallback = stream.pipe(Stream.orElse(() => fallbackStream));
// Stream with resource lifecycle
const fileStream = Stream.acquireRelease(
Effect.sync(() => fs.openSync("data.txt", "r")),
(fd) => Effect.sync(() => fs.closeSync(fd)),
).pipe(
Stream.flatMap((fd) =>
Stream.repeatEffectOption(
Effect.sync(() => {
const buffer = Buffer.alloc(1024);
const bytes = fs.readSync(fd, buffer);
return bytes > 0 ? Option.some(buffer.subarray(0, bytes)) : Option.none();
}),
),
),
);
// Scoped streams
const scoped = Stream.scoped(Effect.acquireRelease(openConnection, closeConnection));
Sinks consume stream elements:
import { Sink } from "effect";
Sink.sum;
Sink.count;
Sink.head;
Sink.last;
Sink.collectAll();
Sink.forEach(f);
const maxSink = Sink.foldLeft(Number.NEGATIVE_INFINITY, (max, n: number) => Math.max(max, n));
const batchProcess = stream.pipe(
Stream.grouped(100),
Stream.mapEffect((batch) => Effect.tryPromise(() => api.processBatch(Chunk.toArray(batch)))),
);
const rateLimited = stream.pipe(
Stream.throttle({
units: 1,
duration: "100 millis",
strategy: "shape",
}),
);
const debounced = stream.pipe(Stream.debounce("500 millis"));
// Time-based windows
const windows = stream.pipe(Stream.groupedWithin(1000, "1 second"));
For comprehensive stream documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.
Search for these sections: