Max Ogden | Open Web programmer
April 2012
Node Streams: How do they work?

Description of and notes on the node.js Stream API

There is also a screencast version of this article.

UPDATE: new information about Node >= 0.10

Since I wrote this blog post (back in the Node 0.6 days) there have been major changes to the Node Stream API. In Node 0.10 a new version of Streams called Streams2 was released. Here is a summary of using Streams2.

how to destroy/end streams in node >= 0.10

  • usually you call .destroy() if the stream has a .destroy method
  • if it doesn't have .destroy you are out of luck, and the stream should be upgraded to use e.g. a newer through2
  • in request you call .abort() (this should get fixed to use .destroy())

what about close, end

  • usually no .close() method exists except on file descriptor (FD) backed streams, which in practice means fs streams. you would almost never call .close() yourself
  • .end() tries to end the stream gracefully

what about events

  • finish is emitted when a writable stream ends nicely (e.g. after stream.end()) and all of its data has been flushed
  • error is emitted when a stream fails, and close is emitted when the stream (or the resource behind it) is torn down, e.g. after stream.destroy()
  • end is emitted when a readable stream has no more data
  • end in streams2 behaves more like a flush event: it is only emitted once all the data has been read out of the readable
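To see the flush-like behavior of end from the last bullet, here is a minimal sketch assuming a recent Node (12 or later, for Readable.from); the stream names are made up for illustration. A readable that nobody consumes never emits end, one whose data is drained with resume() does, and a writable emits finish after end() once its data is flushed.

```javascript
const { Readable, Writable } = require('stream');

const log = [];

// Nobody reads this stream, so its data is never consumed and 'end' never fires.
const unread = Readable.from(['never consumed']);
unread.on('end', () => log.push('unread end'));

// resume() switches to flowing mode and discards the data, so 'end' can fire.
const drained = Readable.from(['consumed']);
drained.on('end', () => log.push('drained end'));
drained.resume();

// A writable that accepts every chunk immediately.
const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
});
sink.on('finish', () => log.push('sink finish'));
sink.end('last chunk'); // graceful end: 'finish' fires once the write is flushed

process.on('exit', () => console.log(log));
```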

Why Streams?

node bills itself as JavaScript evented I/O. In a nutshell, that means that if you are trying to solve a problem that is I/O bound (the limiting factor is reading from and writing to relatively slow interfaces), node can provide some useful abstractions. For example, if you are writing an application that has to talk to multiple databases, a caching layer, a load balancer, external web services, mobile clients and third party auth providers while also serving web applications, node acts as nice glue between all of those things.

"Streams in node are one of the rare occasions when doing something the fast way is actually easier. SO USE THEM. not since bash has streaming been introduced into a high level language as nicely as it is in node."

-@dominictarr in his high level node style guide

What are Streams?

The main tool in node's evented toolbox is the Stream. Stream instances are basically Unix pipes. They can be readable, writable or both and are easy to reason about -- you can pipe a readable stream to a writable stream by doing readableStream.pipe(writableStream).
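The Unix pipe analogy can be made concrete with a small sketch, roughly the node equivalent of piping text through `tr a-z A-Z`. It assumes a recent Node (12 or later); the upperCase transform and the collected array are illustrative names, not part of any API.

```javascript
const { Readable, Transform, Writable } = require('stream');

// Source: emits a few string chunks, like `cat` reading a file.
const source = Readable.from(['hello ', 'streams ', 'world']);

// Filter: upper-cases every chunk that passes through, like `tr a-z A-Z`.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Sink: collects the output so it can be inspected when everything is done.
const collected = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    collected.push(chunk.toString());
    callback();
  },
});

sink.on('finish', () => console.log(collected.join(''))); // prints "HELLO STREAMS WORLD"

source.pipe(upperCase).pipe(sink);
```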

Readable

Readable streams emit data events each time they get a "chunk" of data and then emit end when they are finished. emit (from EventEmitter) implements the observer pattern, a publish/subscribe pattern that lets any number of observers see an event. Different types of streams have different ways of chunking up their data. For example, a library that reads CSVs might emit data every time it reads a new line, whereas an HTTP request might emit data every few kilobytes during the download.
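Here is a minimal sketch of a readable that chunks its data per line, the way the CSV reader described above might. LineSource and the sample text are hypothetical; it assumes a recent Node and uses the Streams2-style _read() hook rather than the manual emit('data') calls of the Node 0.6 era. objectMode is used so that each push() arrives as exactly one data event.

```javascript
const { Readable } = require('stream');

// Hypothetical readable that emits one 'data' event per line of its input.
class LineSource extends Readable {
  constructor(text) {
    super({ objectMode: true }); // one pushed value = one 'data' event
    this.lines = text.split('\n');
  }

  // Called by Node whenever the consumer wants more data.
  _read() {
    if (this.lines.length === 0) {
      this.push(null); // no more data: 'end' follows
    } else {
      this.push(this.lines.shift());
    }
  }
}

const rows = [];
const csv = new LineSource('name,age\nada,36\ngrace,45');
csv.on('data', (line) => rows.push(line));
csv.on('end', () => console.log(`got ${rows.length} lines`)); // prints "got 3 lines"
```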

Readable streams can also be paused and resumed, and it's up to the Stream implementer to write the pause() and resume() methods. Pause is intended to be an advisory API: when you call it you are telling the stream to stop emitting data events, but you may still receive some data events afterwards because some data may already be in flight, or between states, at the moment you called pause.
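A minimal sketch of pausing and resuming a readable, assuming a recent Node. In Streams2 and later a paused readable keeps unread data in its internal buffer until resume() is called; the 50 ms delay is arbitrary and only stands in for "later".

```javascript
const { Readable } = require('stream');

const numbers = Readable.from([1, 2, 3, 4]);
const seen = [];

numbers.on('data', (n) => {
  seen.push(n);
  if (n === 2) {
    numbers.pause(); // ask the stream to stop emitting 'data' for now
    console.log('paused after', seen.length, 'items');
    setTimeout(() => numbers.resume(), 50); // pick up where we left off
  }
});

numbers.on('end', () => console.log('all items:', seen));
```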

Writable

Writable streams must implement two functions: write and end. When you write data to a writable stream it returns either true or false. true means "cool, keep sending more data with write" and false means "uh-oh, I am backed up -- don't write any more data until I emit drain".

This is a form of back pressure, a very powerful feature because it lets streams communicate "upstream" to their writers. Most of the back pressure related APIs are advisory, so there is a sort of gentlemen's agreement to honor requests to start or stop writing as promptly as possible. However, since they are advisory, a writable stream may still receive write calls after it returns false.
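The write()/drain contract is easiest to see with a deliberately slow writable. This sketch assumes a recent Node; slowSink and writeAll are hypothetical names, and the 4-byte highWaterMark is chosen only to make write() return false quickly.

```javascript
const { Writable } = require('stream');

const received = [];
let drainCount = 0;

// Deliberately slow sink: each chunk takes one turn of the event loop to "write".
const slowSink = new Writable({
  highWaterMark: 4, // bytes; tiny on purpose so write() returns false quickly
  write(chunk, encoding, callback) {
    setImmediate(() => {
      received.push(chunk.toString());
      callback();
    });
  },
});
slowSink.on('drain', () => drainCount++);

// Hypothetical helper that honors back pressure: stop on false, continue on 'drain'.
function writeAll(stream, chunks, done) {
  let i = 0;
  function writeNext() {
    while (i < chunks.length) {
      const ok = stream.write(chunks[i++]);
      if (!ok) {
        stream.once('drain', writeNext); // backed up: wait for 'drain'
        return;
      }
    }
    done();
  }
  writeNext();
}

writeAll(slowSink, ['aa', 'bb', 'cc', 'dd'], () => slowSink.end());
```

Ignoring the return value would still work here, because the Writable buffers anything written after it returns false. That is the advisory part, and it is also how memory grows without bound when a producer ignores back pressure.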

Streams on the client

After working with node for a while I came to love the Stream API (and I'm not the only one), but I sorely missed it when I had to deal with the variety of I/O bound APIs in the client side realm. Some examples of these are XHR, WebSockets, IndexedDB, WebWorkers, WebRTC, and DOM Events (mouse, touch, etc). Wouldn't it be great if you could use the same node style Stream semantics to interface with all of these things? Unfortunately it seems that standards bodies have instead implemented their own (usually inconsistent or poorly done) streaming semantics for the client.

I thought it would be useful to write something like fingerSwipe.pipe(webSocket) (pseudo-code for detecting swipe touch events and streaming them to the server as they happen over a WebSocket connection), so I started a project called domnode, which intends to wrap common I/O bound APIs in node style streams so that you can render them into the DOM easily.

There is a magical project by @substack called browserify that makes domnode possible. It lets you write client side JS that looks like node code:

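var stream = require('stream')
var util = require('util')

function XHRStream(xhr) {
  stream.Stream.call(this)
  this.readable = true
  this.offset = 0 // how much of responseText has been emitted so far
  var me = this // inside the callback `this` is the xhr, not the stream
  xhr.onreadystatechange = function () { me.handle(xhr) }
  xhr.send(null)
}

util.inherits(XHRStream, stream.Stream)
XHRStream.prototype.handle = function (xhr) {
  if (xhr.readyState < 3) return // 3 = LOADING, 4 = DONE
  var chunk = xhr.responseText.slice(this.offset) // only the new part
  this.offset = xhr.responseText.length
  if (chunk) this.emit('data', chunk)
  if (xhr.readyState === 4) this.emit('end')
}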

The best part about browserify is that it avoids reinventing the wheel (not-invented-here syndrome) by letting you use the pure JavaScript parts of node or thousands of third party modules while keeping node's simple require API. Since browserify uses node's source code verbatim, you get to use the node documentation as well as the multitude of related information floating around on the internets to look up how these things work.

@dominictarr contributed a Stream wrapper for Socket.io called browser-stream that nicely integrates with domnode so you can emit things from node and rest assured that they will make their way into your web app.

Streams.next

As wonderful as streams are today, they could definitely use some improvements. @izs tells me node v0.9 is going to feature a major Stream overhaul to make the API even more unified. When node started it created a few domain specific evented I/O abstractions for things like the filesystem or TCP, and later, around 0.4, it consolidated them into the Stream API we have today, but there are still some vestigial phantoms lurking around.

One of the major blemishes is the collection of methods used to end a stream. destroy() initiates a forceful end to a stream, whereas destroySoon() and end() both wait for the currently buffered data to be written first. end() and destroySoon() originally came from different parts of node, but after being consolidated into Streams they overlap and could probably be redesigned.
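The graceful vs forceful distinction between end() and destroy() is still there in today's Writable. A minimal sketch, assuming a recent Node where destroy() emits close by default; makeSink is a hypothetical helper. The stream ended with end() emits finish, while the destroyed one emits close but never finish.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: a writable that accepts everything and logs its lifecycle events.
function makeSink(name, log) {
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });
  for (const event of ['finish', 'close', 'error']) {
    sink.on(event, () => log.push(`${name} ${event}`));
  }
  return sink;
}

const log = [];

const graceful = makeSink('graceful', log);
graceful.write('some data');
graceful.end(); // flush pending data, then emit 'finish'

const forceful = makeSink('forceful', log);
forceful.write('some data');
forceful.destroy(); // tear down immediately: 'close', but no 'finish'

process.on('exit', () => console.log(log));
```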

There is also some confusion between the close and end events. end means no more data will be emitted, but if the stream is also writable it should stay open for more writes if they need to happen. close means that whatever this thing was tied to is done now: you may dispose of it, it's gone. close comes from the fs.close() method and end is tied to the end() function on Streams, and as you can see these could probably be refactored.
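The "end, but still writable" case survives in today's Duplex streams. This sketch assumes a recent Node; the chunk strings are illustrative. The readable side emits end, yet the writable side keeps accepting writes because allowHalfOpen defaults to true. On recent versions close only arrives after both sides are done and the stream is torn down.

```javascript
const { Duplex } = require('stream');

const received = [];
const log = [];

// Readable side has a single chunk; writable side records whatever is written.
const duplex = new Duplex({
  read() {
    this.push('the only chunk');
    this.push(null); // readable side is done: 'end' follows once consumed
  },
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

duplex.on('data', () => {}); // consume the readable side
duplex.on('end', () => {
  log.push('end');
  // allowHalfOpen defaults to true, so the writable side is still open here.
  duplex.write('written after end');
  duplex.end();
});
duplex.on('finish', () => log.push('finish'));
duplex.on('close', () => log.push('close'));
```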

Another possibility is that streams will get automagic buffering. This means that if you create a writable stream but haven't yet piped it anywhere, it will buffer any data you write to it and then emit that data when someone asks for it later. The same behavior will be available if you pause a stream and resume it later. Currently you have to roll your own buffered stream implementation or use a generic third party module like BufferedStream by @mikeal.
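In Streams2 and later, streams keep an internal buffer (bounded by highWaterMark), so this kind of buffering is built in. A minimal sketch using a PassThrough, assuming a recent Node; the 50 ms delay only stands in for "someone asks for it later".

```javascript
const { PassThrough, Writable } = require('stream');

// Nothing is consuming yet, so these writes are held in the internal buffer.
const buffered = new PassThrough();
buffered.write('written before anyone was listening, ');
buffered.end('and still delivered');

const out = [];
setTimeout(() => {
  // Attach a consumer later; the buffered data flows out now.
  buffered.pipe(
    new Writable({
      write(chunk, encoding, callback) {
        out.push(chunk.toString());
        callback();
      },
    })
  );
}, 50);
```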

Code examples

Simplified writable stream

var stream = require('stream')

var writestream = new stream.Stream()
writestream.writable = true
writestream.write = function (data) {
  // handle data here, then:
  return true // true means 'yes i am ready for more data now'
  // OR return false and emit('drain') when ready later
}
writestream.end = function (data) {
  if (data) this.write(data) // end() may be given one last chunk
  // no more writes after end
  // emit "close" (optional)
}

writestream.write({number: 1})
// note: in node core (as of 0.6) data is always a buffer or string
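For comparison, here is a sketch of the same writable built with the Writable constructor found in recent Node. objectMode: true is what lets it accept a plain object like {number: 1}; without it, only strings, Buffers and Uint8Arrays are accepted. The numbers array is just for illustration.

```javascript
const { Writable } = require('stream');

const numbers = [];

const writestream = new Writable({
  objectMode: true, // accept arbitrary JS values instead of only strings/Buffers
  write(data, encoding, callback) {
    numbers.push(data.number);
    callback(); // this chunk is handled; ready for the next one
  },
  final(callback) {
    // runs after end() once all pending writes are done, before 'finish'
    callback();
  },
});

const ready = writestream.write({ number: 1 }); // true while below highWaterMark
writestream.end({ number: 2 }); // optional last chunk, then no more writes
```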

Simplified Stream.pipe() implementation

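var readstream = new stream.Stream()
readstream.readable = true
// the base Stream has no pause()/resume(); the implementer writes them.
// a real source would stop emitting 'data' while this.paused is true
readstream.pause = function () { this.paused = true }
readstream.resume = function () { this.paused = false }

readstream.on('data', function(data) {
  var ready = writestream.write(data)
  if (ready === false) {
    this.pause()
    writestream.once('drain', this.resume.bind(this))
  }
})

readstream.on('end', function() {
  writestream.end()
})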
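Recent Node also ships stream.pipeline(), which wires up back pressure the way pipe() does and additionally forwards errors to a single callback and destroys every stream in the chain if one fails. A minimal sketch; the source data and collector are illustrative.

```javascript
const { pipeline, Readable, Writable } = require('stream');

const collected = [];

pipeline(
  Readable.from(['a', 'b', 'c']),
  new Writable({
    write(chunk, encoding, callback) {
      collected.push(chunk.toString());
      callback();
    },
  }),
  (err) => {
    // called once: after everything finished, or as soon as any stream failed
    if (err) {
      console.error('pipeline failed:', err);
    } else {
      console.log('pipeline done:', collected.join('')); // prints "pipeline done: abc"
    }
  }
);
```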