From effect-ts
This skill should be used when the user asks about "Effect Sink", "Sink.collectAll", "Sink.sum", "Sink.fold", "stream consumers", "Sink.forEach", "creating sinks", "sink operations", "sink leftovers", "sink concurrency", "Stream.run with Sink", or needs to understand how Effect Sinks consume stream data.
npx claudepluginhub andrueandersoncs/claude-skill-effect-ts --plugin effect-tsThis skill uses the workspace's default tool permissions.
A `Sink` is a consumer of stream elements that produces a result:
Searches, retrieves, and installs Agent Skills from prompts.chat registry using MCP tools like search_skills and get_skill. Activates for finding skills, browsing catalogs, or extending Claude.
Searches prompts.chat for AI prompt templates by keyword or category, retrieves by ID with variable handling, and improves prompts via AI. Use for discovering or enhancing prompts.
Checks Next.js compilation errors using a running Turbopack dev server after code edits. Fixes actionable issues before reporting complete. Replaces `next build`.
A Sink is a consumer of stream elements that produces a result:
Sink<A, In, L, E, R>;
// A - Result type (what sink produces)
// In - Input element type (what sink consumes)
// L - Leftover type (unconsumed elements)
// E - Error type
// R - Required environment
Sinks are the counterpart to Streams - while Streams produce data, Sinks consume it.
import { Stream, Sink } from "effect";
const all = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.collectAll()));
const array = yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.collectAllToArray()));
const firstThree = yield * Stream.range(1, 100).pipe(Stream.run(Sink.collectAllN(3)));
const whileSmall = yield * Stream.iterate(1, (n) => n + 1).pipe(Stream.run(Sink.collectAllWhile((n) => n < 5)));
const total = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.sum));
const count = yield * Stream.make("a", "b", "c").pipe(Stream.run(Sink.count));
const first = yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.head));
const last = yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.last));
const taken = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.take(3)));
const product = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.foldLeft(1, (acc, n) => acc * n)));
const sumUntil100 =
yield *
Stream.iterate(1, (n) => n + 1).pipe(
Stream.run(
Sink.fold(
0,
(sum) => sum < 100,
(sum, n) => sum + n,
),
),
);
const foldWithLog = Sink.foldEffect(
0,
(sum) => sum < 100,
(sum, n) =>
Effect.gen(function* () {
yield* Effect.log(`Adding ${n} to ${sum}`);
return sum + n;
}),
);
yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.forEach((n) => Effect.log(`Got: ${n}`))));
yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.drain));
const maxSink = Sink.make<number, number, never, never, never>(
// Initial state
Number.NEGATIVE_INFINITY,
// Process each element
(max, n) => (n > max ? n : max),
// Extract result
(max) => max,
);
const max = yield * Stream.make(3, 1, 4, 1, 5, 9).pipe(Stream.run(maxSink)); // 9
const logAndReturn = <A>(label: string) =>
Sink.fromEffect(
Effect.gen(function* () {
yield* Effect.log(`Starting ${label}`);
return [] as A[];
}),
);
For more control over the sink lifecycle:
const customSink = Sink.fromPush<number, number, never, never>((input) =>
Effect.sync(() =>
Option.match(input, {
onNone: () => Either.left(finalResult), // Stream ended
onSome: (chunk) => {
// Process chunk
// Return Either.right to continue, Either.left to finish
return Either.right(undefined);
},
}),
),
);
const doubledSum = Sink.sum.pipe(Sink.map((sum) => sum * 2));
const lengthSum = Sink.sum.pipe(Sink.contramap((s: string) => s.length));
const processStrings = Sink.sum.pipe(
Sink.dimap(
(s: string) => s.length,
(sum) => `Total length: ${sum}`,
),
);
const sumAndCount = Sink.zip(Sink.sum, Sink.count);
const [sum, count] = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(sumAndCount));
const firstOrSum = Sink.race(Sink.head, Sink.sum.pipe(Sink.map(Option.some)));
const sumPositive = Sink.sum.pipe(Sink.filterInput((n: number) => n > 0));
const result = yield * Stream.make(-1, 2, -3, 4, -5).pipe(Stream.run(sumPositive));
Sinks can leave unconsumed elements:
const takeThree = Sink.take<number>(3);
const [first, rest] =
yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.take<number>(3).pipe(Sink.collectLeftover)));
const parallelSinks = Sink.zipPar(Sink.sum, Sink.count, Sink.collectAll<number>());
const [sum, count, all] = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(parallelSinks));
const chunkedSum = Sink.foldChunks(
0,
() => true,
(sum, chunk: Chunk.Chunk<number>) => sum + Chunk.reduce(chunk, 0, (a, b) => a + b),
);
const batchInsert = (batchSize: number) =>
Sink.collectAllN<Record>(batchSize).pipe(
Sink.mapEffect((batch) => Effect.tryPromise(() => db.insertMany(Chunk.toArray(batch)))),
);
yield * recordStream.pipe(Stream.run(batchInsert(100)));
const stats = Sink.zip(Sink.sum, Sink.zip(Sink.count, Sink.zip(Sink.head, Sink.last))).pipe(
Sink.map(([sum, [count, [first, last]]]) => ({
sum,
count,
average: count > 0 ? sum / count : 0,
first,
last,
})),
);
const writeToFile = (path: string) =>
Sink.forEach((line: string) =>
Effect.gen(function* () {
const fs = yield* FileSystem;
yield* fs.appendFileString(path, line + "\n");
}),
);
For comprehensive sink documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.
Search for these sections: