Description of and notes on the node.js Stream API
There is also a screencast version of this article.
Since writing this blog post (back in the Node 0.6 days) there have been major changes made to the Node Stream API. In Node 0.10 a new version of Streams called Streams2 was released. Here is a summary of using Streams2:
- .destroy(): use this if the stream has .destroy; if it has no .destroy you are out of luck and the stream should upgrade to use e.g. the newer through2
- .abort(): some streams have this instead (this should get fixed to use .destroy())
- .close: this method exists except in FD backed streams (usually only fs); usually you would never call .close()
- .end(): tries to end the stream gracefully
- finish: this event is emitted when a stream ends nicely (e.g. after stream.end())
- close and error: these events are emitted when a stream ends due to failure (e.g. after stream.destroy())
- end: emitted when a readable stream has no more data; in streams2 end behaves more like a flush event, in that it only gets emitted once the data has been read from the readable (a short sketch of these events follows below)
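These events are easy to see in a quick sketch (my own illustration, using the Streams2 base classes from node >= 0.10):

var stream = require('stream')

// writable side: 'finish' fires after end() once all writes have flushed
var ws = new stream.Writable()
ws._write = function (chunk, enc, cb) { cb() } // discard incoming chunks
ws.on('finish', function () { console.log('finish: stream ended nicely') })
ws.end('hello')

// readable side: 'end' only fires once the buffered data is actually read
var rs = new stream.Readable()
rs._read = function () {} // data is pushed in manually below
rs.push('some data')
rs.push(null) // signal that no more data is coming
rs.on('end', function () { console.log('end: all the data has been consumed') })
rs.resume() // start the flow so the data gets read and 'end' can fire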
node bills itself as JavaScript evented I/O. In a nutshell that means if you are trying to solve a problem that is I/O bound (the limiting factor is reading/writing to relatively slow interfaces) then node can provide some useful abstractions to you. An example is if you are writing an application that has to talk to multiple databases, a caching layer, a load balancer, external web services, mobile clients, third party auth providers and also serve web applications: node acts as a nice glue between all of the things.
"Streams in node are one of the rare occasions when doing something the fast way is actually easier. SO USE THEM. not since bash has streaming been introduced into a high level language as nicely as it is in node."
-@dominictarr in his high level node style guide
The main tool in node's evented toolbox is the Stream.
Stream instances are basically Unix pipes. They can be readable, writable or both, and are easy to reason about -- you can pipe a readable stream to a writable stream by doing readableStream.pipe(writableStream).
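For example, here is about the smallest useful pipe (the file path is just a placeholder):

var fs = require('fs')

// stream a file to stdout; pipe wires up the data/end events
// and back pressure handling for us
fs.createReadStream('/etc/hosts').pipe(process.stdout)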
Readable streams will emit data events each time they get a "chunk" of data and then emit end when they are all finished. emit (from EventEmitter) is the observer pattern - a publish/subscribe pattern which allows a number of observer objects to see an event. Different types of streams will have different ways of chunking up their data. For example, a library that reads CSVs might emit data every time it reads a new line, whereas an HTTP request might emit data every few kilobytes during the download.
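Consuming a readable stream by hand looks something like this (a sketch; the file path is again just a placeholder):

var fs = require('fs')

var rs = fs.createReadStream('/etc/hosts')
rs.on('data', function (chunk) {
  // each chunk is a Buffer; how big it is depends on the stream
  console.log('got ' + chunk.length + ' bytes')
})
rs.on('end', function () {
  console.log('no more data')
})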
Readable streams can also be paused and resumed, and it's up to the Stream implementer to write the pause() and resume() methods. Pause is intended to be an advisory API, meaning when you call it you are telling the stream to stop emitting data events, but you may still receive some data events after you call pause because certain things may already be in-flight, or between states, at the moment you called pause.
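Since pause is advisory, a consumer should be written to tolerate a stray chunk or two. A sketch:

var fs = require('fs')

var rs = fs.createReadStream('/etc/hosts')
rs.on('data', function (chunk) {
  rs.pause() // ask the stream to stop emitting for a moment
  // note: another data event may still sneak in here if a chunk
  // was already in flight when we called pause
  setTimeout(function () { rs.resume() }, 100) // resume a bit later
})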
Writable streams must implement two functions: write and end. When you write data to a writable stream it will return either true or false. true means cool, keep sending more data with write, and false means uh-oh, I am backed up -- don't write any more data until I emit drain. This is a form of back pressure, which is a very powerful feature as it lets streams communicate "upstream" to their writers. Most of the back pressure related APIs are advisory, so there is sort of a gentlemen's agreement to honor requests to start or stop writing as promptly as possible. However, since they are advisory, a writable stream may still receive write calls after it returns false.
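Honoring that contract from the writing side looks something like this (a sketch; the output path is a placeholder):

var fs = require('fs')

var out = fs.createWriteStream('/tmp/numbers.txt')
var i = 0

function writeSome() {
  while (i < 1000000) {
    var ok = out.write(i + '\n')
    i++
    // false means the stream is backed up: stop writing
    // and try again once it emits drain
    if (!ok) return out.once('drain', writeSome)
  }
  out.end()
}

writeSome()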
After working with node for a while I came to love the Stream API (and I'm not the only one) but sorely missed it when I had to deal with the variety of I/O bound APIs in the client side realm. Some examples of these are XHR, WebSockets, IndexedDB, WebWorkers, WebRTC, and DOM Events (mouse, touch, etc). Wouldn't it be great if you could use the same node style Stream semantics to interface with all of these things? Unfortunately it seems that standards bodies have instead implemented their own (usually inconsistent or poorly done) streaming semantics for the client.
I thought it would be useful to write something like fingerSwipe.pipe(webSocket) (pseudo-code for detecting swipe touch events and streaming them to the server as they happen over a WebSocket connection), so I started a project called domnode which intends to wrap common I/O bound APIs in node style streams so that you can render them into the DOM easily.
There is a magical project by @substack called browserify that makes domnode possible. It lets you write code in client side JS that looks like node code:
var stream = require('stream')
var util = require('util')
function XHRStream(xhr) {
  stream.Stream.call(this)
  this.readable = true // mark this stream as readable
  this.xhr = xhr // keep the request around so handle() can read from it
  var me = this // capture this for use inside the callback below
  xhr.onreadystatechange = function () { me.handle() }
  xhr.send(null)
}
util.inherits(XHRStream, stream.Stream)
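The snippet above calls me.handle() without defining it; a minimal sketch of what handle might do (my assumption, not code from domnode) is to emit the response body once the request finishes:

XHRStream.prototype.handle = function () {
  // hypothetical implementation: when the XHR is done, emit its
  // response text as a single data chunk and then end the stream
  if (this.xhr.readyState === 4) {
    this.emit('data', this.xhr.responseText)
    this.emit('end')
  }
}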
The best part about browserify is that it avoids reinventing the wheel (and not-invented-here syndrome) by letting you use the pure JavaScript parts of node or thousands of third party modules while maintaining node's simplified require API. Since browserify uses node's source code verbatim you get to use the node documentation as well as the multitude of related information floating around on the internets to look up how these things work.
@dominictarr contributed a Stream wrapper for Socket.io called browser-stream that nicely integrates with domnode so you can emit things from node and rest assured that they will make their way into your web app.
As wonderful as streams are today, they can definitely use some improvements. @izs tells me node v0.9 is going to feature a major Stream overhaul to make the API even more unified. When node started it created a few domain specific evented I/O abstractions for things like the filesystem or TCP, and then later, around 0.4, it consolidated them into the Stream API we have today, but there are still some vestigial phantoms lurking around.
One of the major blemishes is the collection of methods used to end a stream. destroy() initiates a forceful end to a stream, whereas destroySoon() and end() both wait for the current data to finish buffering. end() and destroySoon() came from different parts of node originally, but after being consolidated into Streams they have some overlap and can probably be redesigned.
There is also some confusion between the close and end events. end means no more data will be emitted, but if the stream is also writable it should stay open for more writes if they need to happen; close means whatever this thing was tied to is done now: you may dispose of it, it's gone. Close comes from the fs.close() method and end is tied to the end() function on Streams, and as you can see these can probably be refactored.
Another possibility is that streams will get automagic buffering. This means that if you create a writable stream but haven't yet piped it anywhere then it will buffer any data you write to it and then emit that data when someone asks for it later. This same behavior will be available if you pause a stream and then resume it later. Currently you have to roll your own buffered stream implementation or use a generic third party module like BufferedStream by @mikeal.
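As a rough illustration (my own, not how BufferedStream is actually implemented), a readable stream that buffers while paused might look like this; push is a hypothetical helper the producer calls with new data:

var stream = require('stream')
var util = require('util')

function BufferedReadable() {
  stream.Stream.call(this)
  this.readable = true
  this.paused = false
  this.buffered = [] // chunks held while paused
}
util.inherits(BufferedReadable, stream.Stream)

// queue data while paused, otherwise emit it right away
BufferedReadable.prototype.push = function (chunk) {
  if (this.paused) this.buffered.push(chunk)
  else this.emit('data', chunk)
}

BufferedReadable.prototype.pause = function () { this.paused = true }

BufferedReadable.prototype.resume = function () {
  this.paused = false
  // flush anything that accumulated while we were paused
  while (this.buffered.length && !this.paused) {
    this.emit('data', this.buffered.shift())
  }
}

To see how all of the pieces fit together, here is a minimal hand-rolled writable stream and readable stream wired up by hand, which is essentially what pipe does for you (back pressure included):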
var writestream = new stream.Stream()
writestream.writable = true
writestream.write = function (data) {
return true // true means 'yes i am ready for more data now'
// OR return false and emit('drain') when ready later
}
writestream.end = function (data) {
// no more writes after end
// emit "close" (optional)
}
writestream.write({number: 1})
// note: in node core data is always a buffer or string
var readstream = new stream.Stream()
readstream.readable = true
readstream.on('data', function(data) {
var ready = writestream.write(data)
if (ready === false) {
this.pause()
writestream.once('drain', this.resume.bind(this))
}
})
readstream.on('end', function() {
writestream.end()
})