Rx.js Operators, Part II
It’s been more than 2 years after I published my first article on Rx.js, and now it’s time to talk more about its operators. Lots of stuff has changed since then; we didn’t even have the .pipe method back when I wrote the first piece.

It’s been more than 2 years after I published my first article on Rx.js, and now it’s time to talk more about its operators. Lots of stuff has changed since then; we didn’t even have the .pipe
method back when I wrote the first piece.
In the meantime I used Rx.js more and more in the applications I’ve been working on, and the main insight I got from that experience is this:
Developers’ biggest problem with Rx.js is not knowing more operators
The amount of code that was made better just by using one or another (or a combination of) operators has been ridiculous. Of course, expecting someone to know all of them is an overkill (granted there are over a 100 operators in Rx.js), but at least understanding some of the important groups can make coding (especially in Angular) so much easier.
So, without a further ado, let’s dive in
debounceTime vs auditTime
If you ever tried to make an autocomplete input with Rx.js, then you probably heard about debounceTime
: this operator, as the documentation says,
Emits a value from the source Observable only after a particular time span has passed without another source emission.
Which means, if we debounceTime(3000)
, for example, we will be getting notifications after the source has sort-of settled down for at least 3000 milliseconds. This is useful if we want to make a request for information from a server after the user finished typing, and don’t want to hit the server with useless requests before the user has actually finished typing. Here is an example:
const inputEl = document.querySelector('input');
fromEvent(inputEl, 'input')
.pipe(
debounceTime(300),
map(event => event.target.value),
switchMap(query => from(fetch(`https://some-url?q=${query}`)))
)
.subscribe(console.log);
If we now remove the debounceTime
call and type “Hello” really fast, we will make 5 requests to https://some-url?q=
, 4 of which are completely unnecessary. But with debounceTime
we will only hit the server after the user stops typing.
So far so good. But what is auditTime
then and what use cases does it have?
As the documentations states, auditTime
Ignores source values for duration
milliseconds, then emits the most recent value from the source Observable, then repeats this process.
This may feel a lot like debounceTime
, but it really is not. While both of this functions ignore some of the emissions based on a time interval, there is a significant difference: debounceTime
waits after each emission for a given time period, and if there are no new emissions, it will allow that last emission to pass; auditTime
does not care about emissions; instead, it comes back every given interval, checks if there have been any emissions in between the previous checking and the current one, and if there are, it lets the last one of them to pass. Imagine having an Observable
of Facebook messages coming from all your different friends. If you auditTime(3000)
this Observable
, and in 3 seconds Anthony and Julia send you a message in that order, then at the top of the checking you will receive a message from Julia, but the message from Anthony will be lost forever.
But what is a use case for this operator? Imagine a situation where we receive emissions from a source that emits very frequently, say, a WebSocket
conveying live data on stock exchange rates fluctuations. Every time we receive an emission from that source, a chart representing stock values is being repainted, depending on new values, which is a costly operation. Because this rates can change very fast, and the user won’t notice the smallest fluctuations that take place in <500ms, we can auditTime
it to only repaint the chart every 500 milliseconds:
observableFromSocket$
.pipe(
auditTime(500),
)
.subscribe(repaintChart);
scan vs reduce
Both of this functions are aggregating emitted values, for example, counting the sum of numbers in an Observable
of an Array
. The main difference is that reduce
emits only once — as soon as the source Observable
completes. So we can use it to calculate the average age of users, for example:
of(23,25,24, 25, 25, 25)
.pipe(
map(age => [age, 1]),
reduce(([accAge, accCount], [age, count]) => [accAge + age, accCount + count]),
map(([sum, count]) => sum / count)
)
.subscribe(console.log);
In this example, we map ages of people to a tuple containing the age and the number 1 (which is the count). We then add up the ages and increment the count separately, still keeping the tuple, and after the observable completes, map the the result to sum / count
, which is the average.
scan
, on the other hand, emits each aggregated value. Every time it adds up a new number, an emission will fire. Here is a use case: imagine a webpage that displays donations from different people. Every time someone makes a donation, and Observable
notifies us of the amount being donated. It does not show us the entire collected money, it only shows the amount of a new donation. So we essentially want to aggregate the donations as they arrive and update the DOM every time a new donation arrives:
donations$
.pipe(
scan((acc, next) => acc + next),
)
.subscribe(updateAmount);
distinct vs distinctUntilChange vs distinctUntilKeyChange
The name of the distinct operator speaks for itself — it only allows emissions that haven’t already happened. Here is a quick example:
of(1, 2, 3, 2, 4, 4)
.pipe(
distinct()
)
.subscribe(console.log);
This example will log 1, 2, 3, 4
, omitting the duplicate emissions of 2
and 4
.
What is a good use case for this operator? Mainly, disallowing repeating actions on the same data entry. For example, say we have a Subject
which fires a product
object every time the user deletes one. Of curse, we can only delete a product once, but user may click on the button twice before our request is fulfilled, so we may want to use only distinct
values. In this case we want the distinct
operator to work on the user id-s instead of the object references, but Rx.js has got us covered: the distinct
operator can accept a mapping function which can help it determine by what part of the emission to make the distinction. Here is an example:
interface Product {
id: number;
title: string;
}
deleteProduct$ // a Subject emitting Products
.pipe(
distinct(product => product.id),
)
.subscribe(product => ProductService.delete(product));
But what if we don’t want two subsequent identical emissions, but two identicals in the overall stream are acceptable? For example, say we have a game in which the player has to answer some questions from different fields of science. One can choose from math, physics, history, geography and literature. The user chooses a field of science and is presented with the question. But we have to ensure that the player must answer question from at least two different areas, so they cannot choose the same field twice in a row. Here is where distinctUntilChanged
comes to play:
fieldsOfStudy$
.pipe(
distinctUntilChanged(),
)
.subscribe(field => presentQuestion(field));
This will now allow duplicates, but not one after another.
Of course, as with the distinct
operator, we may provide a function to check distinction on a nested value rather then the emitted value itself. But Rx.js went as far as to provide a shorthand for this: distinctUntilKeyChanged
. Here is how it works:
of<Product>(
{id: 1, title: 'Chair'},
{id: 1, title: 'Chair'},
{id: 2, title: 'Table'},
{id: 1, title: 'Chair'},
).pipe(
distinctUntilKeyChanged('id'),
map(product => product.title),
).subscribe(console.log);
This code will log “Chair, Table, Chair”.
takeWhile vs takeUntil
Sometimes we just need manual methods of stopping emissions from an Observable
source. Actually, there are three possible scenarios:
- Take only a fixed amount of emissions
- Take emissions while they satisfy a constraint (example: read a list of numbers while they are lower than 5)
- Take emissions until some external event happens.
Actually, the names of the methods that Rx.js has for this are exactly descriptive of these scenarios. I am not going to cover the take
operator, as it is pretty straightforward. As to the other two, here is a simple example:
of(1, 2, 3, 4, 5)
.pipe(
takeWhile(n => n < 4),
)
.subscribe(console.log);
This will only log “1, 2, 3”. This behavior may seem similar to filter
, but it is very important to understand that takeWhile
actually unsubscribes from the source completely — filter
on the other hand will just disallow some emissions. It will resume after some other emissions do satisfy the constraint. takeWhile
, on the other hand, will cut off forever.
But what if I want to stop emissions based on some external event? Say, I want to log something every second, but only for 10 seconds? So I will need to stop the emissions after that amount of time?
We can actually achieve it by using takeWhile
:
let stopped = false;
setTimeout(() => stopped = true, 10000);
interval(1000)
.pipe(
takeWhile(() => !stopped),
)
.subscribe(console.log);
This works fine, but looks pretty terrible. As a matter of fact, Rx.js does provide a way of handling things like this: the takeUntil
operator accepts an Observable
, and will continue reading emissions until the said Observable
emits some value. Thus we can use it by passing an Observable
to it, which will fire in 10 seconds (using timer
). Here is the code:
interval(1000)
.pipe(
takeUntil(timer(10000)),
)
.subscribe(console.log);
This pattern is actually quite popular in Angular, when unsubscribing from Observables
in our components. I elaborate on it in my article Harnessing the Power of Mixins in Angular.
See the reference section to more about takeWhile and takeUntil.
Conclusion
As I mentioned, Rx.js contains hundreds of operators, each of them accomplishing some fascinating features. I will continue talking about them in my follow-up articles.