Master Node.js streams for memory-efficient processing of large datasets, real-time data handling, and building data pipelines
Process large files and real-time data efficiently using Node.js streams for memory-constrained operations. Use when handling GB+ files, building data pipelines, or streaming HTTP requests/responses to avoid loading entire datasets into memory.
/plugin marketplace add pluginagentmarketplace/custom-plugin-nodejs
/plugin install nodejs-developer-plugin@pluginagentmarketplace-nodejs

This skill inherits all available tools. When active, it can use any tool Claude has access to.
Bundled resources: assets/config.yaml, assets/schema.json, references/GUIDE.md, references/PATTERNS.md, scripts/validate.py

Master streams for memory-efficient processing of large files, real-time data, and building composable data pipelines.
Streams come in four types: Readable, Writable, Duplex, and Transform.
const fs = require('fs');

// Create readable stream
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});

// Event-based consumption
readStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});

readStream.on('end', () => {
  console.log('Finished reading');
});

readStream.on('error', (err) => {
  console.error('Read error:', err);
});
const writeStream = fs.createWriteStream('output.txt');

// Write data
writeStream.write('Hello, ');
writeStream.write('World!\n');
writeStream.end(); // Signal end

// Handle backpressure (data and continueWriting() are placeholders for your
// own chunk and resume logic)
const ok = writeStream.write(data);
if (!ok) {
  // Wait for drain event before writing more
  writeStream.once('drain', () => {
    continueWriting();
  });
}
const { Transform } = require('stream');

// Custom transform: uppercase text
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// Usage
fs.createReadStream('input.txt')
  .pipe(upperCase)
  .pipe(fs.createWriteStream('output.txt'));
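Duplex streams, the fourth type, are readable and writable at the same time (net.Socket is the built-in example). A minimal sketch of a custom Duplex that echoes written chunks back to its readable side; the class name and behavior are illustrative only:

const { Duplex } = require('stream');

class EchoDuplex extends Duplex {
  _write(chunk, encoding, callback) {
    // Forward every written chunk straight to the readable side
    // (simplified: ignores readable-side backpressure)
    this.push(chunk);
    callback();
  }
  _read() {
    // No-op: data is pushed from _write as it arrives
  }
  _final(callback) {
    this.push(null); // End the readable side once writing finishes
    callback();
  }
}

const echo = new EchoDuplex();
echo.on('data', (chunk) => console.log('Echoed:', chunk.toString()));
echo.write('ping');
echo.end();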
const { pipeline } = require('stream/promises');
const zlib = require('zlib');

// Compose streams with error handling
async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
  console.log('Compression complete');
}
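Usage (file names are illustrative):

compressFile('server.log', 'server.log.gz').catch(console.error);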
// With custom transforms (csvParser, transformRow, and jsonStringify are
// placeholders for your own transform streams)
await pipeline(
  fs.createReadStream('data.csv'),
  csvParser(),
  transformRow(),
  jsonStringify(),
  fs.createWriteStream('data.json')
);
const { pipeline } = require('stream');

// Callback-based pipeline (source, transform1, transform2, and destination
// are placeholders for your own streams)
pipeline(
  source,
  transform1,
  transform2,
  destination,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);
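pipeline is preferred over chaining .pipe() because .pipe() does not forward errors or clean up the other streams when one of them fails; with bare .pipe(), each stream needs its own error handler:

const fs = require('fs');
const zlib = require('zlib');

const source = fs.createReadStream('input.txt');
const gzip = zlib.createGzip();
const dest = fs.createWriteStream('output.txt.gz');

// Without pipeline, every stream in the chain needs its own handler
source.on('error', (err) => console.error('Read error:', err));
gzip.on('error', (err) => console.error('Gzip error:', err));
dest.on('error', (err) => console.error('Write error:', err));

source.pipe(gzip).pipe(dest);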
const http = require('http');
const fs = require('fs');

// Stream file as HTTP response
http.createServer((req, res) => {
  const filePath = './video.mp4';
  const stat = fs.statSync(filePath);
  res.writeHead(200, {
    'Content-Type': 'video/mp4',
    'Content-Length': stat.size
  });
  // Stream instead of loading entire file
  fs.createReadStream(filePath).pipe(res);
}).listen(3000);
// Stream HTTP request body to disk
http.createServer((req, res) => {
  const writeStream = fs.createWriteStream('./upload.bin');
  req.pipe(writeStream);
  // Respond once the file is fully flushed, not just when the request body ends
  writeStream.on('finish', () => {
    res.end('Upload complete');
  });
}).listen(3001);
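Responses can be compressed on the fly as well by piping through zlib before the socket; a sketch, where the file path is illustrative and the client is assumed to advertise gzip in Accept-Encoding:

const zlib = require('zlib');

// Stream a gzip-compressed response when the client supports it
http.createServer((req, res) => {
  const raw = fs.createReadStream('./large-report.json');
  const acceptsGzip = /\bgzip\b/.test(req.headers['accept-encoding'] || '');
  if (acceptsGzip) {
    res.writeHead(200, {
      'Content-Type': 'application/json',
      'Content-Encoding': 'gzip'
    });
    raw.pipe(zlib.createGzip()).pipe(res);
  } else {
    res.writeHead(200, { 'Content-Type': 'application/json' });
    raw.pipe(res);
  }
}).listen(3002);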
const { Transform } = require('stream');

// Parse each chunk as one complete JSON document (assumes the upstream
// stage emits whole documents, e.g. one NDJSON line per chunk)
const jsonParser = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk);
      this.push(obj);
      callback();
    } catch (err) {
      callback(err);
    }
  }
});
// Process objects instead of buffers
const processRecords = new Transform({
  objectMode: true,
  transform(record, encoding, callback) {
    record.processed = true;
    record.timestamp = Date.now();
    this.push(record);
    callback();
  }
});
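A usage sketch composing the two transforms above; it assumes a newline-delimited JSON input file (records.ndjson is a placeholder) and uses readline so jsonParser receives one complete document per chunk:

const fs = require('fs');
const readline = require('readline');
const { pipeline } = require('stream/promises');

const lines = readline.createInterface({
  input: fs.createReadStream('records.ndjson'),
  crlfDelay: Infinity
});

await pipeline(
  lines,           // async iterable of complete lines
  jsonParser,      // string -> object
  processRecords,  // annotate each record
  async function* (source) {
    for await (const record of source) {
      yield JSON.stringify(record) + '\n';
    }
  },
  fs.createWriteStream('processed.ndjson')
);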
const { Readable } = require('stream');

// Create from async iterator
async function* generateData() {
  for (let i = 0; i < 100; i++) {
    yield { id: i, data: `item-${i}` };
  }
}

const stream = Readable.from(generateData(), { objectMode: true });
// Consume with for-await
async function processStream(readable) {
  for await (const chunk of readable) {
    console.log('Processing:', chunk);
  }
}
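for await pulls one chunk at a time, so the source is only advanced as fast as the loop consumes it, and breaking out of the loop destroys the underlying stream; for example:

// Consume only the first 10 items; the break cleans up the stream
async function takeTen(readable) {
  let count = 0;
  for await (const chunk of readable) {
    console.log('Processing:', chunk);
    if (++count === 10) break;
  }
}

takeTen(Readable.from(generateData())).catch(console.error);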
const readable = fs.createReadStream('huge-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  // Check if writable can accept more data
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    // Pause reading until writable is ready
    readable.pause();
    writable.once('drain', () => {
      readable.resume();
    });
  }
});

// Or use pipeline (handles automatically)
pipeline(readable, writable, (err) => {
  if (err) console.error('Error:', err);
});
const { Readable } = require('stream');

// db is assumed to be a MongoDB-style client whose find() exposes an async cursor
class DatabaseStream extends Readable {
  constructor(query, options) {
    super({ ...options, objectMode: true });
    this.query = query;
    this.cursor = null;
  }

  async _read() {
    try {
      if (!this.cursor) {
        this.cursor = await db.collection('items').find(this.query).cursor();
      }
      const doc = await this.cursor.next();
      if (doc) {
        this.push(doc);
      } else {
        this.push(null); // Signal end
      }
    } catch (err) {
      this.destroy(err); // Surface database errors as stream errors
    }
  }
}
// Usage
const dbStream = new DatabaseStream({ status: 'active' });
for await (const item of dbStream) {
  console.log(item);
}
const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

describe('Stream Processing', () => {
  it('should transform data correctly', async () => {
    const input = Readable.from(['hello', 'world']);
    const chunks = [];
    const upperCase = new Transform({
      transform(chunk, enc, cb) {
        this.push(chunk.toString().toUpperCase());
        cb();
      }
    });

    await pipeline(
      input,
      upperCase,
      async function* (source) {
        for await (const chunk of source) {
          chunks.push(chunk.toString());
        }
      }
    );

    expect(chunks).toEqual(['HELLO', 'WORLD']);
  });
});
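Error paths can be tested the same way; a sketch for the same describe block, assuming a Jest-style expect with rejects support:

it('should propagate transform errors', async () => {
  const failing = new Transform({
    transform(chunk, enc, cb) {
      cb(new Error('bad chunk')); // Fail on every chunk
    }
  });

  await expect(
    pipeline(
      Readable.from(['hello']),
      failing,
      async function (source) {
        for await (const chunk of source) { /* drain */ }
      }
    )
  ).rejects.toThrow('bad chunk');
});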
| Problem | Cause | Solution |
|---|---|---|
| Memory grows infinitely | No backpressure | Use pipeline or handle drain |
| Data loss | Errors not caught | Use pipeline with error callback |
| Slow processing | Small chunk size | Increase highWaterMark |
| Stream hangs | Missing end() call | Call writable.end() |
Use streams when:
- Processing files too large to fit comfortably in memory (GB+)
- Handling real-time or continuous data
- Building composable data pipelines
- Streaming HTTP request and response bodies
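As a rough contrast (file names are illustrative), fs.readFile buffers the entire file before anything can be processed, while a stream copy holds at most one highWaterMark-sized chunk at a time:

const fs = require('fs');
const { pipeline } = require('stream/promises');

// Buffers all of huge.log in memory at once
const contents = await fs.promises.readFile('huge.log');

// Copies huge.log in 64KB chunks; memory stays flat regardless of file size
await pipeline(
  fs.createReadStream('huge.log'),
  fs.createWriteStream('huge-copy.log')
);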