Node.js Streams API

Streams – Why do we need them?

To understand the need, let's compare the two ways of copying a file shown below.

const fs = require('fs').promises;

async function copyFile(fromFile, toFile){

  const data = await fs.readFile(fromFile); //Read the whole content
  await fs.writeFile(toFile, data);       //Then write the content into new file

}
const fs = require('fs');

function copyFile(fromFile, toFile){

  fs.createReadStream(fromFile)        //Read in small chunks and
  .pipe(fs.createWriteStream(toFile)); //Send it for writing into new file

}
  1. In the first case, we read the whole file before passing it to the writer, whereas
  2. in the second, we continuously read small chunks and pass them on for writing.

To understand the effect, just think of the videos we watch on YouTube. What if the server had to send the entire file at once? What if we had to download the complete file before we could watch it?

The data streaming APIs are extremely useful for reading/writing large files in smaller chunks. The key benefits the streams provide us are that they are :

Memory Efficient : Since they work on smaller chunks of data, streams are highly memory efficient and capable of handling very large files.

Time Efficient : Streams are time efficient because, as in the above example, a stream can let you start watching a video long before it has been downloaded completely.

 

Types of Streams

There are two basic types of streams – readable and writable – and each type uses a buffer to temporarily store its working data.

Besides reading the data in chunks, a readable stream can pause and restart in between. This helps it adjust to the data consumption rate. Similarly, a writable stream has the capability to flag its feeder to pause when its buffer is running out of space.

The Duplex and Transform streams build on these two basic streams to implement various kinds of data transformation logic.

Here is a brief summary of these four stream types.

  • Readable : We can read from these streams in chunks.
    • These streams read/receive data into their buffers and let us read it from there in chunks.
    • e.g. fs.createReadStream()
  • Writable : We can write into these streams multiple times.
    • As we write, the data goes into the stream's buffer and from there it is streamed to its destination.
    • e.g. fs.createWriteStream()
  • Duplex : It's a combination of a writable and a readable stream.
    • e.g. net.Socket()
  • Transform : These are Duplex streams that apply some transformation on the data as we write to or read from them (see the sketch after this list).
    • For example, the Gzip transformer converts plain text into the Gzip format as the data passes through it.
    • e.g. zlib.createGzip()
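
For instance, below is a minimal sketch (the upper-casing rule is only illustrative) of a custom Transform stream built with the stream.Transform class. It upper-cases whatever text passes through it as we pipe data from stdin to stdout.

const { Transform } = require('stream');

//A sample transform that upper-cases every chunk passing through it
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    //Push the transformed chunk to the readable side of the stream
    callback(null, chunk.toString().toUpperCase());
  }
});

process.stdin.pipe(upperCase).pipe(process.stdout);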

 

How to use the streams

 

1. Writable Streams

Here are some of the examples of writable streams.

  • process.stdout , process.stderr
  • fs write stream
  • zlib streams
  • HTTP Responses, on the server
  • HTTP Requests, on the client

They all implement the interface defined by the stream.Writable class, so as streams they all follow the same usage pattern. Hence, to understand writable streams, let us use an fs write stream.

1.1 Writing into a fs Write Stream

The program below shows a simple writable file stream where we write our content in 2000 small parts.

It shows how we can use a writable stream to create a large file using multiple writes.

const fs = require('fs');
const writableStream = fs.createWriteStream('example.txt', 'utf8');

for (let i = 0; i < 2000; i++) {
  //We can write multiple times to the stream
  writableStream.write(`Creating a long content, #${i}!\n`);
}

writableStream.end('The end() takes the last line for writing. \n');

writableStream.on('finish', () => {
  console.log('A call to end(), closes the stream and triggers the finish event.');
});

Importantly, we should call the end() function when we are done with our writing and want to close the stream. A call to .end() can optionally take a final piece of content. After flushing the data remaining in the buffer, the stream closes and fires a finish event.

The finish event handler is the place where we can specify our follow-up action.

 

1.2 Backpressure : Flagging when the buffer is running out of space

Every time we write the data, it goes to the buffer.

    let hasBufferSpace = writableStream.write(`Some content....`);

Every time we write, the stream returns a flag, as shown above, telling us whether it has enough space to safely accept the next write.

As the buffer is limited, the stream uses a threshold called the highWaterMark for raising this cautionary signal. If the buffered data exceeds the highWaterMark, the call to write() returns false, signaling backpressure.
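
For reference, the highWaterMark is configurable when we create the stream. The sketch below (the 16 KB value here is only an example, not a recommendation) shows how we could set it on an fs write stream.

const fs = require('fs');

//Example only : set the buffer threshold (highWaterMark) to 16 KB
const writableStream = fs.createWriteStream('example.txt', {
  encoding: 'utf8',
  highWaterMark: 16 * 1024
});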

With backpressure, the stream essentially asks us to wait until enough buffer space is available again and it emits a drain event. The code below shows how to write safely using backpressure and the drain event.

const fs = require('fs');
const writableStream = fs.createWriteStream('example.txt', 'utf8');

writableStream.on('finish', () => {
  console.log('A call to end()- takes the last line, closes the stream and triggers the finish event.');
});

function writeAndHandleBackPressure(requiredCount, completedCount){
  let hasBufferSpace = true;
  for(let index = completedCount; index < requiredCount; index++){
    hasBufferSpace = writableStream.write(`Creating a long content, #${index}!\n`);

    if(requiredCount == index+1){
      writableStream.end('Writing completed successfully!');
    }else{

      if(!hasBufferSpace){
        console.log('Stop! Continue after it triggers drain, the buffer is complaining ! Current line:'+ (index+1))
        writableStream.once('drain', ()=>{writeAndHandleBackPressure(requiredCount,index+1);});
        break;
      }

    }
  }
}
//Write 2000 times - wait for drain when the buffer is running out of space
writeAndHandleBackPressure(2000,0);

Here we write 2000 lines of text. On each write, we check whether the returned value is false. If so, we pause before the next write, until the stream drains and has enough buffer space again.

At the writableStream.once('drain', ...) line, we break out of the loop and wait for the next 'drain' event from the writable stream before continuing from where we left off.

Below is a sample output of running the above program while writing 2000 lines. It shows the buffer went beyond its safe limit on three occasions.

F:\nodejs\samples\streams>node writestream-with-backpressure.js
Stop! Continue after it triggers drain, the buffer is complaining ! Current line:532
Stop! Continue after it triggers drain, the buffer is complaining ! Current line:1059
Stop! Continue after it triggers drain, the buffer is complaining ! Current line:1571
A call to end()- takes the last line, closes the stream and triggers the finish event.

 

1.3 Key Events Emitted by a Writable Stream

We have seen the use of the 'finish' and 'drain' events in the examples above. Here is a brief on the key events available on a writable stream, followed by a small illustration:

  1. finish : The stream emits this event after end() has been called and it has finished writing the data in its buffer.
  2. close : The stream triggers this event when the stream itself has been closed.
  3. error : This event is triggered whenever an error occurs inside the writable stream.
  4. drain : This event is triggered to say that, after a backpressure, the buffer is again ready to accept new data safely.
  5. pipe : We get this event when a readable stream is piped into it.
  6. unpipe : We get this event when the piped readable stream is removed from it.
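
As a small illustration, the sketch below simply attaches handlers for a few of these events on an fs write stream (the file name and messages are only examples).

const fs = require('fs');
const ws = fs.createWriteStream('example.txt', 'utf8');

ws.on('error', (err) => console.error('Something went wrong : ' + err.message));
ws.on('finish', () => console.log('All buffered data has been written.'));
ws.on('close', () => console.log('The stream has been closed.'));

ws.end('A single line of content.\n');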

 

2. Readable Streams

Some examples of the readable streams are :

  • process.stdin
  • fs read stream
  • zlib streams
  • HTTP Requests, on the server
  • HTTP Responses, on the client

They all implement the interface defined by the stream.Readable class. Hence, as readable streams they all follow the same usage pattern.

A readable stream operates in two modes – paused and flowing.

When we create a stream, it starts in the paused mode, wherein any data coming into its buffer stays there until we read it. In the flowing mode, we simply ask the readable stream to send out the data from its buffer in chunks. Thus, there are two ways of reading the data from a readable stream :

  1. Flowing Mode Reading : Here, we let the buffer stream its data in chunks.
  2. Paused Mode Reading : Here, we manually read the data from the buffer

 

2.1 How to use Flowing Mode Reading

Let us take fs.createReadStream as an example of reading a file. Similar to the 'finish' event in a writable stream, a read stream always emits an 'end' event after it has completely read the data.

However, the following code will never emit the 'end' event, because the reading will not start at all.

const fs = require('fs');
const rs = fs.createReadStream('example.txt','utf8');

// Since the reading will not start, the 'end' event will never arise.

rs.on('end', () => {
  console.log('Reading completed!'); // Will not be called.
});

In order to start reading the data in the flowing mode, we can use any of the following three approaches :

  1. Add an on('data') event handler,
  2. Pipe it to a writable stream, or
  3. Call resume()

In all three cases, the buffer will start streaming the data in chunks until the reading completes. The three snippets below show the code for each approach, with comments.

const fs = require('fs');
const rs = fs.createReadStream('example.txt','utf8');

rs.on('data', (chunk)=>{
    //1. Adding a 'data' event handler starts the flow.
    //   The stream keeps triggering this event
    //   for each chunk of data received in the buffer
    //console.log('\nData length : '+ chunk.length +' bytes'); 
});

rs.on('end', () => {
  console.log('Reading completed!');
});

//Output :
//Reading completed!
const fs = require('fs');

const ws = fs.createWriteStream('example-copy.txt','utf8');
const rs = fs.createReadStream('example.txt','utf8');

//2. When we pipe the readable stream to writeable stream,
//   it's another way to start the flow of data from the buffer
rs.pipe(ws);

rs.on('end', () => {
  console.log('Reading completed!');
});

//Output :
//Reading completed!
const fs = require('fs');
const rs = fs.createReadStream('example.txt','utf8');

//3. We can simply resume the flowing mode using resume
//   Here, we are consuming all data without doing anything. 
rs.resume();

rs.on('end', () => {
  console.log('Reading completed!');
});

//Output :
//Reading completed!
2.2 How to use Paused Mode Reading

Instead of asking the buffer to stream the incoming data, we can also manually read the data from it using stream.read().

In order to read the data from the buffer manually, we add a 'readable' event handler instead. Adding this event handler overrides any flowing-mode configuration and keeps the stream in the paused mode.

The stream triggers a 'readable' event whenever data becomes available to read. The code below shows how to read inside a 'readable' event handler using read().

const fs = require('fs');
const rs = fs.createReadStream('example.txt','utf8');

rs.on('readable', ()=>{
     
    //reading all data manually in a loop
    let chunk;
    while((chunk = rs.read()) !== null){
      console.log('\nData length : '+ chunk.length +' bytes');
    }

});

rs.on('end', () => {
  console.log('Reading completed!');
});

Here, we have created a 10000-line file using the write stream example above.

As we read this test file, we get the output below, which shows that each read provides a chunk of data from the buffer.

F:\nodejs\samples\streams>node pause-mode-read.js

Data length : 65536 bytes

Data length : 65536 bytes

Data length : 65536 bytes

Data length : 65536 bytes

Data length : 56790 bytes
Reading completed!

Importantly, if we do not call stream.read() from within the 'readable' event handler, the data will stay in the buffer and the 'end' event will never be triggered.

Now, having seen the two ways of reading, it is recommended to use only one mode of reading instead of mixing them together. From an ease-of-use point of view, the flowing mode is simpler.

 

2.3 How to Pause and Resume while Reading in Flowing Mode

One challenge in the flowing mode arises when the rate of flow is more than what we can consume, for example when the writable stream we are feeding from our readable stream is flagging backpressure.

To address this issue, the flowing mode allows us to pause and restart the reading using pause() and resume() respectively.

While writing into a writable stream, we can pause feeding more data on backpressure and then resume on the 'drain' event (a sketch of this follows the example below).

The code below shows a simpler variation, where we pause for a second after reading each chunk of data.

const fs = require('fs');
const rs = fs.createReadStream('example.txt','utf8');

rs.on('data', (chunk)=>{
    console.log('\nData length : '+ chunk.length +' bytes');
    
    //We pause the flowing mode for a second after each chunk
    rs.pause();
    setTimeout(() => {rs.resume();}, 1000);

});
rs.on('end', () => {
  console.log('Reading completed!');
});
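
Building on the same idea, here is a sketch (the file names are only examples) of the backpressure-driven variant described above: we pause the readable stream whenever write() returns false and resume it on the writable stream's 'drain' event.

const fs = require('fs');

const rs = fs.createReadStream('example.txt', 'utf8');
const ws = fs.createWriteStream('example-copy.txt', 'utf8');

rs.on('data', (chunk) => {
  const hasBufferSpace = ws.write(chunk);
  if (!hasBufferSpace) {
    //The writable buffer is full - pause reading until it drains
    rs.pause();
    ws.once('drain', () => rs.resume());
  }
});

rs.on('end', () => {
  ws.end();
  console.log('Copy completed!');
});

This is essentially what pipe() does for us internally, as we will see in the next section.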

 

2.4 Key Events Emitted by a Readable Stream
  • data : While in the flowing mode, this event is triggered as the buffer streams out each chunk of data.
  • readable : While in the paused mode, this event is triggered whenever some data is available in the buffer to read.
  • pause : This event is triggered when the stream is paused, say when we unpipe the piped writable stream.
  • resume : This event is triggered when the stream is resumed, say when we pipe it to a writable stream or add an on('data') event handler (see the small sketch after this list).
  • end : A readable stream emits this when there is no more data to be consumed.
  • close : The stream triggers this event when the stream has been closed and no further action on it is possible.
  • error : This event is triggered for any error inside the readable stream.
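
As a small illustration of the 'pause' and 'resume' events, the sketch below (using the same example file) logs them while we pause the flowing mode after each chunk and resume it shortly afterwards.

const fs = require('fs');
const rs = fs.createReadStream('example.txt', 'utf8');

rs.on('pause', () => console.log('The stream has been paused.'));
rs.on('resume', () => console.log('The stream has been resumed.'));
rs.on('error', (err) => console.error('Read error : ' + err.message));

rs.on('data', (chunk) => {
  rs.pause();                        //triggers the 'pause' event
  setImmediate(() => rs.resume());   //triggers the 'resume' event
});

rs.on('end', () => console.log('Reading completed!'));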

 

3. Why and How to use Pipes

As we have seen above, pipes use the flowing mode of the readable stream to read the incoming data.

Pipes enable us to easily connect a readable stream to a writable stream. A pipe passes the data along while managing the flow rate, i.e. the backpressure. More specifically, it takes care of pausing the readable stream when the consuming writable stream is signaling backpressure.

Below are some simple examples of pipes.

Example-1 : This one shows how we can copy a file using streams and pipe.

const fs = require('fs');

const readable = fs.createReadStream('example.txt');
const writable = fs.createWriteStream('example-copy.txt');

readable.pipe(writable);
writable.on('finish', () => console.log("A copy of the file created."));

 

Example-2 : This one uses the Gzip transform, which has both a writable and a readable side.

As we pipe the plain-text data into Gzip, it transforms the data stream into the .gz format.

Then, the readable side of the Gzip transform pipes the formatted data into a write stream, which saves it into the specified file.

const fs = require('fs');
const zlib = require('zlib');

const readable = fs.createReadStream('example.txt');
const zip = zlib.createGzip();
const writable = fs.createWriteStream('example.txt.gz');

readable.pipe(zip).pipe(writable);
writable.on('finish', () => console.log("The gz file created."));