Node.js Streaming: Readable and Writable Streams

Node.js streams solve a fundamental problem: how do you process data that's too large to fit in memory? The naive approach loads everything at once, which works fine until you're dealing with...

Key Insights

  • Streams process data in chunks rather than loading everything into memory, making them essential for handling large files or real-time data without crashing your application.
  • Backpressure handling is critical—ignoring return values from write() operations can lead to memory bloat and degraded performance in production systems.
  • The pipeline() utility should be your default choice for connecting streams, as it automatically handles error propagation and cleanup that manual pipe() chains miss.

Understanding the Stream Advantage

Node.js streams solve a fundamental problem: how do you process data that’s too large to fit in memory? The naive approach loads everything at once, which works fine until you’re dealing with gigabyte-sized log files or video uploads.

Consider this comparison:

const fs = require('fs');

// Bad: Loads entire file into memory
fs.readFile('large-video.mp4', (err, data) => {
  if (err) throw err;
  // If this file is 2GB, you just consumed 2GB of RAM
  console.log(`Loaded ${data.length} bytes`);
});

// Good: Processes file in chunks
const stream = fs.createReadStream('large-video.mp4');
let bytesRead = 0;

stream.on('data', (chunk) => {
  bytesRead += chunk.length;
  // Only holds one chunk in memory at a time (default 64KB)
});

stream.on('end', () => {
  console.log(`Processed ${bytesRead} bytes`);
});

The streaming approach uses a constant, predictable amount of memory regardless of file size. For a 2GB file, the difference is between consuming 2GB of RAM versus roughly 64KB.

Readable Streams Deep Dive

Readable streams operate in two modes: flowing and paused. In flowing mode, data is automatically read and emitted through events. In paused mode, you must explicitly call read() to consume data.

Here’s a practical example reading a log file:

const fs = require('fs');
const readable = fs.createReadStream('access.log', {
  encoding: 'utf8',
  highWaterMark: 16 * 1024 // 16KB chunks
});

readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} characters`);
  // Process chunk here
});

readable.on('end', () => {
  console.log('Finished reading file');
});

readable.on('error', (err) => {
  console.error('Stream error:', err);
});

The pipe() method provides a cleaner way to connect streams:

const fs = require('fs');
const zlib = require('zlib');

// Read file, compress it, write to new file
fs.createReadStream('large-file.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('large-file.txt.gz'));

This works, but it has a critical flaw we’ll address later: poor error handling.

Writable Streams and Backpressure

Writable streams accept data, but they can’t always accept it as fast as you can send it. This is where backpressure comes in. When a writable stream’s internal buffer fills up, write() returns false, signaling you to pause until the drain event fires.

Ignoring backpressure is one of the most common mistakes in Node.js:

const fs = require('fs');
const writable = fs.createWriteStream('output.txt');

// Wrong: Ignores backpressure
for (let i = 0; i < 1000000; i++) {
  writable.write(`Line ${i}\n`);
  // If the stream can't keep up, this fills memory
}

// Correct: Respects backpressure
function writeWithBackpressure(writable, data, callback) {
  let i = 0;
  
  function write() {
    let ok = true;
    while (i < data.length && ok) {
      ok = writable.write(data[i]);
      i++;
    }
    
    if (i < data.length) {
      // Buffer is full, wait for drain
      writable.once('drain', write);
    } else {
      writable.end(callback);
    }
  }
  
  write();
}

const data = Array.from({ length: 1000000 }, (_, i) => `Line ${i}\n`);
writeWithBackpressure(writable, data, () => {
  console.log('Write complete');
});

Creating Custom Streams

Custom streams let you integrate any data source or destination into Node’s stream ecosystem. You extend base classes and implement specific methods.

Here’s a custom Readable that generates sequential numbers:

const { Readable } = require('stream');

class NumberStream extends Readable {
  constructor(max, options) {
    super(options);
    this.current = 1;
    this.max = max;
  }
  
  _read() {
    if (this.current <= this.max) {
      this.push(`${this.current}\n`);
      this.current++;
    } else {
      this.push(null); // Signals end of stream
    }
  }
}

const numbers = new NumberStream(100);
numbers.pipe(process.stdout);

A custom Writable that processes JSON lines:

const { Writable } = require('stream');

class JsonLineProcessor extends Writable {
  constructor(options) {
    super(options);
    this.records = [];
  }
  
  _write(chunk, encoding, callback) {
    try {
      const line = chunk.toString().trim();
      if (line) {
        const record = JSON.parse(line);
        this.records.push(record);
      }
      callback(); // Signal completion
    } catch (err) {
      callback(err); // Signal error
    }
  }
  
  _final(callback) {
    // Called before stream closes
    console.log(`Processed ${this.records.length} records`);
    callback();
  }
}

// Usage
const processor = new JsonLineProcessor();
fs.createReadStream('data.jsonl').pipe(processor);

Robust Stream Pipelines

The pipeline() function from the stream module is the correct way to chain streams. It handles errors and cleanup automatically:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('input.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Compare this to manual pipe() chains, which require explicit error handling on each stream:

// Manual approach - error prone
const readable = fs.createReadStream('input.txt');
const gzip = zlib.createGzip();
const writable = fs.createWriteStream('input.txt.gz');

readable.on('error', handleError);
gzip.on('error', handleError);
writable.on('error', handleError);

readable.pipe(gzip).pipe(writable);

function handleError(err) {
  console.error('Stream error:', err);
  readable.destroy();
  gzip.destroy();
  writable.destroy();
}

Here’s a real-world example processing CSV data:

const { pipeline, Transform } = require('stream');
const fs = require('fs');

// Custom transform stream to parse CSV and filter rows
class CsvFilter extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.headersParsed = false;
    this.headers = [];
  }
  
  _transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');
    
    for (const line of lines) {
      if (!line.trim()) continue;
      
      const values = line.split(',');
      
      if (!this.headersParsed) {
        this.headers = values;
        this.headersParsed = true;
        continue;
      }
      
      const row = {};
      this.headers.forEach((header, i) => {
        row[header.trim()] = values[i]?.trim();
      });
      
      // Filter: only rows where age > 18
      if (parseInt(row.age) > 18) {
        this.push(JSON.stringify(row) + '\n');
      }
    }
    
    callback();
  }
}

pipeline(
  fs.createReadStream('users.csv'),
  new CsvFilter(),
  fs.createWriteStream('adults.jsonl'),
  (err) => {
    if (err) {
      console.error('Processing failed:', err);
      process.exit(1);
    }
    console.log('CSV processing complete');
  }
);

Critical Best Practices

Always clean up streams. Unclosed streams can leak file descriptors and memory:

const stream = fs.createReadStream('file.txt');

// Bad: Stream might stay open if error occurs
stream.on('data', (chunk) => {
  if (someCondition) {
    return; // Stream still open!
  }
});

// Good: Explicit cleanup
stream.on('data', (chunk) => {
  if (someCondition) {
    stream.destroy();
    return;
  }
});

Use finished() to detect stream completion reliably:

const { finished } = require('stream');

const stream = fs.createReadStream('file.txt');

finished(stream, (err) => {
  if (err) {
    console.error('Stream failed:', err);
  } else {
    console.log('Stream completed successfully');
  }
});

Know when NOT to use streams. For small files (under 1MB), the overhead isn’t worth it:

// For small config files, this is fine
const config = JSON.parse(fs.readFileSync('config.json', 'utf8'));

// Don't overcomplicate with streams for small data

Set appropriate highWaterMark values. The default 64KB works for most cases, but you can tune it:

// Smaller chunks for real-time processing
const stream = fs.createReadStream('data.txt', {
  highWaterMark: 1024 // 1KB chunks
});

// Larger chunks for throughput
const fastStream = fs.createReadStream('data.txt', {
  highWaterMark: 1024 * 1024 // 1MB chunks
});

Streams are powerful but require discipline. Respect backpressure, handle errors properly, and clean up resources. Master these fundamentals, and you’ll build Node.js applications that scale efficiently with minimal memory overhead.

Liked this? There's more.

Every week: one practical technique, explained simply, with code you can use immediately.