Max Ogden | Open Web programmer
Node Streams: How do they work?

Description of and notes on the node.js Stream API

There is also a screencast version of this article.

Why Streams?

node bills itself as JavaScript evented I/O. In a nutshell, that means that if you are trying to solve an I/O bound problem (one where the limiting factor is reading from and writing to relatively slow interfaces), node can provide some useful abstractions. For example, if you are writing an application that has to talk to multiple databases, a caching layer, a load balancer, external web services, mobile clients and third party auth providers while also serving web applications, node acts as nice glue between all of these pieces.

"Streams in node are one of the rare occasions when doing something the fast way is actually easier. SO USE THEM. not since bash has streaming been introduced into a high level language as nicely as it is in node."

-@dominictarr in his high level node style guide

What are Streams?

The main tool in node's evented toolbox is the Stream. Stream instances are basically Unix pipes. They can be readable, writable or both and are easy to reason about -- you can pipe a readable stream to a writable stream by doing readableStream.pipe(writableStream).
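Here is a minimal sketch of readableStream.pipe(writableStream). It uses the Stream base class exported by node's stream module, which current node still ships as the legacy base class underneath the newer stream types. The source and sink are hand-rolled stand-ins, not real I/O. In practice they would be something like a file and an HTTP response.

```javascript
var Stream = require('stream').Stream

// A bare-bones readable stream: all it has to do is emit 'data' and 'end'
var source = new Stream()
source.readable = true

// A bare-bones writable stream: all it has to do is implement write() and end()
var received = []
var sink = new Stream()
sink.writable = true
sink.write = function (chunk) {
  received.push(chunk)
  return true // ready for more data
}
sink.end = function () {
  this.writable = false
  console.log('received: ' + received.join(''))
}

// pipe() forwards source 'data' events to sink.write() and 'end' to sink.end()
source.pipe(sink)

source.emit('data', 'hello ')
source.emit('data', 'world')
source.emit('end')
```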

Readable

Readable streams emit a data event each time they get a "chunk" of data, and then emit end when they are finished. emit (from EventEmitter) implements the observer pattern, a publish/subscribe pattern that lets any number of observers see an event. Different types of streams chunk up their data in different ways. For example, a library that reads CSVs might emit data every time it reads a new line, whereas an HTTP request might emit data every few kilobytes during the download.
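The following is a minimal sketch of a readable stream that chunks its input by line, the way a CSV reader might. LineStream and its start() method are hypothetical names for this example. A real stream would emit chunks as bytes arrive from a file or socket rather than all at once when told to start.

```javascript
var Stream = require('stream').Stream
var util = require('util')

// Hypothetical readable stream: one 'data' event per line, then 'end'
function LineStream(text) {
  Stream.call(this)
  this.readable = true
  this.text = text
}
util.inherits(LineStream, Stream)

// start() is a made-up trigger for this sketch
LineStream.prototype.start = function () {
  var lines = this.text.split('\n')
  for (var i = 0; i < lines.length; i++) {
    if (lines[i] !== '') this.emit('data', lines[i]) // each line is a chunk
  }
  this.readable = false
  this.emit('end')
}

var rows = []
var csv = new LineStream('id,name\n1,alice\n2,bob\n')
// subscribers (observers) see every chunk and the final 'end'
csv.on('data', function (line) { rows.push(line.split(',')) })
csv.on('end', function () { console.log('read ' + rows.length + ' rows') })
csv.start()
```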

Readable streams can also be paused and resumed, and it's up to the Stream implementer to write the pause() and resume() methods. pause is intended to be an advisory API: when you call it you are telling the stream to stop emitting data events, but you may still receive some data events afterwards because certain things may already be in flight, or between states, at the moment you called pause.
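The base Stream class provides no pause() or resume(), so the implementer has to write them. Here is a sketch of what that might look like, assuming a hypothetical ArrayStream that emits items from an array. This sketch's pause() takes effect immediately. A stream backed by real I/O can still deliver chunks that were already in flight, so consumers should tolerate data events after pausing.

```javascript
var Stream = require('stream').Stream
var util = require('util')

// Hypothetical readable stream over an array, with hand-written pause()/resume()
function ArrayStream(items) {
  Stream.call(this)
  this.readable = true
  this.items = items.slice()
  this.paused = false
}
util.inherits(ArrayStream, Stream)

// Emit items until someone pauses us or we run out
ArrayStream.prototype.flow = function () {
  while (!this.paused && this.items.length > 0) {
    this.emit('data', this.items.shift())
  }
  if (this.items.length === 0 && this.readable) {
    this.readable = false
    this.emit('end')
  }
}

// pause() just sets a flag that flow() checks before each emit
ArrayStream.prototype.pause = function () {
  this.paused = true
}

ArrayStream.prototype.resume = function () {
  this.paused = false
  this.flow()
}

var seen = []
var letters = new ArrayStream(['a', 'b', 'c'])
letters.on('data', function (chunk) {
  seen.push(chunk)
  if (chunk === 'a') letters.pause() // "please stop for a bit"
})
letters.flow()
var seenWhilePaused = seen.slice() // only 'a' so far
letters.resume() // emits 'b' and 'c', then 'end'
```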

Writable

Writable streams must implement two functions: write and end. When you write data to a writable stream, write returns either true or false. true means "cool, keep sending more data with write", and false means "uh-oh, I am backed up -- don't write any more data until I emit drain".

This is a form of back pressure, which is a very powerful feature because it lets streams communicate "upstream" to their writers. Most of the back pressure related APIs are advisory, so there is a sort of gentlemen's agreement to honor requests to start or stop writing as promptly as possible. However, since they are advisory, a writable stream may still receive write calls after it returns false.
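Here is a minimal sketch of both sides of that agreement, assuming two hypothetical helpers. makeWriter() returns a writable that returns false once it holds `capacity` chunks and emits drain when its made-up flush() method runs. writeAll() is a well-behaved writer that stops on false and continues on drain. flush() is called in a loop only to simulate, synchronously, a slow destination catching up.

```javascript
var Stream = require('stream').Stream

// Hypothetical writable that holds `capacity` chunks before returning false.
// Back pressure is advisory, so it still queues writes that arrive after it
// has returned false instead of rejecting them.
function makeWriter(capacity) {
  var w = new Stream()
  w.writable = true
  w.queue = []
  w.written = []
  w.needDrain = false
  w.write = function (chunk) {
    w.queue.push(chunk)
    if (w.queue.length < capacity) return true // cool, keep sending
    w.needDrain = true
    return false // uh-oh, I am backed up
  }
  w.flush = function () { // made up: the slow resource catches up
    w.written = w.written.concat(w.queue)
    w.queue = []
    if (w.needDrain) {
      w.needDrain = false
      w.emit('drain') // safe to write again
    }
  }
  w.end = function () {
    w.flush()
    w.writable = false
  }
  return w
}

// A well-behaved writer: stop on false, continue on 'drain'
function writeAll(dest, chunks, done) {
  var i = 0
  function next() {
    while (i < chunks.length) {
      if (dest.write(chunks[i++]) === false) {
        dest.once('drain', next) // honor the request to stop
        return
      }
    }
    dest.end()
    done()
  }
  next()
}

var dest = makeWriter(2)
var finished = false
var waits = 0
dest.on('drain', function () { waits++ })
writeAll(dest, ['a', 'b', 'c', 'd', 'e'], function () { finished = true })
while (!finished) dest.flush() // simulate the destination draining over time
```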

Streams on the client

After working with node for a while I came to love the Stream API (and I'm not the only one), but I sorely missed it when I had to deal with the variety of I/O bound APIs on the client side. Some examples are XHR, WebSockets, IndexedDB, WebWorkers, WebRTC and DOM Events (mouse, touch, etc). Wouldn't it be great if you could use the same node style Stream semantics to interface with all of these things? Unfortunately, it seems that standards bodies have instead implemented their own (usually inconsistent or poorly done) streaming semantics for the client.

I thought it would be useful to write something like fingerSwipe.pipe(webSocket) (pseudo-code for detecting swipe touch events and streaming them to the server over a WebSocket connection as they happen), so I started a project called domnode, which intends to wrap common I/O bound APIs in node style streams so that you can render them into the DOM easily.
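This does not show domnode's actual API. It is a sketch that runs in node to illustrate the idea behind fingerSwipe.pipe(webSocket). A hypothetical eventStream() helper turns an event source into a readable Stream. An EventEmitter stands in for the DOM element, and a plain writable Stream stands in for the WebSocket. In the browser you would use addEventListener instead of on().

```javascript
var Stream = require('stream').Stream
var EventEmitter = require('events').EventEmitter

// Hypothetical helper: re-emit every `name` event as a 'data' event
function eventStream(emitter, name) {
  var s = new Stream()
  s.readable = true
  emitter.on(name, function (evt) { s.emit('data', JSON.stringify(evt)) })
  return s
}

// A stand-in for a WebSocket: anything with write() and end() can be piped to
var sent = []
var fakeSocket = new Stream()
fakeSocket.writable = true
fakeSocket.write = function (msg) {
  sent.push(msg)
  return true
}
fakeSocket.end = function () { this.writable = false }

var touchSurface = new EventEmitter() // stand-in for a DOM element
var fingerSwipe = eventStream(touchSurface, 'swipe')
fingerSwipe.pipe(fakeSocket)

touchSurface.emit('swipe', { direction: 'left' })
touchSurface.emit('swipe', { direction: 'up' })
```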

There is a magical project by @substack called browserify that makes domnode possible. It lets you write client side JS that looks like node code:

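var stream = require('stream')
var util = require('util')

function XHRStream(xhr) { // xhr: an XMLHttpRequest that has already been open()ed
  stream.Stream.call(this)
  var me = this // the callback below needs a reference to this stream
  this.readable = true
  this.xhr = xhr
  this.offset = 0 // how much of responseText has been emitted so far
  xhr.onreadystatechange = function () { me.handle() }
  xhr.send(null)
}

util.inherits(XHRStream, stream.Stream)
XHRStream.prototype.handle = function () {
  var text = this.xhr.responseText || '' // partial text is available while LOADING (readyState 3)
  if (text.length > this.offset) this.emit('data', text.slice(this.offset)) // only the new part
  this.offset = text.length
  if (this.xhr.readyState === 4) this.emit('end') // 4 = DONE
}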

The best part about browserify is that it avoids reinventing the wheel (and "not invented here" syndrome) by letting you use the pure JavaScript parts of node, or thousands of third party modules, while keeping node's simplified require API. Since browserify uses node's source code verbatim, you can use the node documentation, as well as the multitude of related information floating around on the internets, to look up how these things work.

@dominictarr contributed a Stream wrapper for Socket.io called browser-stream that nicely integrates with domnode so you can emit things from node and rest assured that they will make their way into your web app.

Streams.next

As wonderful as streams are today, they could definitely use some improvements. @izs tells me node v0.9 is going to feature a major Stream overhaul to make the API even more unified. When node started it created a few domain-specific evented I/O abstractions for things like the filesystem or TCP, and then later, around 0.4, it consolidated them into the Stream API we have today, but there are still some vestigial phantoms lurking around.

One of the major blemishes is the collection of methods used to end a stream. destroy() ends a stream forcefully, whereas destroySoon() and end() both wait for the currently buffered data to be written out first. end() and destroySoon() originally came from different parts of node, but after being consolidated into Streams they overlap and could probably be redesigned.
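The following sketch makes the distinction concrete with a hypothetical buffering writable. makeBufferedWriter() and its flush() and close() methods are made up for illustration. end() and destroySoon() close only after buffered data is written, while destroy() discards the buffer and closes at once. It models the semantics described above, not node's actual implementations.

```javascript
var Stream = require('stream').Stream

// Hypothetical writable that buffers chunks until flush() is called
function makeBufferedWriter() {
  var w = new Stream()
  w.writable = true
  w.buffer = [] // accepted, not yet written
  w.output = [] // actually written
  w.ending = false
  w.write = function (chunk) {
    w.buffer.push(chunk)
    return true
  }
  w.flush = function () { // made up: the underlying resource accepts the buffer
    w.output = w.output.concat(w.buffer)
    w.buffer = []
    if (w.ending) w.close()
  }
  w.close = function () {
    w.ending = false
    w.writable = false
    w.emit('close')
  }
  // graceful: accept no more writes, close once buffered data is flushed
  w.end = function (chunk) {
    if (chunk !== undefined) w.write(chunk)
    w.writable = false
    w.ending = true
    if (w.buffer.length === 0) w.close()
  }
  w.destroySoon = w.end // in this sketch it behaves exactly like end()
  // forceful: throw away anything still buffered and close right now
  w.destroy = function () {
    w.buffer = []
    w.close()
  }
  return w
}

var graceful = makeBufferedWriter()
var gracefulClosed = false
graceful.on('close', function () { gracefulClosed = true })
graceful.write('a')
graceful.end('b') // nothing is lost...
var closedEarly = gracefulClosed // ...but 'close' waits for the flush
graceful.flush() // writes 'a' and 'b', then emits 'close'

var forceful = makeBufferedWriter()
forceful.write('a')
forceful.destroy() // 'a' is dropped and 'close' fires immediately
```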

There is also some confusion between the close and end events. end means "no more data will be emitted", but if the stream is also writable it should stay open in case more writes need to happen. close means "whatever this thing was tied to is done now; you may dispose of it, it's gone". close comes from the fs.close() method and end is tied to the end() function on Streams, and as you can see these could probably be refactored.
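Here is a sketch of the two events on a hypothetical read/write stream, loosely modeled on a socket whose peer has finished sending. end fires when the readable side is done, writes still succeed afterwards, and close fires only once the whole thing is released. The conn object and the sequence of calls are invented for illustration.

```javascript
var Stream = require('stream').Stream

// Hypothetical read/write stream: 'end' = no more data will be emitted,
// 'close' = the underlying resource is gone
var conn = new Stream()
conn.readable = true
conn.writable = true
conn.sent = []
conn.write = function (chunk) {
  conn.sent.push(chunk)
  return true
}
conn.end = function () { // done writing too, so release everything
  conn.writable = false
  conn.emit('close')
}

var events = []
conn.on('end', function () { events.push('end') })
conn.on('close', function () { events.push('close') })

// the readable side finishes...
conn.readable = false
conn.emit('end')
// ...but the writable side is still open
var stillWritable = conn.writable
conn.write('reply after end')
conn.end()
```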

Another possibility is that streams will get automagic buffering. This means that if you create a writable stream but haven't yet piped it anywhere, it will buffer any data you write to it and then emit that data when someone asks for it later. The same behavior will be available if you pause a stream and resume it later. Currently you have to roll your own buffered stream implementation or use a generic third party module like BufferedStream by @mikeal.
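The following is a rough sketch of what that buffering could look like. It uses a hypothetical makeBufferingStream(), which is not the API of the BufferedStream module. The stream starts paused because nobody is reading yet. It queues writes and replays them as data events on resume().

```javascript
var Stream = require('stream').Stream

// Hypothetical buffering pass-through stream: queues writes while paused
function makeBufferingStream() {
  var s = new Stream()
  s.readable = true
  s.writable = true
  s.paused = true // nobody is reading yet
  s.ended = false
  s.queue = []
  s.write = function (chunk) {
    if (s.paused) s.queue.push(chunk)
    else s.emit('data', chunk)
    return true // this sketch never applies back pressure
  }
  s.end = function () {
    s.writable = false
    s.ended = true
    if (!s.paused) s.finish()
  }
  s.finish = function () { // emit 'end' exactly once
    if (!s.readable) return
    s.readable = false
    s.emit('end')
  }
  s.pause = function () { s.paused = true }
  s.resume = function () {
    s.paused = false
    while (!s.paused && s.queue.length > 0) s.emit('data', s.queue.shift())
    if (s.ended && s.queue.length === 0) s.finish()
  }
  return s
}

var buffered = makeBufferingStream()
buffered.write('written before anyone listened') // queued, not lost
var got = []
var ended = false
buffered.on('data', function (chunk) { got.push(chunk) })
buffered.on('end', function () { ended = true })
buffered.resume() // replays the queued chunk
buffered.write('written while flowing') // passed straight through
buffered.end()
```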

Code examples

Simplified writable stream

var stream = require('stream')

var writestream = new stream.Stream()
writestream.writable = true
writestream.write = function (data) {
  // ...do something with data here...
  return true // true means 'yes I am ready for more data now'
  // OR return false and emit('drain') when ready later
}
writestream.end = function (data) {
  if (data !== undefined) this.write(data) // end(data) writes one last chunk first
  this.writable = false // no more writes after end
  // emit "close" (optional)
}

writestream.write({number: 1})
// note: in node core data is always a buffer or string

Simplified Stream.pipe() implementation

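// continues the example above: writestream is the writable stream defined there
var readstream = new stream.Stream()
readstream.readable = true
// a real readable stream must also implement pause() and resume()

readstream.on('data', function (data) {
  var ready = writestream.write(data)
  if (ready === false) {
    // the destination is backed up: stop emitting until it drains
    this.pause()
    writestream.once('drain', this.resume.bind(this))
  }
})

readstream.on('end', function () {
  // no more data is coming, so tell the destination we are done
  writestream.end()
})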