RxJS + For + Await… What?
This article explores the `rxjs-for-await` library by Ben Lesh. Read it to learn about use cases and technical details.

If you have been using TypeScript or modern JavaScript (ES2017+) lately, you are probably familiar with the `async`/`await` keywords and how useful they are when working with asynchronous code. Here is a basic example to refresh your memory:
async function request() {
  const result = await fetch('some-api-url');
  const data = await result.json();
  return data.items; // for example, a nested array
}
const list = await request();
list.map(/*do stuff with the array*/);
(Notice that I used top-level `await`, which is available since TypeScript 3.8.)
In this example we call a function that returns a `Promise` and handle its result in synchronous-looking code, without calling `.then` or passing callbacks. This makes our code look more linear, and the lack of callbacks makes it easier to reason about.
Then what is for-await?
Of course, a `Promise` represents the result of a single asynchronous operation, but what if we have a process (instead of a single task) that emits events over time? (Sounds familiar, right?) This is where `for await` comes into play:
for await (const event of events) {
  handleEvent(event);
}
Here we have a stream of events (important: this is not an array, but a stream that emits events asynchronously). Instead of providing a callback to handle events over time, we use `for await` to deal with them in a more linear fashion. This, together with the usual `async`/`await`, allows us to escape callbacks in our codebase entirely!
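To make this concrete, here is a minimal, self-contained sketch. The `emitOverTime` helper and the event names are made up for illustration; any async iterable could take its place.

```typescript
// Hypothetical helper: yields each item after a short delay,
// simulating events that arrive over time.
async function* emitOverTime<T>(items: T[], delayMs: number): AsyncGenerator<T> {
  for (const item of items) {
    await new Promise<void>(resolve => setTimeout(resolve, delayMs));
    yield item;
  }
}

// Consumes the stream with for await and records what was handled.
async function collectEvents(): Promise<string[]> {
  const handled: string[] = [];
  for await (const event of emitOverTime(['click', 'scroll', 'keydown'], 10)) {
    handled.push(event); // runs once per event, in arrival order
  }
  return handled;
}
```

The loop body runs once per emitted value and the function only returns after the stream has finished, so `collectEvents()` resolves to `['click', 'scroll', 'keydown']`.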
Or does it?…
RxJS Observables are more powerful than Promises
Not only are RxJS `Observable`s capable of streaming many events (as opposed to the single result of a `Promise`), but they also come with powerful operators for processing the data in a beautiful, functional way.
But can we use `async`/`await` with RxJS? It turns out we cannot, or at least not entirely.
Of course, if our `Observable` is going to emit just one value and then complete, we can always use `toPromise` and then apply `async`/`await` (RxJS 7 deprecates `toPromise` in favor of `firstValueFrom` and `lastValueFrom`). And before you think “why would I need an `Observable` of just one value”, bear in mind that Angular’s `HttpClient` is built on the premise of `Observable`s emitting a single value and then completing. So with `HttpClient`, we can convert the resulting `Observable` to a `Promise` and leverage the powers of both `async`/`await` and RxJS operators!
OK, so far so good. But what about streams of events?
import { fromEvent } from 'rxjs';
const clicks$ = fromEvent(document.body, 'click');
Here I have a stream of all the clicks on the document’s `body`. Obviously, this does not do much on its own, and I need to handle those events: log them to the console, at least. Here is how we would do that:
clicks$.subscribe(event => console.log(`Event ${event.type} occurred`));
But this is a callback passed to the `subscribe` function. Can we use `async`/`await` with this code to avoid callbacks? Not really: `toPromise` relies on the source `Observable` completing, and this particular `Observable` never completes. So are we stuck with the callback?
Announcing rxjs-for-await! A library providing different strategies for subscribing to #RxJS observables using async/await and `for await` loops
Now available on npm! https://t.co/CwlI0O44Up pic.twitter.com/l9tLlseOUR
Ben Lesh (@BenLesh), March 18, 2020
So what is it? As seen in the screenshot, this library provides an `eachValueFrom` function which converts a source `Observable` into an `AsyncIterable` (the kind of object that `for await` knows how to consume). This, in turn, allows us to use `for await` on an `Observable` stream! Here is how we can refactor our code:
import { fromEvent } from 'rxjs';
import { eachValueFrom } from 'rxjs-for-await';

const clicks$ = fromEvent(document.body, 'click');

async function handleClicks() {
  for await (const event of eachValueFrom(clicks$)) {
    console.log(event);
  }
}

handleClicks();
Now we don’t have to write any callbacks at all: we can just use `for await` with RxJS `Observable`s.
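To get a feel for what such a conversion involves, here is a simplified sketch. It is not the library's actual implementation: `SimpleObservable`, `eachValueFromSketch` and `countTo` are hypothetical names, and the observable is a hand-rolled stand-in rather than an RxJS one. The idea is to buffer every pushed value and hand the values out one at a time as the loop asks for them.

```typescript
// Hypothetical, minimal stand-ins for the RxJS types (assumption, not the real API).
interface SimpleObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
interface SimpleObservable<T> {
  subscribe(observer: SimpleObserver<T>): () => void; // returns an unsubscribe function
}

// Sketch: turn a push-based source into a pull-based async iterable.
async function* eachValueFromSketch<T>(source: SimpleObservable<T>): AsyncGenerator<T> {
  const state = {
    buffer: [] as T[],
    done: false,
    failed: false,
    error: undefined as unknown,
    wake: null as (() => void) | null,
  };
  // Wakes the generator if it is currently waiting for a notification.
  const notify = () => {
    const wake = state.wake;
    state.wake = null;
    if (wake) wake();
  };
  const unsubscribe = source.subscribe({
    next: value => { state.buffer.push(value); notify(); },
    error: err => { state.failed = true; state.error = err; notify(); },
    complete: () => { state.done = true; notify(); },
  });
  try {
    while (true) {
      // Hand out everything buffered so far, one value per iteration.
      while (state.buffer.length > 0) {
        yield state.buffer.shift() as T;
      }
      if (state.failed) throw state.error;
      if (state.done) return;
      // Nothing buffered yet: sleep until the source pushes something.
      await new Promise<void>(resolve => { state.wake = resolve; });
    }
  } finally {
    unsubscribe(); // runs on completion, error, or when the loop is left early
  }
}

// Hypothetical source: emits 1..limit on a timer, then completes.
function countTo(limit: number, delayMs: number): SimpleObservable<number> {
  return {
    subscribe(observer) {
      let n = 0;
      const id = setInterval(() => {
        n += 1;
        observer.next(n);
        if (n === limit) {
          clearInterval(id);
          observer.complete();
        }
      }, delayMs);
      return () => clearInterval(id);
    },
  };
}

async function demo(): Promise<number[]> {
  const seen: number[] = [];
  for await (const n of eachValueFromSketch(countTo(3, 5))) {
    seen.push(n);
  }
  return seen;
}
```

Buffering means no value is dropped even when the loop body is slower than the source, at the cost of unbounded memory if the consumer never catches up.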
Potential use cases
Of course, this does not mean that you have to rush to refactor all your apps right away, but there are cases where this neat little thing will come in handy.
One of those cases is, actually, unit tests. When writing tests, it is important to keep them as plain and simple as possible. Adding `subscribe` calls with callbacks introduces unnecessary complexity and nesting inside unit tests (remember: tests should read like plain English to properly reflect what the app does). For example, think about this piece of code:
describe('AppComponent', () => {
  beforeEach(async(() => {
    TestBed.configureTestingModule({
      imports: [
        RouterTestingModule
      ],
      declarations: [
        AppComponent
      ],
    }).compileComponents();
  }));

  it(`should have a source$ Observable emit numbers`, () => {
    const fixture = TestBed.createComponent(AppComponent);
    const app = fixture.debugElement.componentInstance as AppComponent;
    app.start();
    let counter = 1;
    app.source$.subscribe(data => {
      expect(data).toBe(counter);
      counter++;
    });
  });
});
This test calls the `start` method on the `AppComponent`, which is supposed to create a stream of numbers incrementing by one. Then we check that the stream does, in fact, emit such numbers. We used `subscribe` to get the emitted values, which created a third level of callback nesting. If we had to check something with a callback inside that last callback, which happens fairly often in unit tests, we would already be in callback hell. `rxjs-for-await` will help us deal with it:
it(`should have a source$ Observable emit numbers`, async () => {
  const fixture = TestBed.createComponent(AppComponent);
  const app = fixture.debugElement.componentInstance as AppComponent;
  app.start();
  let counter = 1;
  for await (const data of eachValueFrom(app.source$)) {
    expect(data).toBe(counter);
    counter++;
  }
});
Here we just used `eachValueFrom` to implement the same assertions with `for await`, and thus avoided nesting callbacks. Also, if we need to check another asynchronous thing inside the loop, we can use `async`/`await`, so there are no callbacks at all.
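One thing to keep in mind: a `for await` loop only ends when its source completes, so if a stream like `app.source$` never completes, the test has to leave the loop on its own. The sketch below illustrates this with plain language features only. `endlessCounter` is a hypothetical stand-in for an endless stream, and `checkFirstValues` and `status` are made-up names; nothing here comes from the library.

```typescript
// Records whether the generator's cleanup code has run.
const status = { cleanedUp: false };

// Hypothetical stand-in for an endless stream of numbers: 1, 2, 3, ...
async function* endlessCounter(): AsyncGenerator<number> {
  let n = 1;
  try {
    while (true) {
      await new Promise<void>(resolve => setTimeout(resolve, 1));
      yield n++;
    }
  } finally {
    // Runs when the consumer leaves the loop early (break, return or throw).
    status.cleanedUp = true;
  }
}

// Checks the first `limit` values, then leaves the loop explicitly.
async function checkFirstValues(limit: number): Promise<number[]> {
  const seen: number[] = [];
  let expected = 1;
  for await (const value of endlessCounter()) {
    if (value !== expected) {
      throw new Error(`expected ${expected}, got ${value}`);
    }
    seen.push(value);
    expected++;
    if (seen.length === limit) break; // without this the loop would never end
  }
  return seen;
}
```

Leaving the loop with `break` calls the iterator's `return()` method, which is what gives the source a chance to clean up. With an RxJS source, bounding the stream with an operator such as `take` before passing it to `eachValueFrom` is another way to keep the test finite.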
Conclusion
While we shouldn’t rush to rewrite our codebases completely, this library lets us use the newest ECMAScript features while still enjoying the power of RxJS.