Understanding RxJS Operators

André Braga
ITNEXT
Published in
10 min readJun 13, 2019

--

Hey guys. I’m here today to talk about RxJS Operators. I hope that, at the end of this article, you’re aware of the main differences and how/when to use them. 😊

Introduction

As you know, RxJS brings a lot of great functionality into our Angular applications and one of the things that I really like about it is its support for subjects — you can read about it here.

In this article we’re going to focus on some of the operators that RxJS offer us. We will go through them by groups to organize the information as best as we can 😊.

Error Handling

There are many operators that can help us work with errors in our streams. Let’s take a look at some of them.

catchError: It allows our code to be notified when an error occurs in the source stream and reacts by either passing the error along or emitting another observable. In either event, it prevents an unexpected or unhandled stoppage in the stream.

throwIfEmpty: If our source completes without emitting anything, this operator will force that to be considered an error. I know, nothing says that our source must emit something before it completes, but this operator gives us that capability.

Here we’re explicitly setting up an empty observable. We pipe it through, throwIfEmpty and the results when we run it is, we get our error message that will say “no elements in sequence”. This means that if our source doesn’t emit any values, we’re going to consider that an error.

retry: It is pretty straightforward and useful for things like HTTP requests that may fail due to network issues. This operator allows us to go ahead and to re-subscribe to try again, up at certain numbers of retries, if our source throws an error. You have to have in mind that it starts at the beginning of that observable.

Filtering to Multiple Results

Often, when working with streams, we don’t need or want to process every value. So, that’s where operators come in. They let us filter the stream to a subset of items based on the criteria we specify.

skip: We must provide a number of values that we don’t care about and that number will be filtered out from the beginning of the source observable.

In this example, we’re emitting 5 values and with skip we’re telling that we want to ignore the first 3 values. This means that we will get the values 4 and 5 in our output.

skipWhile: Allows us to evaluate the values coming from our source and use them to determine whether we should skip. This is an one-time determination. Means that once the supplied predicate function evaluates too false, every value after that is accepted and emitted to the subscribers.

Here, we’re looking for our value to be less than 4. So, our condition turns false at the position 2 (value 4) and we stop to skipping values. Now, we know that our output will be 4, 2, 1 and 6.

take: Is the opposite of skip. Here we provide a number value as a parameter and that’s the number of items that will be accepted from the source and passed along to our subscribers.

In this example, we will get 2 and 3 in the output.

distinct: Allows us to eliminate duplicates from our stream. There are a couple of approaches to do this. If we don’t provide any parameter, then it will exclude all duplicates. If we provide a function, it will use that function to determine what duplicate means. The function will receive each value emitted from the source observable in sequence and then returns a potentially modified value, which is what is used for the comparison to determine uniqueness.

Here will we get only the values that are unique in the source. So, we get the numbers 1, 2, 3 and 4.

distinctUntilChanged: This is a bit different from the plain distinct in that will only compare a new value emitted from the source observable to the immediately previous value. If they’re different, the new value will be passed along. If they’re identical, the new value will be dropped. In this case, it doesn’t matter if the value had been emitted prior to the immediately value because the look back window is only 1.

Do you know the output here 😊? We’re going to get the output of the first 2 and then the next two 2’s are skipped because the value hasn’t changed. The 3 gets emitted because it’s different from the 2. So, the final output will be: 2, 3, 1, 2, 1.

filter: Allows us to get as fancy as we would like our elimination of values from the source observable. We need to provide a predicate function to do the comparison.

We can specify any comparison function that we want. It’s going to receive each value of our source and pass it through that filter function that we specify. If that function returns true, then that value is going to be passed along to our subscribers. If it returns false, then the value is going to be dropped. In this case our output will be: 3, 1, 1, 1.

Filtering to One Result

Now, we’re going to look at operators that let us filter values coming out of a source observable. However, these operators are limiting the source to just a single value.

first: Emits just the first value from a source and the unsubscribes.

In this case it will output the value 2. Note that we can also pass a function as we did on filter.

elementAt: Let us emit only the value from the source observable from the specified position.

This is very straightforward, so the output here will be 1.

find: Allow just the first instance of a value that causes the predicate function to be true to passed along to the subscribers. Once that first value as passed along, the source observable is unsubscribed. So, no additional values are emitted.

The first value that meets with our criteria is 5 and this value will be our output in this scenario.

single: Maybe, this is a little bit different from what you will expect. It emits one of the tree things: True, Error and Undefined.

True: It has one and only one value that matches with our criteria.

Error: If there’s more than one value that causes the predicate function to return true.

Undefined: If the source emits values, but none cause the predicate to be true.

Grouping Observables

This is useful when we have multiple source observables and we want to bring together and to emit their values as a single stream.

combineAll: Takes an observable as its source and emits the latest value from each observable when any one of the inner observables emits.

In the previous diagram when ‘a’ emits, nothing happens because the other observable hasn’t emitted yet. When ‘c’ emits, the subscribers receive the latest value emitted from each source: ‘a’ and ‘c’. When ‘b’ emits, the latest value from each is ‘b’ and ‘c’. Finally when ‘d’ emits, the latest value is ‘b’ and ‘d’.

concatAll: Emit all values from each source observable, finishing one observable before the beginning the next.

In our example, we have two different arrays: workingDays and weekend. Then we’re going to concatenate them together. To do this we build our observable of observables and pipe that through concatAll and even though the weekends observable emits essentially simultaneously, workingDays will fire first. In this case our output will be: Monday, Tuesday, Wednesday, Thursday, Friday, Saturday and Sunday. This is what concatAll give us, the ability to combine multiple sources into a single output.

Grouping Values

Values coming out from observables are emitted one-at-a-time. However, sometimes we need to work with those values in groups.

groupBy: It organizes the values from the source observable according to a condition that we provide as a parameter.

Here, we subscribe to our collection of players, we pipe through groupBy, specifying the id as the property we want to groupBy. So, we’re going to pass into mergeMap all of our players as separate observables grouped by id. Then, with reduce we’re going through and totaling up all of the goals and weeks by id and in this way, we will have a single entry that combines all of that information into a single entry per id. Our output will be something like this:

pairwise: Groups values in pairs. This is useful, for example, to compare the values that were emitted.

This will give us the values coming out of our source as an array of two values like this: [1,2] [3,4].

toArray: Collects all of the values, stuffs them into an array and passes them along to any subscribers. Nothing is emitted from toArray until the source closes and then the array arrives as a single value.

We’re also using the take operator. Do you know the output in this scenario? If your answer is: [0, 1, 2, 3] congrats, you are correct 😊.

Value Transformation

in some way, each of these operators takes the output from a source observable and modifies or replaces the value before passing it along to subscribers.

concatMap: takes each input value and passes it to the function provided to the operator as a parameter. The result from the function is then passed to the subscribers and the next value from the source is processed.

In this example we will get the following output: 1, 2, 2, 4, 3, 6, 4, 8. This means that we get each value and then each value multiplied by 2.

defaultIfEmpty: Allows us to specify a value to be emitted from a source if it completes without emitting any values.

Here we have an empty observable and we pipe it through defaultIfEmpty. Note that whatever value we provide to this operator is what’s going to come out to the subscribers when we’re using an empty observable. In this case ‘-1’.

map: Maybe the single most popular operator in RxJS. It allows us to take every value coming out of a source and run it through a function that we provide. Then the value returned from that function is what gets send to subscribers.

In this case our output will be: 1, 4, 9, 16.

reduce: Only emits the final accumulated value. Takes each value from the source and passes it sequentially to a function that we provide, as we saw in our groupBy example. That function also receives the cumulative result of all previous calls to the function. It then returns a value, which is used as the cumulative result for the next call to the function. Reduce doesn’t emit until the source observable completes, at which point it sends the final accumulated value onto the subscribers.

mergeMap: First, I want to warn you that this operator is also called flatMap, so you’ll see both names used. flatMap is literally exported from the RxJS source as an alias of mergeMap. So, whichever name we use, it runs the exact same code. We use either to flatten an inner observable but want to manually control the number of inner subscriptions. If only one inner subscription should be active at a time we must use switchMap.

switchMap: We use to subscribe a new observable every time our source emits. The new observable that we subscribe is generated by the factory function we pass in to switchMap.

Other Useful Operators

tap: This operator is simple, it receives every value emitted from the source, let us take an action and then passes that same value onto other operators or to subscribers. The action that we take is typically some type of side effect, something outside of the typical data flow. Logging the value in some way is quite common, but it really can be anything.

count: Simply waits for a source observable to complete and then it just emits the total number of values that came from source. None of the values actually emitted from source get passed along to the subscribers, they just receive that total count.

Conclusion

I hope that this article will be helpful to you. So, feel free to use this as a quick refresher resource.
Remember that you can (or must) also take a look on the official RxJS documentation, where you can find marble diagrams and more examples, even some operators which were not approached in this article.

--

--