Processes large data efficiently using Node.js Readable, Writable, and Transform streams and pipelines, covering file I/O, data transformation, HTTP responses, and backpressure handling.
npx claudepluginhub intense-visions/harness-engineering --plugin harness-claude
This skill uses the workspace's default tool permissions.
Process large data efficiently using Node.js Readable, Writable, and Transform streams
import { createReadStream } from 'node:fs';

// Iterate a large file chunk by chunk; only one chunk is held in memory at a time.
const readable = createReadStream('large-file.csv', { encoding: 'utf-8' });
for await (const chunk of readable) {
  processChunk(chunk); // processChunk: caller-supplied per-chunk handler
}
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream/promises';
// Gzip-compress input.json; pipeline wires the streams together and propagates
// errors and backpressure between them.
await pipeline(createReadStream('input.json'), createGzip(), createWriteStream('output.json.gz'));
import { Transform } from 'node:stream';
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

// Transform stream: receives each chunk, pushes the modified version downstream,
// then signals completion via callback().
const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  },
});
await pipeline(createReadStream('input.txt'), uppercase, createWriteStream('output.txt'));
import { Transform } from 'node:stream';

// Object-mode Transform: chunks are JavaScript objects rather than Buffers.
// Inactive users are simply not pushed, so they are dropped from the stream.
const filterActive = new Transform({
  objectMode: true,
  transform(user, encoding, callback) {
    if (user.isActive) this.push(user);
    callback();
  },
});
import { Readable } from 'node:stream';

// Build a Readable from an async generator; records are produced lazily as the
// consumer reads them, not all 1,000,000 up front.
async function* generateRecords() {
  for (let i = 0; i < 1_000_000; i++) {
    yield { id: i, value: Math.random() };
  }
}
const readable = Readable.from(generateRecords());
import { createReadStream } from 'node:fs';
import express from 'express';

const app = express();
app.get('/download', (req, res) => {
  res.setHeader('Content-Type', 'application/octet-stream');
  const stream = createReadStream('large-file.zip');
  stream.pipe(res); // res applies backpressure to the file stream automatically
  stream.on('error', (err) => {
    // Once chunks have been sent the status line cannot change; just end the response.
    if (res.headersSent) res.end();
    else res.status(500).end('Error reading file');
  });
});
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

// Process a CSV line by line; crlfDelay: Infinity treats \r\n as a single line break.
const rl = createInterface({
  input: createReadStream('data.csv'),
  crlfDelay: Infinity,
});
for await (const line of rl) {
  const [name, email] = line.split(',');
  await processUser({ name, email }); // processUser: caller-supplied async handler
}
Use pipeline instead of .pipe(); pipeline handles error propagation and cleanup:
// Bad: errors not propagated, streams not cleaned up
readable.pipe(transform).pipe(writable);
// Good: errors propagated, streams destroyed on error
await pipeline(readable, transform, writable);
Streams process data in chunks without loading the entire dataset into memory. A 10GB file can be processed with a 64KB buffer.
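As a sketch of that claim, the following counts lines in an arbitrarily large file while holding only one chunk in memory at a time; the file name huge.log is a placeholder:

import { createReadStream } from 'node:fs';

// Without an encoding the stream yields Buffers of roughly highWaterMark bytes
// (64KB for fs streams), so memory stays flat no matter how large the file is.
let lines = 0;
for await (const chunk of createReadStream('huge.log')) {
  for (const byte of chunk) {
    if (byte === 0x0a) lines += 1; // count '\n'
  }
}
console.log(`lines: ${lines}`);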
Stream types:
- Readable: a source of data (file reads, HTTP requests, process.stdin)
- Writable: a destination for data (file writes, HTTP responses, process.stdout; see the sketch below)
- Duplex: both readable and writable (TCP sockets)
- Transform: a Duplex that modifies data as it passes through (gzip, the uppercase example above)
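The examples above implement Readable and Transform streams; a custom Writable looks similar. A minimal object-mode sketch (the records and the collected array are illustrative):

import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Custom Writable: write() receives each record and calls callback() to signal
// that it is ready for the next one.
const collected = [];
const sink = new Writable({
  objectMode: true,
  write(record, encoding, callback) {
    collected.push(record);
    callback();
  },
});

await pipeline(Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]), sink);
console.log(collected.length); // 3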
Backpressure: When a writable stream cannot consume data as fast as the readable produces it, Node.js automatically pauses the readable stream. This prevents memory exhaustion. pipeline and .pipe() both propagate backpressure; manual calls to writable.write() do not unless you check the return value and wait for the 'drain' event.
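A sketch of that manual handling; the output file name records.ndjson is illustrative:

import { createWriteStream } from 'node:fs';
import { once } from 'node:events';

// write() returns false once the internal buffer exceeds highWaterMark;
// pausing until 'drain' keeps memory bounded while writing a million records.
const out = createWriteStream('records.ndjson');
for (let i = 0; i < 1_000_000; i++) {
  const ok = out.write(JSON.stringify({ id: i }) + '\n');
  if (!ok) await once(out, 'drain');
}
out.end();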
highWaterMark: Controls the internal buffer size (default 16KB for byte streams and 16 objects in object mode; fs.createReadStream defaults to 64KB). Lower values reduce memory but increase the number of I/O operations.
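Both defaults can be overridden per stream; the values below are illustrative, not recommendations:

import { createReadStream } from 'node:fs';
import { Transform } from 'node:stream';

// Byte stream: read in 1MB chunks instead of the fs default of 64KB.
const wideReader = createReadStream('large-file.csv', { highWaterMark: 1024 * 1024 });

// Object mode: buffer up to 100 records instead of the default 16.
const buffered = new Transform({
  objectMode: true,
  highWaterMark: 100,
  transform(record, encoding, callback) {
    callback(null, record); // pass the record through unchanged
  },
});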
Trade-offs:
- readFile: simpler for small files, but loads the entire file into memory at once
- pipeline: handles errors and cleanup, but requires all streams in the chain to implement destroy correctly

https://nodejs.org/api/stream.html