New possibilities with Angular’s push pipe - Part 2
The main idea behind the Angular push pipe is a new way of handling change detection locally instead of the global handling used in async pipe by Angular natively. It is implemented in a way we can get zone-less performance in zone-full applications.

If you missed part 1 you should start reading it first as they depend on each other.
Here the links to all parts:
? New possibilities with Angular's push pipe - Part 1
? New possibilities with Angular's push pipe - Part 2
Working implementation under Rx-Angular :
-npm i @rx-angular/template -S
Live Demo:
- stackblitz.com/rx-angular
The push pipe implementation
The main idea behind the push pipe is a new way of handling change detection.
If all values are transported over Observables we know exactly when a value changes and could trigger the change detection.
Even more, we could trigger it only in the needed parts of our application.
In Angular's ViewEngine there are 2 options to render content:
- markForCheck
- detectChanges
markForCheck marks the related component as dirty and the next change detection of zone.js will trigger the rendering.
In comparison, detectChanges triggers the change detection immediately whether or not zone.js is present.
Ivy equivalents are:
- ɵdetectChanges
- ɵmarkDirty
As the push pipe should serve as a drop-in replacement for the async pipe, used in Angular v8, v9, and v10 we had to implement only a little change in the pipes code.
Switching from
tap(v => { this.value = v; this.ref.markForCheck(); })
to
tap(v => { this.value = v; this.ref.detectChanges(); })
With that in place we can already run Angular without zone.js.
If we would use it a bit and count the number of change detections we realize that multiple render calls are done for a single change.
A naive example could look like this:
@Component({
selector: 'app-display',
template: `
{{observable$ | push}}
`
})
export class DisplayComponent {
observable$ = of(1, 2, 3);
}
The listed code triggers change detection 3 times even if only one time would be enough.
This is really bad as we have now a slower app than with the original async pipe.?
To avoid this we have to implement intelligent coalescing logic for our pipe.
Jia Li - ? @Jialipassion the code owner and master mind of zone.js came up with some interesting findings.
He discovered that through event bubbling a click on a button triggers 2 calls within zone.js.
As solution to this he provided logic to coalesce multiple click events within the same event-loop tick by schedule the calls over the animation frame and only execute them once per loop.
Find the source here coalescing over animation frame and an insteresting article about it here: ? reduce-change-detection-cycles-with-event-coalescing-in-angular
Scheduling is pretty complex and I won't go too much into details. If you need more information here some links:
- ? RxJS schedulers in depth
- ? RxJS Scheduling in v7
Let's implement a minimal version of it as RxJS operator for our push pipe
export function coalesceWithPoc1<T>(
durationSelector: Observable<any>,
): MonoTypeOperatorFunction<T> {
return (source) => {
const o$ = new Observable<T>((subscriber) => {
const rootSubscription = new Subscription();
rootSubscription.add(
source.subscribe(createInnerObserver(subscriber, rootSubscription))
);
return rootSubscription;
});
return o$;
function createInnerObserver(
outerObserver: Subscriber<T>,
rootSubscription: Subscription
): Observer<T> {
let actionSubscription: Unsubscribable;
let latestValue: T | undefined;
return {
complete: () => {
if (actionSubscription) {
outerObserver.next(latestValue);
}
outerObserver.complete();
},
error: (error) => outerObserver.error(error),
next: (value) => {
latestValue = value;
if (!actionSubscription) {
actionSubscription = durationSelector.subscribe({
next: () => {
outerObserver.next(latestValue);
actionSubscription = undefined;
},
complete: () => {
if (actionSubscription) {
outerObserver.next(latestValue);
actionSubscription = undefined;
}
},
});
rootSubscription.add(actionSubscription);
}
},
};
}
};
}
The interesting parts are the `durationSelector` param and the next callback.
`durationSelector` is an Observable that specifies the execution context (or if we pass an interval also a time span) in wich the coalescing should take place.
In the next function we subscribe to the durationSelector and forward the last value to the outer observer:
next: (value) => {
latestValue = value;
if (!actionSubscription) {
actionSubscription = durationSelector.subscribe({
next: () => {
outerObserver.next(latestValue);
actionSubscription = undefined;
},
complete: () => {
if (actionSubscription) {
outerObserver.next(latestValue);
actionSubscription = undefined;
}
},
});
rootSubscription.add(actionSubscription);
}
Now we can use it in the push pipe!
? RxJS ships a prett cool feature in v7 called `animationframes`.
`animationframes` is a creation function that returns an Observable<number> similar to `interval`, but emitts on every `AnimationFrame` tick instead of the `setTimeout`.
Pretty cool huh? ?
@Pipe({ name: "push", pure: false })
export class PushPipe implements OnDestroy {
requestAnimationFrameId = -1;
value: any = null;
subscription;
observablesToSubscribeSubject = new Subject<Observable<any>>();
obs$ = this.observablesToSubscribeSubject
.pipe(
distinctUntilChanged(ɵlooseIdentical),
switchAll(),
coalesceWith(animationframes),
tap(v => {
this.value = v;
this.ref.detectChanges();
})
);
}
The coalesceWith operator takes animationframes as dur<ationSelector and forwards the last emission once in a animationFrame.
A marble diagram of the above code looks like that:
With this operator in place our push pipe is done quite nice.
To give a bit more technical context let me explain scheduling a little bit more.
Scheduling
Scheduling means trigger the execution of work at some point in the future.
A good way to understand that is to use flame-charts and analyze the code:
Here we see the execution of a simple function.
The button click is the trigger
The scheduled work is a render call:
This call is scheduled over an animationFrame and executes the work directly before the paint event (green dashed line)
So the coalesceWith operator takes an param to decide which scheduler method we want to use to coalesce
coalesceWith(durationSelector)
With that in place we could use different scheduling methods to prioritize the work.
Coalescing on a micro task looks like that
With that in place we are nearly done!
As the push pipe existed quite a while in my customers' projects until it got published as OOS lib we ran into another tricky problem when using the current implementation.
Applications still over rendered as the coalescing only worked for multiple sync values in a row, but did not respect multiple pipes in one component.
Practice showed this is a quite common case.
A naive example looks like that:
@Component({
selector: 'app-display',
template: `
{{id$ | push}}
{{firstName$ | push}}
{{lastName$ | push}}
`
})
export class DisplayComponent {
id$ = of(42);
firstName$ = of('John');
lastName$ = of('Doe');
}
This example rerenders 3 times as we coalesce separately for every pipe.
This is still 2 too much. ?
To overcome this we need a scope in which we coalesce the calls.
const scope = {numCoalescing: 0};
// ...
coalesceWith(durationSelector, scope)
In the operator we could implement the scoping in a naive way like this:
const tryEmitLatestValue = () => {
if (scope.numCoalescingSubscribers <= 1) {
outerObserver.next(latestValue);
}
};
// ...
next: (value) => {
latestValue = value;
if (!actionSubscription) {
++scope.numCoalescingSubscribers;
actionSubscription = durationSelector.subscribe({
next: () => {
--scope.numCoalescingSubscribers;
tryEmitLatestValue();
actionSubscription = undefined;
},
complete: () => {
if (actionSubscription) {
tryEmitLatestValue();
actionSubscription = undefined;
}
},
});
rootSubscription.add(actionSubscription);
}
You can see running examples here: stackblitz.com/coalesceWith
If we use it in Angular we have the component as scope.
Hmmm... This is problematic as we would mutate the components scope and could introduce nasty bugs.
For this very manner we could use a `WeakMap`.
A `WeakMap` can be used to manage some state, in our case the number of subscribers, of an instance by using a weak reference.
This enables us to:
- manage a set of properties of a compnent without mutationg its state
- avoid memory leaks as it is referenced indirectly
The implementatoin of the WeakMap can be found here: coalescing-manager.ts
This brings us exactly ONE RENDERING of the component in question.
This is the fastest possible way to render a component out there.
The implementation of the basic functionality is done!
Performance improvements



You can see the feature set of the fully fleshed pipe in the official repo:
Rx-Angular ?
Summary
The push pipe even if it's an old idea, with the latest implementation it is the first time it really opens the door to a new way of detection changes in Angular.
A way where the developer is aware of rendering and empowered to write fully reactive high performant applications.
Full Implementation details:
- coalescing-manager.ts
- push-pipe.ts
Wanna npm i @rx-angular/template -S
it out???
Rx-Angular went beta recently! ???
---
Thanks to:
- Jia Li - ?@Jialipassion the main blocker related to zone.js were only solved because of his help.
- Lars Gyrup Brink Nielsen - ?@LayZeeDK for knowing everything about Ivy possible!
- Nicholas Jamieson - ?@ncjamieson pointed me to `WeakMaps` and helped me with the scheduling part.
- The amazimg core team of rx-angular for feedback and review: Kajetan Świątek, Kirill Karnaukhov, Julian Jandl
---
Resources
You can find the source code of the examples as well as all the resources in the repository
- ? Rx-Angular - Push Pipe on GitHub.
- ? Event Coalescing in Angular
- ? Zone docs
@NetanelBasal
- ?reduce-change-detection-cycles-with-event-coalescing-in-angular
@Michael_Hladky
- ? New possibilities with Angular's rendering and the push pipe - Part 1
- ? Angular Push Pipe Design Doc
- ? RxJS Scheduling in v7
- ? RxJS schedulers in depth
Used RxJS parts:
- of
- tap
- filter
- distinctUntilChanged
- switchAll
- Subscription
- Observable
- Subject