I stumbled across a little gotcha using async with Node.js Streams: you can easily corrupt your output if you are not careful.
Node.js Streams are an abstraction of Unix pipes; they let you push or pull data a little bit at a time, never keeping more in memory than is needed. async is a library for organizing the asynchronous callbacks in Node applications without the "Christmas Tree" of deeply nested callbacks that can occur all too easily.
I'm working on a little bit of code to pull an image file, stored in MongoDB GridFS, scale the image using ImageMagick, then stream the result down to the browser.
My first pass at this didn't use ImageMagick or streams, and worked perfectly ... but as soon as I added in the use of async (even before adding in ImageMagick), I started getting broken images in the browser, meaning that my streams were getting corrupted.
Before adding async, my code was reasonable:
However, I knew I was going to add a few new steps here to pipe the file content through ImageMagick; that's when I decided to check out the async module.
The logic for handling this request is a waterfall; each step kicks off some work, then passes data to the next step via an asynchronous callback. The async library calls the steps "tasks"; you pass an array of these tasks to async.waterfall(), along with the end-of-waterfall callback. This special callback may be passed an error provided by any task, or the final result from the final task.
With waterfall(), each task is passed a special callback function. If the callback function is passed a non-null error as the first parameter, then remaining tasks are skipped, and the final result handler is invoked immediately, to handle the error.
Otherwise, you pass null as the first parameter, plus any additional result values. The next task is passed the result values, plus the next callback. It's all very clever.
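To make that contract concrete, here is a minimal sketch of a waterfall helper. It is not the async library's source: miniWaterfall and the outcome object are hypothetical names of mine, and the sketch assumes each task calls its callback exactly once. It defers each task with process.nextTick, which is the behavior of the async code discussed further down.

```javascript
// Hypothetical stand-in for a waterfall helper; NOT the async library's code.
function miniWaterfall(tasks, done) {
  let index = 0;

  function next(err, ...results) {
    // A non-null error, or running out of tasks, ends the waterfall.
    if (err || index === tasks.length) {
      return done(err || null, ...results);
    }
    const task = tasks[index++];
    // Pass the previous results plus the next callback, on the next tick.
    process.nextTick(() => task(...results, next));
  }

  next(null);
}

const outcome = { skippedTaskRan: false };

// Success path: each task hands its result to the next one.
miniWaterfall([
  (cb) => cb(null, 2),
  (n, cb) => cb(null, n * 10),
  (n, cb) => cb(null, n + 1),
], (err, result) => {
  outcome.result = result;
});

// Error path: the second task is skipped and the final handler sees the error.
miniWaterfall([
  (cb) => cb(new Error('boom')),
  (cb) => { outcome.skippedTaskRan = true; cb(null); },
], (err) => {
  outcome.error = err.message;
});
```

Once both waterfalls have finished, outcome holds the final result of the first and the error message of the second, and the skipped task has never run.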
My first pass was to duplicate the behavior of my original code, but to do so under the async model. That means lots of smaller functions; I also introduced an extra step between getting the opened file and streaming its contents to the browser. The extra step is intended for later, where ImageMagick will get threaded in.
The code, despite the extra step, was quite readable:
My style is to create a local variable for each function; so openFile kicks off the process; once the file has been retrieved from MongoDB, the readFileContents task will be invoked ... unless there's an error, in which case errorCallback gets invoked immediately.
Inside readFileContents we convert the file to a stream with file.stream(true) (the true means to automatically close the stream once all of the file contents have been read from GridFS).
streamToClient comes next; it takes that stream and pipes it down to the browser via the res (response) object.
So, although it's now broken up into more small functions, the logic is the same, as expressed on the very last line: open the file, read its contents as a stream, stream the data down to the client.
However, when I started testing this before moving on to add the image scaling step, it no longer worked. The image data was corrupted. I did quite a bit of thrashing: adding log messages, looking at library source, guessing, and experimenting (and I did pine for a real debugger!).
Eventually, I realized it came down to this bit of code from the async module:
The code on line 7 is the callback function passed to each task; notice that once it decides what to do, on line 21 it defers the execution until the "next tick".
The root of the problem was simply that the "next tick" was a little too late. By the time the next tick came along, and streamToClient got invoked, the first chunk of data had already been read from MongoDB ... but since the call to pipe() had not executed yet, it was simply discarded. The end result was that the stream to the client was missing a chunk at the beginning, or even entirely empty.
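The timing can be reproduced without MongoDB at all. The sketch below is an illustration under assumptions, not my actual code: eagerStream is a hypothetical stand-in for a stream that starts emitting data on the next tick whether or not anyone is listening, and the process.nextTick call inside readFileContents stands in for the deferral inside the waterfall callback.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a stream that begins emitting on the next tick,
// with or without listeners. Not the GridFS stream itself.
function eagerStream(chunks) {
  const stream = new EventEmitter();
  let i = 0;
  function emitNext() {
    if (i < chunks.length) {
      stream.emit('data', chunks[i++]);
      setImmediate(emitNext);
    } else {
      stream.emit('end');
    }
  }
  process.nextTick(emitNext);
  return stream;
}

const received = [];

// First task: create the stream, then hand it on through a deferred callback.
function readFileContents(callback) {
  const stream = eagerStream(['chunk-1', 'chunk-2', 'chunk-3']);
  process.nextTick(() => callback(null, stream)); // models the "next tick" deferral
}

// Second task: attach the consumer. The stream's first emit was queued
// before this task was, so chunk-1 has already gone by.
function streamToClient(stream) {
  stream.on('data', (chunk) => received.push(chunk));
  stream.on('end', () => console.log(received));
}

readFileContents((err, stream) => streamToClient(stream));
```

Run as written, the listener only ever sees chunk-2 and chunk-3; the first chunk was emitted while nobody was listening.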
The solution was to break things up a bit differently, so that the call to file.stream() happens inside the same task as the call to stream.pipe().
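Here is a rough sketch of that restructuring, again with hypothetical stand-ins rather than the real GridFS and response objects: eagerStream simulates a stream that starts emitting on the next tick, and readAndStreamToClient is a made-up name for the merged task. Because the stream is created and consumed in the same tick, it no longer matters how late the task itself gets invoked.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a stream that begins emitting on the next tick.
function eagerStream(chunks) {
  const stream = new EventEmitter();
  let i = 0;
  function emitNext() {
    if (i < chunks.length) {
      stream.emit('data', chunks[i++]);
      setImmediate(emitNext);
    } else {
      stream.emit('end');
    }
  }
  process.nextTick(emitNext);
  return stream;
}

const received = [];
let finished = false;

// Merged task: open the stream and attach the consumer before yielding.
function readAndStreamToClient(file, callback) {
  const stream = eagerStream(file.chunks);
  stream.on('data', (chunk) => received.push(chunk));
  stream.on('end', () => callback(null));
}

// The task itself may still be deferred; nothing is emitted until after
// the listeners above are in place.
process.nextTick(() => {
  readAndStreamToClient({ chunks: ['chunk-1', 'chunk-2', 'chunk-3'] }, (err) => {
    finished = true;
    console.log(received);
  });
});
```

With the two steps merged, all three chunks arrive in order.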
So that's our Leaky Abstraction for today: what looked like an immediate callback was deferred just enough to change the overall behavior. And in Node, anything that can be deferred will be deferred, since that makes the overall application that much zippier.
Another solution may be to call stream.pause() as soon as the stream is initialized, then resume() it once you are ready to receive; this way you won't lose unhandled data.
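A small sketch of that pause-then-resume pattern, using Node's built-in Readable rather than the GridFS stream from the post (the chunk values and the setImmediate delay are made up for illustration). Note that a Readable in current Node versions does not start flowing until a consumer is attached, so the explicit pause() here is mostly a safeguard; the stream in the post evidently behaved differently.

```javascript
const { Readable } = require('stream');

// Create the stream and pause it right away.
const source = Readable.from(['chunk-1', 'chunk-2', 'chunk-3']);
source.pause();

const received = [];
let ended = false;

// Some time later, once the consumer is ready, attach handlers and resume.
setImmediate(() => {
  source.on('data', (chunk) => received.push(chunk));
  source.on('end', () => { ended = true; });
  source.resume();
});
```

Because pause() was called explicitly, attaching the data handler alone does not restart the flow; the resume() call is what lets the chunks through.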