RxJS: applying asyncScheduler as an argument vs with observeOn operator

This article explains the difference between passing asyncScheduler as a factory function argument and applying it with the observeOn operator.

If you are not familiar with Schedulers in RxJS, there is a short overview in my article about queueScheduler. In short, asyncScheduler allows you to emit each value in a separate macrotask (in terms of the browser event-loop task queue). If you want to find out more about how the event loop works, watch this wonderful video and read this great article.

If you want to apply a scheduler to an observable sequence you can do that in two ways:

  1. Some factory functions (such as of, from, and range) have an optional SchedulerLike parameter, as shown in the range description on rxjs-dev.firebaseapp.com.

For example:

of(1,2,3, asyncScheduler)

2. You can apply it with the observeOn operator:

For example:

of(1,2,3).pipe(observeOn(asyncScheduler))

Both of these solutions make each data emission happen in a separate macrotask (so each emission is queued in the browser event-loop macrotask queue).

However, they do that in slightly different ways. Usually you don’t have to worry about it, but in some specific cases the difference can be important.

Let’s review how each solution works in detail:

Argument scheduler

of(1,2,3, asyncScheduler).subscribe(console.log)

Steps:

  1. of emits the first value according to the scheduler (in the next macrotask in this example).
  2. It puts the next value into the scheduler queue (another macrotask).
  3. Steps 1 and 2 are repeated for each of the following emissions.

Since only the next emission is scheduled in the browser event-loop queue at any moment, other macrotasks can, in theory, run between the emitted values.
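
The steps above can be pictured with a toy model. The sketch below is plain JavaScript, not RxJS: simulateArgumentScheduler and its array-based queue are hypothetical stand-ins for the scheduler and the event-loop queue, and the assumption that the next emission is already queued when the handler runs is a simplification chosen to match the demo later in this article. It only illustrates that, because a single emission is queued at a time, work queued by the handler can land between two emissions.

```javascript
// Toy model only: a plain array stands in for the event-loop macrotask queue.
// Nothing here is RxJS API; all names are hypothetical.
function simulateArgumentScheduler(values) {
  const queue = []; // pending macrotasks, first in, first out
  const log = [];
  let index = 0;

  // Stand-in for the subscription handler: it logs the value and
  // queues one unrelated macrotask (like setTimeout in the demo).
  const handler = (v) => {
    log.push(`value ${v}`);
    queue.push(() => log.push(`other task ${v}`));
  };

  // Stand-in for the scheduled emission: it delivers ONE value and
  // keeps at most one further emission in the queue.
  const emitNext = () => {
    const v = values[index++];
    // Assumption: the next emission is already in line when the handler runs.
    if (index < values.length) queue.push(emitNext);
    handler(v);
  };

  if (values.length > 0) queue.push(emitNext);
  while (queue.length > 0) queue.shift()(); // run macrotasks in order
  return log;
}

console.log(simulateArgumentScheduler([1, 2, 3]));
// [ 'value 1', 'value 2', 'other task 1', 'value 3', 'other task 2', 'other task 3' ]
```

In this model, "other task 1" runs between the second and third values, which is the kind of interleaving described above.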

What about the observeOn way?

of(1,2,3).pipe(observeOn(asyncScheduler)).subscribe(console.log)

Steps:

1. of produces all the values, and they are buffered by the observeOn operator.

2. observeOn schedules all the values, each one separately, with the specified scheduler (a separate macrotask per value in this example) and puts the respective tasks in the event-loop queue.

This means that all the emission tasks are put into the event-loop queue one after another, so tasks queued later cannot run between them.
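
The same kind of toy model can illustrate this case. Again, this is plain JavaScript and not RxJS: simulateObserveOn and the array used as a queue are hypothetical names, and the model assumes a synchronous source, as with of(1, 2, 3). It is a sketch of the idea rather than of the operator's actual implementation.

```javascript
// Toy model only: a plain array stands in for the event-loop macrotask queue.
// Nothing here is RxJS API; all names are hypothetical.
function simulateObserveOn(values) {
  const queue = []; // pending macrotasks, first in, first out
  const log = [];

  // Stand-in for the subscription handler: it logs the value and
  // queues one unrelated macrotask (like setTimeout in the demo).
  const handler = (v) => {
    log.push(`value ${v}`);
    queue.push(() => log.push(`other task ${v}`));
  };

  // The source produces every value synchronously, so one delivery
  // task per value is queued before any of them has run.
  for (const v of values) {
    queue.push(() => handler(v));
  }

  while (queue.length > 0) queue.shift()(); // run macrotasks in order
  return log;
}

console.log(simulateObserveOn([1, 2, 3]));
// [ 'value 1', 'value 2', 'value 3', 'other task 1', 'other task 2', 'other task 3' ]
```

Here every task queued by the handler ends up behind all the deliveries, because the deliveries were already in the queue.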

You can check these statements in a codepen.

Here is our code for the argument scheduler:

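// The RxJS bundle loaded on the page exposes window.rxjs
const Rx = window['rxjs'];
const { of, asyncScheduler } = Rx;
const { tap } = Rx.operators;
console.clear();

// A macrotask queued before the subscription, for reference
setTimeout(() => console.log('It will run just after this macrotask'));

// asyncScheduler is passed as the last argument of of()
const source$ = of(1, 2, 3, asyncScheduler).pipe(
  tap((v) => console.log('tap ', v))
);

source$.subscribe((v) => {
  // Printed in the macrotask that delivers the value
  console.log('Value ', v);
  // Printed in a microtask right after the current macrotask
  Promise.resolve().then(() => console.log('Microtask value ', v));
  // Printed in a later macrotask
  setTimeout(() => console.log('Macrotask value ', v), 0);
});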

In the subscription handler I use console.log to print the value in the current macrotask. Promise.resolve().then is used to print the value in a microtask that runs just after the current macrotask. Finally, setTimeout prints the value in another macrotask, according to the event-loop queue order.

And this is its output:

OK, so what do we have here? Macrotask value 1 is printed after Microtask value 2. So what? This means that before the of observable made its first emission, it had already scheduled the next emission in the event-loop queue.

OK, now let’s run the code with asyncScheduler applied by the observeOn operator:

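// The RxJS bundle loaded on the page exposes window.rxjs
const Rx = window['rxjs'];
const { of, asyncScheduler } = Rx;
const { observeOn, tap } = Rx.operators;
console.clear();

// A macrotask queued before the subscription, for reference
setTimeout(() => console.log('It will run just after this macrotask'));

// of() emits synchronously; observeOn re-schedules each value
const source$ = of(1, 2, 3).pipe(
  tap((v) => console.log('tap ', v)),
  observeOn(asyncScheduler)
);

source$.subscribe((v) => {
  // Printed in the macrotask that delivers the value
  console.log('Value ', v);
  // Printed in a microtask right after the current macrotask
  Promise.resolve().then(() => console.log('Microtask value ', v));
  // Printed in a later macrotask
  setTimeout(() => console.log('Macrotask value ', v), 0);
});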

Now the output is a bit different:

First, the of observable emitted all the values. Then observeOn scheduled all the emissions in separate macrotasks and put them in the event-loop queue. That’s why all the setTimeout log outputs from the subscription handler run at the very end: the event-loop queue already contained the scheduled tasks from observeOn.

Conclusion

Applying a scheduler as a factory function argument uses far less CPU and memory when the number of emitted values is substantial, because only the next emission is scheduled at any moment, whereas observeOn has to hold a scheduled task for every value the source has already produced. So if you plan to apply asyncScheduler to an expression like

range(0, 1e10).pipe(tap(v => doSomething(v)))

you should definitely prefer applying it as a range argument.

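// Inefficient: range emits every value synchronously,
// and observeOn schedules a separate task for each one up front
range(0, 1e10).pipe(
  tap(v => doSomething(v)),
  observeOn(asyncScheduler)
)
// Efficient: only the next emission is scheduled at any time
range(0, 1e10, asyncScheduler).pipe(
  tap(v => doSomething(v))
)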
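
To get a rough feel for the memory difference, here is a minimal sketch that counts how many tasks are pending at the same time under each strategy. It is a toy model in plain JavaScript, not a measurement of RxJS: peakPendingTasks, its eager flag, and the array used as a queue are all hypothetical, and real schedulers carry extra per-task overhead that this ignores.

```javascript
// Toy model only: counts the largest number of tasks waiting in a queue.
// Nothing here is RxJS API; all names are hypothetical.
function peakPendingTasks(count, eager) {
  const queue = [];
  let peak = 0;

  const enqueue = (task) => {
    queue.push(task);
    peak = Math.max(peak, queue.length);
  };

  if (eager) {
    // observeOn-like: one task per value is queued before any runs.
    for (let i = 0; i < count; i++) enqueue(() => {});
  } else {
    // Argument-like: a single task re-queues itself for the next value.
    let emitted = 0;
    const emitNext = () => {
      emitted++;
      if (emitted < count) enqueue(emitNext);
    };
    if (count > 0) enqueue(emitNext);
  }

  while (queue.length > 0) queue.shift()(); // drain the queue in order
  return peak;
}

console.log(peakPendingTasks(1000, true));  // 1000
console.log(peakPendingTasks(1000, false)); // 1
```

In this model the eager strategy grows with the number of values, while the self-rescheduling strategy stays constant, which is why the difference matters for something like range(0, 1e10).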

I hope you enjoyed this article. Please leave comments with your personal use-cases of RxJS schedulers!

Like this article? Let’s keep in touch on Twitter.

Starting from section 4, my RxJS video course covers advanced topics, so if you are already familiar with RxJS, you can find something useful as well: higher-order observables, anti-patterns, schedulers, unit testing, etc. Give it a try!