Introduction
Node.js streams process data in chunks. When a readable stream produces data faster than a writable stream can consume it, backpressure occurs. The readable stream should pause until the writable stream signals it is ready for more data. If backpressure is not handled correctly, data accumulates in memory (overflow) or chunks are dropped (data loss).
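To make the signal concrete, here is a minimal sketch (the file name and chunk size are arbitrary choices): `write()` returns `false` as soon as the writable's internal buffer exceeds its `highWaterMark`, which is the cue to stop writing until the `'drain'` event fires.

```javascript
const fs = require('fs');

const writeStream = fs.createWriteStream('demo-output.txt');
const chunk = Buffer.alloc(64 * 1024); // 64 KB, larger than the default highWaterMark

// write() returns false once the internal buffer is over highWaterMark
if (!writeStream.write(chunk)) {
  console.log('Backpressure: stop writing until "drain"');
  writeStream.once('drain', () => {
    console.log('Buffer flushed - safe to write again');
    writeStream.end();
  });
}
```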
Symptoms
- Memory grows unbounded during large file processing
- `RangeError: Maximum call stack size exceeded` from stream recursion
- Output file is truncated or corrupted
- Process OOM killed during stream processing
- `.write()` returns false but the code keeps pushing data
A typical anti-pattern that produces these symptoms:

```javascript
// WRONG - no backpressure handling
const fs = require('fs');

const readStream = fs.createReadStream('large-input.txt');
const writeStream = fs.createWriteStream('output.txt');

readStream.on('data', (chunk) => {
  // If writeStream is slow, chunks pile up in memory
  const processed = processChunk(chunk);
  writeStream.write(processed); // Ignores return value!
});
// Memory grows until writeStream catches up or the process crashes
```
Common Causes
- Using `.on('data')` instead of `.pipe()` for stream processing
- Ignoring the `.write()` return value (false means backpressure)
- Not waiting for the `drain` event before writing more data
- Reading with `fs.createReadStream` in flowing mode while writing to a slow destination
- Transform streams with expensive synchronous operations (see the sketch after this list)
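To illustrate the last cause, here is a sketch (`heavyCpuWork` is a hypothetical placeholder, not from the original text): a Transform whose `transform()` does heavy synchronous work blocks the event loop for the duration of each chunk, so the whole pipeline stalls even though backpressure itself is working.

```javascript
const { Transform } = require('stream');

// heavyCpuWork() stands in for any expensive synchronous computation
const blockingTransform = new Transform({
  transform(chunk, encoding, callback) {
    const result = heavyCpuWork(chunk); // blocks the event loop for this chunk
    callback(null, result);
  },
});
// Prefer keeping per-chunk work small or offloading it to worker threads.
```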
Step-by-Step Fix
1. Use pipe() for automatic backpressure handling:

```javascript
const fs = require('fs');
const { Transform } = require('stream');

// Create a transform stream for processing
const transform = new Transform({
  transform(chunk, encoding, callback) {
    // Process chunk
    const processed = processChunk(chunk);
    callback(null, processed); // Push processed data
  },
});

// pipe() handles backpressure automatically
fs.createReadStream('large-input.txt')
  .pipe(transform)
  .pipe(fs.createWriteStream('output.txt'))
  .on('finish', () => console.log('Done - all data processed'));
```
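One caveat worth noting (general Node.js behavior, not stated in the original text): `.pipe()` propagates backpressure but not errors, so either attach an `'error'` handler to each stage or prefer `stream.pipeline()` as in step 3. A sketch, reusing `fs` and `transform` from the block above:

```javascript
fs.createReadStream('large-input.txt')
  .on('error', (err) => console.error('read failed:', err.message))
  .pipe(transform)
  .on('error', (err) => console.error('transform failed:', err.message))
  .pipe(fs.createWriteStream('output.txt'))
  .on('error', (err) => console.error('write failed:', err.message))
  .on('finish', () => console.log('Done - all data processed'));
```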
2. Handle backpressure manually with .on('data'):

```javascript
const fs = require('fs');

const readStream = fs.createReadStream('large-input.txt');
const writeStream = fs.createWriteStream('output.txt');

readStream.on('data', (chunk) => {
  const processed = processChunk(chunk);

  // .write() returns false when backpressure is applied
  if (!writeStream.write(processed)) {
    // Pause reading until drain event
    readStream.pause();
  }
});

// Resume reading when writable stream is ready
writeStream.on('drain', () => {
  readStream.resume();
});

readStream.on('end', () => {
  writeStream.end();
  console.log('Processing complete');
});
```
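If you prefer to avoid the event wiring, the same manual handling can be written with async iteration (readable streams have been async iterable since Node 10). This is a sketch under the same assumptions as above, with `processChunk` as the document's placeholder:

```javascript
const fs = require('fs');
const { once } = require('events');

async function copyWithBackpressure(input, output) {
  const readStream = fs.createReadStream(input);
  const writeStream = fs.createWriteStream(output);

  for await (const chunk of readStream) {
    const processed = processChunk(chunk);
    // write() returns false when the internal buffer is full
    if (!writeStream.write(processed)) {
      await once(writeStream, 'drain'); // wait until the buffer has flushed
    }
  }
  writeStream.end();
  await once(writeStream, 'finish');
}
```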
3. Use stream.pipeline() for error handling:

```javascript
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

async function compressFile(input, output) {
  try {
    await pipeline(
      fs.createReadStream(input),
      zlib.createGzip(),
      fs.createWriteStream(output),
    );
    console.log('Compression complete');
  } catch (err) {
    console.error('Pipeline failed:', err.message);
    // pipeline() automatically cleans up all streams on error
  }
}

compressFile('large-file.txt', 'large-file.txt.gz');
```
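`pipeline()` from `stream/promises` also accepts an `AbortSignal`, which is useful for cancelling long-running jobs. A sketch (the timeout value and function name are arbitrary):

```javascript
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

async function compressWithTimeout(input, output, ms) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), ms);
  try {
    // Aborting the signal destroys every stream in the pipeline
    await pipeline(
      fs.createReadStream(input),
      zlib.createGzip(),
      fs.createWriteStream(output),
      { signal: controller.signal },
    );
  } finally {
    clearTimeout(timer);
  }
}
```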
4. Handle slow async transforms:

```javascript
const { Transform } = require('stream');

// WRONG - sync transform wrapping an async operation:
// the callback fires before the async work completes, so backpressure
// is not respected and errors cannot be propagated
const brokenTransform = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    expensiveAsyncOperation(chunk).then((result) => this.push(result));
    callback(); // Signals "done" before the promise resolves
  },
});

// CORRECT - async-aware transform
const asyncTransform = new Transform({
  objectMode: true,
  async transform(chunk, encoding, callback) {
    try {
      const result = await expensiveAsyncOperation(chunk);
      callback(null, result);
    } catch (err) {
      callback(err); // Propagate error through stream
    }
  },
});

// Note: a Transform processes one chunk at a time, so highWaterMark bounds
// how many chunks queue up while the current one is awaited - it does not
// add parallelism.
const limitedTransform = new Transform({
  objectMode: true,
  highWaterMark: 10, // Max 10 chunks buffered
  async transform(chunk, encoding, callback) {
    const result = await processWithLimit(chunk);
    callback(null, result);
  },
});
```
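A possible way to run the async transform from the previous block is to drive it through `pipeline()` so backpressure and cleanup are handled end to end. The file names are placeholders, and `expensiveAsyncOperation` is assumed to resolve to a string or Buffer:

```javascript
const { pipeline } = require('stream/promises');
const fs = require('fs');

async function run() {
  await pipeline(
    fs.createReadStream('input.ndjson'),
    asyncTransform,
    fs.createWriteStream('output.ndjson'),
  );
  console.log('All chunks processed');
}

run().catch((err) => console.error('Pipeline failed:', err.message));
```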
Prevention
- Always use `.pipe()` or `stream.pipeline()` for stream processing
- Never ignore the `.write()` return value in manual stream handling
- Set `highWaterMark` appropriately for your use case (default: 16KB)
- Use the `stream/promises` pipeline for cleaner async stream handling
- Monitor memory usage during stream processing (see the sketch after this list)
- Use `finished()` from `stream/promises` to detect stream completion:
  ```javascript
  const { finished } = require('stream/promises');
  await finished(writeStream);
  ```
- Test with large files to verify backpressure handling works correctly
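For the memory-monitoring point above, a rough sketch (the sampling interval and function name are arbitrary choices); it combines `process.memoryUsage()` with the `finished()` helper already mentioned:

```javascript
const { finished } = require('stream/promises');

async function monitorWhileStreaming(writeStream) {
  // Sample heap and resident-set size every 5 seconds while the job runs
  const timer = setInterval(() => {
    const { heapUsed, rss } = process.memoryUsage();
    console.log(`heapUsed=${(heapUsed / 1048576).toFixed(1)} MB, rss=${(rss / 1048576).toFixed(1)} MB`);
  }, 5000);

  try {
    await finished(writeStream); // resolves when the stream ends or errors
  } finally {
    clearInterval(timer);
  }
}
```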