Lossless Backpressure in RxJS

Convert Node.js Streams to RxJS Pipelines

--

tl;dr I figured out a lossless way to handle backpressure in async iterators, generators, and Node.js streams with RxJS.

This article is my way of helping all the people who haven't been able to apply one set of skills to a wide array of other situations (like RxJS and backpressure). RxJS is a wonderful stream library, and you shouldn't have to learn iterators, generators, and Node.js streams on top of it, especially if you're already proficient in one of the greatest streaming libraries around. My goal is for the RxJS devs to eventually add this functionality into RxJS the same way they did with WebSocketSubject.

What is backpressure?

Backpressure occurs when your consumer is slower than your producer.

A few examples of backpressure are mouse movement events, window resize events, and parsing a file. All of these can either produce more events than your consumer can process or, in the case of reading a file, use more memory than your machine can handle.

To deal with these issues, you can use either lossy or lossless methods. Lossy methods include sampling, throttling, and debouncing, whereas lossless methods include buffering and pausing.
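
To make that concrete, here's a rough sketch of one of each in RxJS (interval stands in for a fast producer):

```js
const { interval } = require('rxjs');
const { throttleTime, bufferCount } = require('rxjs/operators');

const fast$ = interval(10); // a producer emitting every 10ms

// Lossy: throttling drops everything except one value per second.
fast$.pipe(throttleTime(1000)).subscribe((n) => console.log('throttled', n));

// Lossless: buffering holds values and hands them over in batches.
fast$.pipe(bufferCount(100)).subscribe((batch) => console.log('buffered', batch.length));
```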

Lossless backpressure is really important when reading a file, for example, because you don't want pieces of the file to go missing as you're processing it. On the other hand, events like mouse movement can be handled lossily because your monitor can only update so many times a second while the mousemove event fires far more frequently.

IxJS vs RxJS

RxJS is a push library where the producer throws data at the consumer. There's another library called IxJS under the same umbrella. For a long time, I was confused as to why there were two separate libraries doing essentially the same thing, until I realized IxJS is a pull library designed to support both synchronous and asynchronous iterators. Even though RxJS supports iterators through the from function, it doesn't give you the power to deal with backpressure.

It’s important to understand the distinction between pulling and pushing data because that changes how you approach backpressure.

From the perspective of pipelining, iterators and observers are essentially the same thing, but when you get into the reason why you’d need one or the other, that’s when your choice of library comes into play.

I'd make the case that you don't always need IxJS, even though both libraries have their place. While having an entire iterator library similar to RxJS is great for that specific workflow, it might be better to also have the power of controlling iterators in RxJS itself. I create a lot of web apps, and I find it better to save space by only pulling in RxJS (the normal use case) rather than both RxJS and IxJS. It's also much easier to train up a dev team on a single library than on a bunch of different ones.

The Problem

Iterators require you to pull data by asking for it whenever you’re ready to process the next set of data. This has the benefit of significantly reducing your memory footprint since you’re only grabbing the next value after you’ve completed processing the previous one. You’ll also have access to a similar mechanism in Node.js streams if you create a transform through the pipe method instead of using event handlers.

Because of this, iterators and Node.js streams have the ability to handle lossless backpressure, but RxJS doesn't (unlike IxJS), and on top of that, RxJS doesn't work with Node.js streams out of the box.

There’re actually a lot of similarities between iterables and observables. For instance, iterables implement an iterator whereas observables implement an observer. Both are producers, but iterators wait for consumers to ask for the next value whereas observers give consumers the next value as soon as they’re available. They’re the same, but reversed.
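
You can see the symmetry in a trivial sketch:

```js
const { from } = require('rxjs');

const numbers = [1, 2, 3];

// Pull: the consumer asks the iterator for each value when it's ready.
const iterator = numbers[Symbol.iterator]();
console.log(iterator.next()); // { value: 1, done: false }

// Push: the producer hands the consumer each value as soon as it's available.
from(numbers).subscribe((value) => console.log(value)); // 1, 2, 3
```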

Because an observable is a one-way flow of data, there's no way to call back into an observable and tell it to process more data like you can with an iterable. If we could tap into the observer, then we'd be able to simulate the IxJS pull approach using two pushes. There's already a tool specifically designed for just that purpose: the Subject.

In the same way that a generator is a more-advanced version of an iterable, a subject is a more-advanced observable. Generators allow you to control the iterator whereas subjects allow you to control the observer. Just as a Subject is both an Observer and an Observable, a Generator is both an Iterator and an Iterable. That's our key to adding pull functionality to RxJS.

Iterator Support is Broken in RxJS

RxJS supports iterators natively, but the implementation is broken because it doesn’t account for backpressure:
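
Here's a small repro of the problem (a sketch, assuming RxJS 6-style imports; numberGenerator is just a stand-in):

```js
const { from, of } = require('rxjs');
const { concatMap, delay } = require('rxjs/operators');

function* numberGenerator() {
  for (let i = 0; i < 10; i += 1) {
    console.log('yielding', i);
    yield i;
  }
}

from(numberGenerator())
  .pipe(concatMap((value) => of(value).pipe(delay(1000)))) // 1 second delay
  .subscribe((value) => console.log('processed', value));

// All ten "yielding" logs fire immediately, while the "processed" logs
// trickle out one per second; every pending value sits in memory.
```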

In this example, I’ve put a 1 second delay in the pipeline of our generator. We can pretend this delay is an AJAX call.

Because of the way RxJS processes iterators, we see a bunch of incoming values looped through and shoved into the stream even though the pipeline isn't fast enough to keep up. If we had an extremely large collection, it would all be loaded into memory while our pipeline struggled to process all the values.

Let's assume this is an advanced generator that requires the return value of the AJAX call before continuing on to the next yield. If that's the case, there's no way to pass a value back into the generator; believe me, I've tried.

The Idea

The only way to truly make RxJS work with generators is to use subjects. It’s the closest conversion. This gives you the power of passing in arguments and telling it when you’re ready to accept another value.

What we need is a way to take advantage of an iterator's ability to hold off on producing the next value until you've asked for it.

I’ve created a wrapper function for generators to turn them into subjects. Here’s a simpler example of what that looks like:
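
Here's a stripped-down sketch of the idea (not the full wrapper yet):

```js
const { Subject } = require('rxjs');

function* numberGenerator() {
  yield 1;
  yield 2;
  yield 3;
}

const iterator = numberGenerator();
const subject$ = new Subject();

subject$.subscribe((value) => {
  console.log(value);

  // We're done with this value, so now we ask the iterator for another.
  const result = iterator.next();

  // The done prop signals that we should complete our observable.
  if (result.done) {
    subject$.complete();
  } else {
    subject$.next(result.value);
  }
});

// Prime the stream with the first value.
subject$.next(iterator.next().value);
```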

The key is waiting until we've hit next in our subscriber. At that point, we can ask the iterator for another value. So we don't run into memory leaks, iterators have a done prop that signifies when we should complete our observable.

This example isn't a silver bullet. It doesn't solve the issue of passing values back into the generator, but that's a quick fix; since the first next call on a generator can't be passed a value, we can kick things off without passing one in. This example also suffers because it doesn't handle the case where a generator returns a single value and immediately ends.

Proper Handling of Iterator Backpressure

Taking that original idea, we can extrapolate it into a function:
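
A minimal sketch of what createIteratorSubject can look like:

```js
const { Subject } = require('rxjs');

function createIteratorSubject(iterator) {
  const subject$ = new Subject();

  // push sends a value back into the iterator and emits whatever
  // the iterator produces in response.
  subject$.push = (value) => {
    const result = iterator.next(value);

    // Emit the value if there is one; this also covers a generator
    // that returns a final value as it completes.
    if (result.value !== undefined) {
      subject$.next(result.value);
    }

    if (result.done) {
      subject$.complete();
    }
  };

  return subject$;
}
```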

When you call our custom push method on iterator$, it passes that value to the generator, which returns us a new value. That new value then gets passed through the stream. If you've used WebSocketSubject, this is the same kind of implementation, except I'm not overloading the next method.

One push gets the data into the generator, and another push gets the resulting value into the stream. That's how a pull can be satisfied by two pushes, and how a subject can mimic a generator, but with a lot more functionality.

Let’s try createIteratorSubject against our original observable pipeline:
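
Something like this (a sketch, assuming a generator that makes use of the value passed back into each yield):

```js
const { of } = require('rxjs');
const { concatMap, delay } = require('rxjs/operators');

// This generator waits on the value the pipeline passes back into
// each yield before producing the next number.
function* numberGenerator() {
  let response = yield 0;
  while (response < 9) {
    response = yield response + 1;
  }
}

const iterator$ = createIteratorSubject(numberGenerator());

iterator$
  .pipe(concatMap((value) => of(value).pipe(delay(1000)))) // our fake AJAX call
  .subscribe({
    // Feed the processed value back in; this pulls the next value
    // only now that we're finished with the current one.
    next: iterator$.push,
    complete: () => console.log('complete!'),
  });

// The first next call on a generator can't receive a value,
// so we kick things off without one.
iterator$.push();
```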

And now we can handle backpressure. The iterator only calculates the next value when we ask for it. Instead of using the subscribe method, you can also use tap to get the same result.

Node.js Stream Transforms

Node.js has its own stream implementation, which isn't supported by RxJS. There are a few libraries out there that add compatibility, but I think they miss the point. They don't handle backpressure, and that's the whole reason I'd ever use Node.js streams.

Let’s look at creating a simple transform stream which uppercases every letter in a novel and then outputs that to a new file:
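
It looks something like this (a sketch; the file names are made up):

```js
const fs = require('fs');
const { Transform } = require('stream');

const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    // Hand the uppercased chunk downstream and ask for the next one.
    callback(null, chunk.toString().toUpperCase());
  },
});

fs.createReadStream('./novel.txt')
  .pipe(uppercase)
  .pipe(fs.createWriteStream('./novel-uppercase.txt'));
```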

If you've done a lot of Node.js development, you've come across this same kind of code a bunch. Each pipe call can only take a single stream. The most common are transform streams, and if you've ever used Gulp, there are transform streams available for just about anything you want to do.

Still, I know RxJS really well, but I don't know how I'd even begin to do a lot of the crazy stuff I'm used to in a Node.js transform stream. Honestly, it took me an embarrassingly long time to figure out how to write a custom transform stream and even wrap my head around how a Node.js stream works in the first place. I remember using Gulp just 3 years ago, and even then, it was never intuitive to do anything custom.

Instead of having to learn a completely different stream methodology — which I had to do anyway to write this article — I’ve created my own RxJS implementation similar to the iterator example.

RxJS Controlling a Node.js Stream

Before showing you the implementation, I want to first go over the usage as it has some differences:
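
The usage looks something like this (a sketch; createStreamSubject is the name I'm using for the helper we'll build below, and the file names are again made up):

```js
const fs = require('fs');
const { map, tap } = require('rxjs/operators');

const transformStream$ = createStreamSubject();

transformStream$
  .pipe(
    map((chunk) => chunk.toString().toUpperCase()),
    // push hands the processed chunk to the next Node.js stream piped in.
    tap((value) => transformStream$.push(value)),
  )
  .subscribe({
    // next asks for another chunk now that we're done with this one.
    next: transformStream$.next,
  });

fs.createReadStream('./novel.txt')
  .pipe(transformStream$.stream)
  .pipe(fs.createWriteStream('./novel-uppercase.txt'));
```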

We're passing transformStream$.next to the subscriber's next prop just like before. This allows us to request another chunk only when we've finished processing the current one, but there's more to it when dealing with transform streams.

We also need to call transformStream$.push if we want to pass a value on to the next Node.js stream piped in, which here is our write stream. In this case, any time we want to write a chunk of text to the file, we call transformStream$.push, and then use transformStream$.next to ask for the next value because we're done processing.

There are a lot of reasons you’d want these separated into two different methods, but that’s beyond the scope of this article.

The goal here is that we only accept an incoming chunk when we're done processing the last one, and that's exactly what we're seeing. Pretty neat, right? All our transform streams can now be combined into a single RxJS observable pipeline instead of a bunch of really complicated pipe calls.

You need a write stream at the end of your pipeline for it to finish processing. I tried calling writeStream.write(value) and writeStream.end() inside transformStream$, but I couldn't get it to write more than a couple megabytes of data, nor would it end the stream. Because of this, I exposed the original Node.js stream as transformStream$.stream. You can create your own write stream, but for this example, I'm going to write back to another file because that's a normal use case.

Now let’s look at the implementation:
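
Here's a minimal sketch of it (error handling is left out for brevity):

```js
const { Subject } = require('rxjs');
const { take } = require('rxjs/operators');
const { Transform } = require('stream');

function createStreamSubject() {
  const subject$ = new Subject();
  const chunk$ = new Subject();
  const push$ = new Subject();

  // Grab the Subject's real next before we repurpose the property below.
  const emit = subject$.next.bind(subject$);

  const transformStream = new Transform({
    transform(chunk, encoding, callback) {
      // Hold onto the callback until the consumer signals it's done with
      // this chunk. take(1) unsubscribes immediately after firing so we
      // don't leak a subscription per chunk.
      chunk$.pipe(take(1)).subscribe(() => callback());
      emit(chunk);
    },
  });

  // Values from transformStream$.push flow into the stream's readable
  // side, i.e. on to the next Node.js stream piped in.
  push$.subscribe((value) => transformStream.push(value));

  // When the stream finishes, complete all 3 subjects so nothing leaks.
  transformStream.on('finish', () => {
    subject$.complete();
    chunk$.complete();
    push$.complete();
  });

  subject$.stream = transformStream;             // the raw Node.js stream
  subject$.next = () => chunk$.next();           // ask for the next chunk
  subject$.push = (value) => push$.next(value);  // pass a value downstream

  return subject$;
}
```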

Every time transform is called on our transformStream object, we get access to a callback function that asks for another value. Using chunk$, we're able to control when that callback gets called. Notice the use of take(1): this on-the-fly subscription needs to be unsubscribed as soon as the callback's been called; otherwise, you'll wind up with a memory leak.

We also subscribe to push$, which allows us to push values into the Node.js stream whenever transformStream$.push is called. This is how the uppercased values are passed on to the write stream.

Another important piece is listening for the finish event, which triggers the completion of all 3 subjects. This is another place you could find memory leaks if they're not properly closed out.

Conclusion

I spent a lot of time doing research, working out a clean API, and finding tons of online examples before being able to write these backpressure helpers. I know I’ll be using these implementations in the future instead of figuring out how to mess with generators or trying to remember everything about Node.js streams in 6 months when I’ll probably need to use them again.

To me, learning something really well means it's easier to keep using that thing even if it's not the best solution for every use case. While I'm a strong believer in using the right tool for the job, I believe I've shown that RxJS can be the right tool for the job when managing backpressure. It's a lot easier to use and a lot easier to remember when I'm already working with it every day.

More Reads

If you've got an interest in more topics related to RxJS, you should check out my other articles.
