Redux-Observable will solve your state problems.

Solving Common Async State Issues

ITNEXT
Published in
17 min readSep 30, 2018

--

Everyone using Redux will eventually find themselves in need of a way to handle async actions. There are quite a few options out there with the biggest name being Redux-Thunk followed by Redux-Saga and Redux-Observable.

Redux-Observable is an amazingly powerful middleware for Redux using RxJS. Using Redux-Observable requires knowing or learning RxJS which is itself a major pain in the butt. When I first started using these tools, it took quite a bit of time and a lot of experimentation to figure out a good process.

Instead of forcing everyone else to go through the same thing, I’ve created a bunch of helpful starter epics you might use in your project. While these are going to be simpler examples (relative statement), you should be able to figure out the basics of RxJS and how it ties into Redux-Observable for your own uses.

What is Redux-Observable

Redux-Observable abstracts out connecting to the Redux store and handles subscriptions for you so you don’t have to worry about listening for actions or dispatching them yourself.

At a very basic level, it’s calling Redux’s dispatch function when it subscribes to all your epics (observable pipelines) at the same time like so:

allYourObservablesTogether
.subscribe(store.dispatch)

Because Redux-Observable only dispatches actions, it might not be obvious how to work with side-effects.

Side-Effects

Let’s start with side-effects. Sometimes you need to hook into 3rd party libraries like a chat client, a logging platform, or an analytics service. To do so, you’re almost always going to be calling into their non-Redux libraries. Since Redux, by default, only calls actions that affect state through reducers, you might be wondering how you’re supposed to tie in 3rd party libraries with Redux. That’s where Redux-Observable and its epics come in.

For this example, I’ll be communicating with a 3rd party service and telling it to start logging only after I’ve stored the user’s information; for instance, after a successful sign-in.

We start by piping off the action$ observable given to our epic by Redux-Observable. We want to check if the action coming in has a matching type using ofType.

Under the hood, ofType is really:

filter(({ type }) => (
type === 'USER::UPDATE_USER_INFO'
))

ofType is just like checking for action.type in a state reducer. Any action that occurs will go through this ofType check and either proceed or stay.

Because a logging service requires our user’s data formatted in a certain way, we’ll map it to a new object, then pass that over to the logging service using tap.

tap is used when you want to do side-effects such as a console.log or hook into a 3rd party tool like updateLoggingService. It’s exactly the same as a Promise’s .then() except it doesn’t mutate the state of the pipeline.

This is the Promise equivalent of a tap:

Promise
.resolve(value)
.then(value => {
updateLoggingService(value)

return value
})

tap is also great when you wanna debug your pipeline. It’s invaluable when working on a new epic.

Since all epics must emit an action and we’re not doing that, we end our pipeline with ignoreElements. We’ve placed ignoreElements right at the end of our observable so it’ll stop the pipeline as soon as it gets past tap. No action is ever emitted.

Execute Once

Along with side-effects, sometimes you’ll want to execute an action only once. In this example, it’s going to be when you want to start your on-site chat.

This epic looks very similar to what we had before except it starts the chat client only once using take(1). This means, no matter how many times that action is called, it only executed that first time.

There might be a separate action to update the chat client with user info like we had in the previous example, but this epic is only concerned with starting the chat client. Since we’re calling a function on a 3rd party service, we really only want to call this the one time. Calling it again would result in an error from that 3rd party library.

Immediately Executing Actions

When your app starts, you might want to immediately execute an action instead of waiting for an action to come in. It’s most-likely a bad idea to tightly couple an epic with Redux-Observable’s initialization, but it removes the need to call a an action to execute your epic.

A simple example is to log the app’s current version to the console so you can be sure the deployed version of your app matches up with the version you expected to be out there.

I’ve provided 3 separate examples, but you’ll probably only want to use one in your application.

Notice that we’re not using ofType. There are no Redux actions being emitted to trigger the app version logging. While this may seem like a good idea at first, it also means we have no control over its execution. I’d recommend against using immediate actions like this unless you really have a need. Either way, we can use this example to learn some more RxJS patterns.

Redux-Observable, prior to version 1, did not handle immediate execution properly and required using timer(0). The timer function takes a time and waits until that time has passed before emitting a 0 through the pipeline only once. To be clear, it passes 0 through the pipeline no matter what you’ve passed into the timer function.

Under the hood, it looks something like this:

In the other two examples, I use of and EMPTY. of will emit anything it’s given. If you pass it more parameters, each one of those will be emitted separate one after the other. In this case, we don’t care what we emit, so long as we emit something. We could even emit null, but we’re choosing to emit appVersion to make our code a bit cleaner.

EMPTY is a constant observable which completes immediately when subscribed. This is why we can use defaultIfEmpty. What happens is EMPTY calls observer.complete() and then the value in defaultIfEmpty gets passed down the pipeline so long as it’s in your pipeline. That’s why we’re not calling console.info(appVersion) directly.

Updating State

Probably the most basic example of Redux-Observable you’ll run into is updating the state. It’s not entirely obvious, but updating Redux’s state with an epic requires listening for an action and executing another action.

In this example, we have an authentication platform dispatching an UPDATE_USER_INFO action when a user signs in. There are a few ways we could handle this such as doing all the conversions directly in our reducer, but we can let our epic handle it instead.

First, we map (transform) our data to match how we’re going to store it, then we map to the action that our reducer listens to for storage.

Now, our userInfoReducer can be a simple as this:

Because you can do a lot more with an observable pipeline in your epics than your reducers, it’s a great process to have super simple, reusable reducers and utilize epics for all your data transformations and business logic. That way, you can concentrate on complex logic in Redux-Observable instead of complicating your reducers.

Utilizing the Store with Actions

A lot of the time, you’ll want to utilize something in state to execute other actions. This is where we get to the real power of Redux-Observable. It’s one thing to listen to actions, it’s another to bring together the entirety of your application’s state in one place and use it to execute other actions.

I’ve had quite a few situations where I actually want something out of the state, but maybe it’s not there yet. Maybe I’d like to wait until something’s in the store before my epic continues. There are a few ways of going about this. I’m going to go over a few simpler examples and explain why you’d use them, but I’ll leave the complicated one for the end of this article.

In this first example, we wait for the UPDATE_AUTH_INFO action to dispatch before doing anything. Our end goal is to dispatch the closeModal action once we’re authenticated.

Like our other observables, we’re listening for an action, then mapping. In this case, we don’t even care what UPDATE_AUTH_INFO has as a payload because we’re mapping to the closed-over state$.value. The .value prop doesn’t exist on most observables, but state$ is a BehaviorSubject which means you can grab the current value off at any time.

Next, we grab the current authInfo object from our state and pluck the isAuthenticated prop off it. It’s the same as doing:

map(authInfo => (
authInfo
.isAuthenticated
))

Next is a trick I use to filter truthy values with a lot less code. By passing the Boolean constructor function to filter, only values that are truthy will proceed. Be aware 0 and '' are falsey.

Once we know for sure the user’s authenticated from our state, we can execute a new operator: mapTo. This is just like a map, but it stores the value it receives at runtime instead of executing a function in the pipeline like map.

From here, we continue as usual, mapping our pipeline value to an action which will get dispatched by Redux-Observable.

Listening to State Updates Directly

Sometimes you’ll want to listen to state$ directly. This can be very efficient and decouples your epics needing to know which actions update which parts of the state.

In this example, we’re relying on the immutability of the Redux store through the distinctUntilChanged operator. This operator holds onto the previous state and compares that with the current value in the pipeline. If they match, it stops the pipeline right there. If the value has changed, continue through the pipeline.

JavaScript Events

Sometimes you need to hook into JavaScript events like listening for changes to localStorage, communication from an iframe, or interacting with a 3rd party library. This comes up a lot since 3rd parties almost never use observables.

We’re going to listen for changes to our window size because we’re going to be creating a custom implementation of an @media query.

We create a fromEvent observable using window, and the type of event we’re listening to: resize. This is the same as doing a window.addEventListener except it’s wrapped up nicely into an observable.

You can even listen to other kinds of events such as on:

someThirdPartyLibrary
.on('ready', () => {})

When using fromEvent, it looks like this:

fromEvent(
someThirdPartyLibrary,
‘ready’,
)
.pipe(
// This is your callback pipeline
)

Once those responses come in from our resize events, we’re going to limit them to 60 frames a second. This means resize notifications will only come in so long as they’re within this 60 frames-per-second time slot. That also means we’re only going to have a maximum of 60 resize events come through per second.

As clarification, we should really be using the requestAnimationFrame scheduler in RxJS, but we rolled our own for this example because it’s much simpler to explain.

AJAX Calls

If you’re hooking into API endpoints directly instead of through a GraphQL server, you’ll probably want to go through Redux-Observable to manage state updates from AJAX calls.

This is where dependencies come in. Instead of pulling in ajax directly, it’s best to inject it into your epic through the middleware function.

This allows for easier debugging. If you render server-side as well like I do, you’ll want to know the ajax function from RxJS only works when you have a DOM because it’s doing an XMLHttpRequest under the hood. You could always swap this out for something like axios or fetch as well.

There’s an operator in RxJS which will work in-tandem with ajax and that’s switchMap. When using switchMap, it completes the previous ajax observable which cancels the request. This is great for async actions because it means you can easily minimize race conditions.

It looks a little something like this:

Dependencies come in as the third argument of our epic creator. Your access token or id token should have the permissions stored in it; although, you might need to check with the associated API what your user’s capable of doing to ensure the UI doesn’t allow them more power than usual. In this case, we’re asking the API for all user permissions.

With this epic, there’s a lot more going on. Before we can even make an AJAX call, we need to grab the accessToken from the store using state$.value.

Now we’re seeing a new operator: switchMap. This, and a few others like it, are by-far the most-important operators since they allow you to chain observables together in a single pipeline. In our example, when you call the ajax function, it will return an observable.

Normally, we could return immediately after calling ajax and add our operators to the main pipeline. Instead, we’re going to pipe off of ajax directly. While it doesn’t make sense in this example, it has to do with how we’ll eventually handle errors.

After our successful AJAX call, we pluck the response property containing our permissions list then map to the storePermissions action.

Cancelling AJAX Calls When Using mergeMap

When pulling all permissions at once, you can probably get away with a single epic. Based on your architecture, you might be using React and each component might know which permission(s) it requires. In this case, especially if your app has a ton of permissions, you might fetch a single permission when you need it rather than pulling in the whole list.

You don’t want to create a separate epic for every possible permission. Instead, you’ll want to create a generic epic which utilizes mergeMap. To avoid race conditions in these situations, you’ll need the takeUntil operator.

Let’s start from the top. I’ve added a helper function to filter out permission actions based on a given permissionType. This is going to come in handy later in the epic.

Much of this epic is the same except we’re now passing in a permissionType with our action and need to combine it with a value off the state. I’m using an observable from earlier: of which takes the value passed and pipes it through. In this way, we can pipe off the of observable, grab the access token, and switchMap to ajax like before.

When you use mergeMap instead of switchMap, it will create a new observable and also subscribe to it. This operator can single-handedly create memory leaks if you’re not careful. Thankfully, ajax only fires once and completes after, but if you want the same cancellation behavior we had before with switchMap, we’ll want to use takeUntil.

takeUntil is given an observable and never pushes anything through the pipeline. When the given observable emits, it completes the current observable in the same way take would.

In our takeUntil, we’re listening for another FETCH_PERMISSION action that has a matching permissionType. If so, we’ll complete the observable which cancels the AJAX call. This allows another one to begin without incurring a possible race condition.

Delaying AJAX Requests

You might want to limit the amount of AJAX requests that go out at a time. For instance, you might be uploading files and want to do them in pairs of two instead of ten files all at once.

Using mergeMap, it will most-certainly send all ten files at once. On the other hand, using concatMap will limit it to one item at a time. If you want to limit it to two items at a time, use mergeMap with a second argument of 2. This can be really hard to catch if you keep everything in the same epic though.

A better way would be to use map and mergeAll like so:

This example is pretty complex, and it’s got a lot of concepts we haven’t seen yet. For instance, wrapping permissionTypes in from actually pipes each permissionType through as a separate value in the pipeline. We’re mapping those into a whole new observable which we mergeAll(2) to keep it down to two items being merge-mapped at a time. This restricts the load we’re putting on the API and ensures our application can handle larger and larger batches of permissions with ease. Other than those changes, it’s mostly the same epic as before.

The one major difference is how we handle the takeUntil. When FETCH_PERMISSIONS is executed, we now need to loop through each permissionType and see if any match up with our action call before cancelling.

Waiting for Merge-Mapped Observables

Now, what if you wanted to wait until all these merge-mapped observables complete but at the same time fire off the storePermission action? For that, we’ll use toArray and multicast.

I’ve actually used this functionality in a real application for an epic that controlled file uploads so it does have its uses.

The multicast solution is pretty amazing, but also hard to understand. You really need to be aware what this is doing if you wanna change the behavior of it, but I’m only going to cover the current implementation at a high-level in this article.

Subject has some special sauce where it’s both an observable and an observer. Because of this, we’re creating a separate Subject observable that is going to receive the same output at this point in the pipeline and is connected to the source observable (in this case from(permissionTypes)). We are going to merge the source observable with itself and pipe off the merged observable.

We’ve placed this multicast operator immediately after the mergeAll, so it will receive each permissionType response as they emit. Once all permission types have passed this area of the pipeline, toArray will emit an array of AJAX responses it gathered. Since we don’t care about those responses as we already handled them individually, we then map to the finishedStoringPermissions action.

Here’s a much much simpler example that you could copy-paste into just about any JavaScript playground:

Note: range(1, 4) is the same as from([1, 2, 3, 4]).

Catching AJAX Errors

How do we handle errors in Redux-Observable? The same way we do with RxJS… Kinda.

We use catchError to catch any unsuccessful AJAX responses. If not, they would be swallowed by Redux-Observable, and you’d never know. RxJS v6 solves the error swallowing, but it’s possible you want to handle that error anyway.

In this example, we open the error modal and pass it the error along with a stack trace. Note the use of the of observable again. catchError takes a function that returns an observable. Failing to do so results in another error entirely so if you’re using catchError, be sure you’re going to fire off an action.

We also have to put catchError on the ajax observable specifically because it wouldn’t be caught otherwise. This is why we pipe off ajax directly, because the action returned by catchError still goes through the pipeline as if catchError was never called.

Generic Error Catching

Like AJAX errors, you will want to put catchError on quite a few of your epics. In this example, we’ll send them over to our logging software by calling an error action. This way, those errors are also available in the Redux Devtools. We can also create an epic somewhere else in the codebase that simply listens for this error action and handles integrating with the 3rd party logging service using a side-effect.

If this example is executed server-side, window is undefined; thus, you’ll normally break your app, but with Redux-Observable, you’ll just hit the catchError and continue on as if nothing happened.

We’re purposefully making sure to still call storeCurrentUrl in this example, but since we don’t have window, we’ll pass a null string instead.

Not only that, but we’re also calling logError to notify our logging service at the same time because of the helpful observable merge. As you’ve probably gathered, merge allows us to execute multiple actions at the same time when we pass them in as separate arguments.

Because it’s going to come up, you should be aware there’s also a merge operator. I would say you could probably avoid using the operator version altogether, but because of a deficiency in how RxJS’s finalize operator works with Redux-Observable, you’ll eventually come to need it.

I briefly went over merge in the generic catchError example, but there are far more uses for it. Typically, before making AJAX calls, I’ll start a loading indicator. After the AJAX call is complete, I’ll fire more actions such as storing that data and stopping the loading indicator.

Our examples are getting a lot more complicated so we’re keeping it to this same permissions epic.

Using merge, we’re able to start the loading indicator right before the AJAX call and stop it immediately after. Sadly, there’s no real way to listen for both successful and unsuccessful with Redux-Observable. This is because the finalize method in RxJS is like tap. It calls an action when the observable either completes or errors, it does not actually execute anything. So you can use it when you need a side-effect, but not when you want to call an action in scenario one or two.

To get around this issue, I’ve added the operator version of merge which I’m going to alias as mergeOperator so it’s clear:

In this example, mergeOperator works similar to a finalize function, and we don’t have to duplicate our loaded action.

Listening to Two Observables

Our app has an error notification that automatically removes itself after 10 seconds, but also allows the user to hide the error manually before the timer runs out. Somehow, we have to find a way to listen to two observables at the same time and have them race to see which one happens first.

We’re using the same concepts as before, but now there’s a new observable in town: race. This observable emits the first value between the two observables.

Something to be weary, race doesn’t just stop as soon as a race occurred. While similar to Promise.race(), it actually keeps listening. When one observable emits, it lets that observable’s emission go through and waits until the other one emits as well. Once that one emits, it goes back to listening and seeing which of the two will win the next race. This continues as long as the race is still active.

To mitigate the infinite wait issue, I’ve placed take(1) on the HIDE_ERROR_NOTIFICATION action listener. timer(10000) completes after it emits so I don’t have to take(1) on it.

The last part that ends up being pretty weird is the filter. This is there to ensure we only call the hideErrorNotification action once. If the timer runs out first, hideErrorNotification will be called. Since it’s in a race, it won’t emit anything. But if the user hides the notification him or herself, then hideErrorNotification has already been called, and we don’t want to call it a second time. This is why it’s being filtered out.

Race to the State

If we’re loading components which dispatch actions requiring an accessToken, it’s quite possible those components will load prior to an access token being available in state. In this particular example, we don’t want to make an AJAX call without an accessToken.

Here’s the error-prone example:

This particular observable might make the AJAX call with undefined as the accessToken. You might be thinking we could use race to solve this issue; and we can, but it’s unnecessary.

To ensure you have an accessToken in your state, you need to listen to state$:

By listening to state$ and checking for a truthy value from our accessTokenSelector, we’re ensuring that we only make this AJAX call once we have an accessToken.

This only works because state$ is a special kind of observable called a BehaviorSubject which means state$ always has a value whereas action$ is stateless and can only listen for actions. This is why state$.value exists, and also why listening to state$ gives you the latest value.

In another situation, you might not want to wait for the accessToken to be available. Instead, you could write some logic after your accessTokenSelector to display an error if you don’t have an accessToken available.

Conclusion

I hope you’ve gained something from reading this article. I see it as an extension of the Redux-Observable documentation and an accumulation of about 9 months of my own experience with Redux-Observable and RxJS in production applications.

All of my applications using Redux-Observable have been built using these very same building blocks. There are plenty more solutions I’ve implemented with Redux-Observable, but these few should be plenty to get started!

More Reads

If you’ve got an interest in more topics related to Redux and RxJS, you should checkout my other articles:

--

--