Build Your Own Observable part 4: Map, Filter, Take, and all that Jazz
Today, we’re going to build some basic operators that will let us compose operations over streams of data, such as map, filter, and take. Operators are a central part of RxJS, since they let us compose new streams of data from our input streams.

Welcome back. In the previous article, we looked at RxJS’s take on the Observer pattern, and created our simple Observable class. We also added some functions for creating observables from data streams, such as of, from, and fromEvent. If you’re new to this series, I recommend familiarizing yourself with the previous installments.
Today, we’re going to build some basic operators that will let us compose operations over streams of data, such as map, filter, and take. Operators are a central part of RxJS, since they let us compose new streams of data from our input streams. This lets us do more powerful stuff with the asynchronous data in our system.
As we’ll see, the first rule of RxJS is that everything can be modeled as a stream, including the solution to our async problem, so composing streams to create new streams is very powerful, as it will let us get to our solution.
Operators
Please note that the examples in this article use the RxJS v5 chaining syntax. I’ll cover v6 pipeable operators in a later article.
Operators are functions that take in some number of input streams and create a new output stream. They perform some operation over the items emitted by the input streams, and emit the processed results on the newly created output stream. You’ll often see them represented using marble diagrams:

The black arrow with the shapes on top represents the input stream. Each of those shapes could be some data arriving over time, like DOM events. The result of applying the filter operator on that input stream is the output stream on the bottom of the diagram. Each time something is emitted on the input stream, the filter operator checks to see if it’s a circle, and if so, emits it on the output stream. Streams are immutable, so the source stream is not affected by this operation.
Let’s pause for a second, and try to develop an intuition for what is going on here. Why all this talk about composing things? Why is that word so popular in RxJS?
Composing streams just means to take a stream, and apply some operator to it, which will produce a new output stream. That stream can then be fed into another operator, to produce another output stream, and so on. Think of it as chaining (or piping) streams.
…again…why do this?
Well, the first rule of RxJS was that anything can be modeled using a stream. Which means that whenever we are trying to solve some async problem, the solution to that problem can also be modeled as a stream.
This becomes our goal when using RxJS: to identify what the solution stream to our problem looks like, to identify what the input streams are in our problem, and then to compose operations over those input streams until we produce our desired output stream.
Sounds a bit abstract and hand-wavey, right? Well, consider the simple example presented in the marble diagram above. We want a stream full of circles, and nothing else. That’s our goal (funny, I know). We have an input stream, which will emit a lot of shapes, some of which are circles. So our task becomes finding a series of operations which can produce a new stream containing only circles, given our input stream. In this case, that happens to be a simple filter operator. Applying the filter operator to our input stream produces the desired output stream, and we’re done.
Still not convinced? Ok, let’s try something a little more concrete. Let’s do the obligatory drag event stream example.
What is a drag event? One way of thinking about it is that it’s a mousedown event, followed by some number of mousemove events, and terminated by a mouseup event. So let’s identify our streams:
- Inputs: mousedown, mousemove, and mouseup streams
- Desired output: A stream of drag events
We’ll start by building our input streams using fromEvent.

We said that our drag event starts when we get a mousedown:

Given a mousedown, we want to start listening for mousemoves and mouseups (we don’t want to listen for these ahead of time, since our drag event starts with a mousedown). There are a few different operators that can be used here. We’ll use concatMap. Don’t worry too much about what it does, as we’ll build it from scratch in a later article:

We want to listen for mousemoves until we get an emission from our mouseup stream. The takeUntil operator works perfectly for this. Again, don’t worry if you don’t know exactly how takeUntil works. Just focus on the approach here:

This is starting to look good. We’re now listening to all of the mousemoves. Eventually, a mouseup will come along, and complete the drag event:

So to recap: we identified our desired output (a stream of drag events), as well as our inputs (mousedown, mousemove, mouseup), and then composed those input streams together to produce our output stream.
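If you’d like to poke at the drag idea without a browser, here is a minimal sketch of it. Everything in it is an assumption made for illustration: MiniObservable is a hypothetical stand-in for an observable class (it is not RxJS), manualSource is a made-up replacement for fromEvent that lets us fire events by hand, and flatMap is a simplified substitute for concatMap that subscribes to each inner stream immediately instead of queueing them. The takeUntil here is likewise a bare-bones version, not the real operator.

```typescript
// Hypothetical stand-ins for illustration only; this is not RxJS.
interface MiniObserver<T> {
  onNext(value: T): void;
  onCompleted(): void;
}
type Unsubscribe = () => void;

class MiniObservable<T> {
  private producer: (observer: MiniObserver<T>) => Unsubscribe;

  constructor(producer: (observer: MiniObserver<T>) => Unsubscribe) {
    this.producer = producer;
  }

  subscribe(onNext: (value: T) => void, onCompleted: () => void = () => {}): Unsubscribe {
    return this.producer({ onNext, onCompleted });
  }

  // Simplified substitute for concatMap: each inner stream is subscribed to
  // as soon as it is created, with no queueing.
  flatMap<R>(project: (value: T) => MiniObservable<R>): MiniObservable<R> {
    return new MiniObservable<R>((observer) => {
      const inner: Unsubscribe[] = [];
      const outer = this.subscribe((val) => {
        inner.push(project(val).subscribe((r) => observer.onNext(r)));
      });
      return () => {
        outer();
        inner.forEach((u) => u());
      };
    });
  }

  // Forward source values until the notifier emits, then complete and clean up.
  takeUntil<N>(notifier: MiniObservable<N>): MiniObservable<T> {
    return new MiniObservable<T>((observer) => {
      let stop: Unsubscribe = () => {};
      const stopSource = this.subscribe(
        (val) => observer.onNext(val),
        () => observer.onCompleted()
      );
      const stopNotifier = notifier.subscribe(() => {
        observer.onCompleted();
        stop();
      });
      stop = () => {
        stopSource();
        stopNotifier();
      };
      return stop;
    });
  }
}

// Made-up replacement for fromEvent: a stream we can push values into by hand.
function manualSource<T>(): { stream: MiniObservable<T>; emit: (value: T) => void } {
  let observers: MiniObserver<T>[] = [];
  const stream = new MiniObservable<T>((observer) => {
    observers.push(observer);
    return () => {
      observers = observers.filter((o) => o !== observer);
    };
  });
  const emit = (value: T): void => {
    observers.slice().forEach((o) => o.onNext(value));
  };
  return { stream, emit };
}

type DragPoint = { x: number; y: number };
const mousedown = manualSource<DragPoint>();
const mousemove = manualSource<DragPoint>();
const mouseup = manualSource<DragPoint>();

// Desired output: for each mousedown, the mousemoves up to the next mouseup.
const drags = mousedown.stream.flatMap(() => mousemove.stream.takeUntil(mouseup.stream));

const dragged: DragPoint[] = [];
drags.subscribe((p) => dragged.push(p));

mousemove.emit({ x: 0, y: 0 }); // ignored: no mousedown yet
mousedown.emit({ x: 1, y: 1 }); // drag starts
mousemove.emit({ x: 2, y: 2 }); // part of the drag
mousemove.emit({ x: 3, y: 3 }); // part of the drag
mouseup.emit({ x: 3, y: 3 }); // drag ends
mousemove.emit({ x: 9, y: 9 }); // ignored: the drag is over
```

After this runs, dragged holds only the two moves that happened between the mousedown and the mouseup. The point is the shape of the solution (inputs composed into the output we wanted), not the details of these simplified operators.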
Try to keep this intuition in mind when solving things “The Rx way”.
By the way, the topic of composition comes up a lot in software, especially as you get closer and closer to functional programming. If you’d like to know more about it, I suggest Eric Elliott’s excellent series of articles on the topic. However, those are not a prerequisite for what we will be doing in this article.
Now, with that out of the way. Let’s build some operators.
Smooth Operators
As we rattle through this list of operators, we’re going to start to see a common pattern emerge. Let’s think of how an operator should work. Ideally, we’d like to be able to write something like the following:
Observable.fromEvent(document, 'click')
.map(click => console.log('Got a click!'));
We’ve already seen how fromEvent is implemented in the previous article, so here we want to add operator functions to the Observable prototype which can be composed with other operators, and pipe events through the system. This will be clearer if we look at some code first, and then walk through it.
If you want to play around with the operators implemented in this article, they can be found on this Stackblitz:
map
The map function takes in a projectionFunction, applies it to each element of the source stream as it arrives, and projects the results into a new output stream. Here’s the official marble diagram:

Do you recall how we implemented map on arrays back in the first article of this series?
// overriding properties on Array.prototype is a bad idea. This is given for educational purposes only :D
Array.prototype.map = function(projectionFn) {
  let retVal = [];
  for (let i = 0; i < this.length; i++) {
    retVal.push(projectionFn(this[i]));
  }
  return retVal;
}
Here’s what it looks like as a method on our observable class:
map(projFn): Observable {
  return new Observable((observer) => {
    return this.subscribe(
      (val) => observer.onNext(projFn(val)),
      (e) => observer.onError(e),
      () => observer.onCompleted()
    );
  });
}
Let’s break that down.
- On line 2, we return a new Observable, which will take in an observer object. This is going to be a very common pattern. We always have to return a new Observable, otherwise, composition with other operators will not be possible.
- That’s really all this function does when it’s invoked. The newly returned observable doesn’t do anything until its subscribe method is called at a later time. This is why we say observables are lazy.
- When subscribe is called on the returned observable, some interesting things happen. Namely, this on line 3 references whichever observable called map, not map itself (since we’re using an arrow function on line 2, which preserves the binding of this). So we are subscribing to whichever observable comes directly before map in the chain. In doing this, we are able to pipeline a piece of data through our chain of operators.
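To see the laziness for yourself, here is a small self-contained sketch. MiniObservable is a hypothetical stand-in for the Observable class from the previous article; I’m assuming its constructor takes a function that receives an observer with onNext, onError, and onCompleted, and that subscribe accepts three callbacks. The counter names are made up for the demo.

```typescript
// Hypothetical stand-in for the series' Observable class (assumed shape).
interface MiniObserver<T> {
  onNext(value: T): void;
  onError(err: unknown): void;
  onCompleted(): void;
}

class MiniObservable<T> {
  private producer: (observer: MiniObserver<T>) => void;

  constructor(producer: (observer: MiniObserver<T>) => void) {
    this.producer = producer;
  }

  subscribe(
    onNext: (value: T) => void,
    onError: (err: unknown) => void = () => {},
    onCompleted: () => void = () => {}
  ): void {
    this.producer({ onNext, onError, onCompleted });
  }

  map<R>(projFn: (value: T) => R): MiniObservable<R> {
    // The arrow function keeps `this` bound to the observable map was called on.
    return new MiniObservable<R>((observer) => {
      this.subscribe(
        (val) => observer.onNext(projFn(val)),
        (e) => observer.onError(e),
        () => observer.onCompleted()
      );
    });
  }
}

let producerRuns = 0;
const source = new MiniObservable<number>((observer) => {
  producerRuns++; // counts how many times the source was actually started
  [1, 2, 3].forEach((n) => observer.onNext(n));
  observer.onCompleted();
});

// Calling map only builds a new observable; the source has not run yet.
const doubled = source.map((n) => n * 2);
const runsBeforeSubscribe = producerRuns;

// Subscribing is what makes map subscribe to the source.
const received: number[] = [];
doubled.subscribe((n) => received.push(n));
```

Here runsBeforeSubscribe stays at 0, and the source only runs once subscribe is called on the mapped observable, at which point received ends up holding 2, 4, and 6.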
Consider the following example
let obs = Observable.fromEvent(document, 'click')
.map(event => event.clientX);
// our three observer functions
obs.subscribe(console.log, (e) => {}, () => {});
When we call map on the first line, and it returns an observable, that is what is stored in our obs variable. Nothing has started listening for events at this point. Then, on the next line, when we call obs.subscribe(), everything starts to fire.
As mentioned before, the this reference on line 3 of our map implementation will refer to the observable returned by fromEvent, so inside of map, we subscribe to that observable, and pass it next, error, and completed handlers (in a sense, the map is observing the fromEvent).
This way, when an event is emitted, our map sees it, calls the projFn on it, and then calls the next method of the observer that was passed into map’s subscribe. In this case, that observer is made up of the three functions we pass in our example obs.subscribe(...), but if there were another operator after map in the pipeline, like a filter, it would be the observer provided by filter.
A diagram of how subscribes and events propagate within an observable pipeline looks like this:

On line 1, the only thing that has happened is that sub now holds the observable returned by the call to map(x => x.clientX). No observation has been set up yet. So if any clicks happened on document, they would not be observed.
On line 2, when subscribe is invoked, the following steps happen in the diagram.
- subscribe is called, which is really calling subscribe on the observable returned by map.
- This causes map to call subscribe on the observable it has a reference to via this. In this example, it’s fromEvent.
- This causes fromEvent to set up an observation for click events on the document. Check out the previous article to see exactly how this works.
- A click event comes in, which causes fromEvent to call its observer’s next method.
- This propagates the event to map. map applies its projection function to the event (in this case, returning its clientX value), then map calls its observer’s next method. In this case, it’s referring to the observer we passed into subscribe, which is a simple console log.
- Our console.log function is called.
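The steps above can be traced with a bit of logging. This is only a sketch under assumptions: MiniObservable is a hypothetical, stripped-down stand-in for our Observable class (next handler only), FakeClick and emitClick are made-up names that simulate a click source so the example runs without a DOM, and the log messages are numbered to match the order of the steps in the list.

```typescript
// Hypothetical stand-in for the series' Observable class, with logging added.
const log: string[] = [];

interface MiniObserver<T> {
  onNext(value: T): void;
}

class MiniObservable<T> {
  private producer: (observer: MiniObserver<T>) => void;

  constructor(producer: (observer: MiniObserver<T>) => void) {
    this.producer = producer;
  }

  subscribe(onNext: (value: T) => void): void {
    this.producer({ onNext });
  }

  map<R>(projFn: (value: T) => R): MiniObservable<R> {
    return new MiniObservable<R>((observer) => {
      log.push("2. map subscribes to the observable before it");
      this.subscribe((val) => {
        log.push("5. map applies its projection function");
        observer.onNext(projFn(val));
      });
    });
  }
}

// Simulated click source: emitClick does nothing until something subscribes.
type FakeClick = { clientX: number };
let emitClick: (click: FakeClick) => void = () => {};

const clicks = new MiniObservable<FakeClick>((observer) => {
  log.push("3. source starts listening for clicks");
  emitClick = (click) => {
    log.push("4. source calls its observer's next");
    observer.onNext(click);
  };
});

const xs = clicks.map((click) => click.clientX);
emitClick({ clientX: 7 }); // nothing is listening yet, so this click is lost

log.push("1. subscribe is called on the observable returned by map");
xs.subscribe((x) => log.push(`6. our observer receives ${x}`));
emitClick({ clientX: 42 });

console.log(log.join("\n"));
```

The log comes out in order from 1 to 6, and the early click with clientX 7 never shows up, because no observation had been set up when it fired.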
This process can be extended to any number of chained operators. Let’s write another operator.
filter
Filter follows a very similar pattern to map. The only difference is that filter takes a predicateFn instead of a projectionFn. Don’t worry too much about the fancy names. A predicateFn just means a function that returns either true or false. Each time a value is emitted by our source, it will pass through filter's predicateFn. If the function returns true, then the value will be forwarded on to the next observer. Otherwise, it will not.

Here’s how we wrote it on Array.prototype back in part 1:
Array.prototype.filter = function(predicateFn) {
  let retVal = [];
  for (let i = 0; i < this.length; i++) {
    // keep only the elements that pass the predicate
    if (predicateFn(this[i])) {
      retVal.push(this[i]);
    }
  }
  return retVal;
}
and here it is as a method on the Observable class:
filter(predicateFn): Observable {
  return new Observable((observer) => {
    return this.subscribe(
      (val) => {
        // only emit the value if it passes the filter function
        if (predicateFn(val)) {
          observer.onNext(val);
        }
      },
      (e) => observer.onError(e),
      () => observer.onCompleted()
    );
  });
}
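Here is a quick sketch of filter and map chained together, so you can watch values flow from one operator’s observer into the next. As before, MiniObservable is a hypothetical stand-in for our Observable class with an assumed shape, and its static of helper is a simplified guess at the one from the previous article.

```typescript
// Hypothetical stand-in for the series' Observable class (assumed shape).
interface MiniObserver<T> {
  onNext(value: T): void;
  onError(err: unknown): void;
  onCompleted(): void;
}

class MiniObservable<T> {
  private producer: (observer: MiniObserver<T>) => void;

  constructor(producer: (observer: MiniObserver<T>) => void) {
    this.producer = producer;
  }

  // Simplified guess at `of`: emit each value synchronously, then complete.
  static of<U>(...values: U[]): MiniObservable<U> {
    return new MiniObservable<U>((observer) => {
      values.forEach((v) => observer.onNext(v));
      observer.onCompleted();
    });
  }

  subscribe(
    onNext: (value: T) => void,
    onError: (err: unknown) => void = () => {},
    onCompleted: () => void = () => {}
  ): void {
    this.producer({ onNext, onError, onCompleted });
  }

  map<R>(projFn: (value: T) => R): MiniObservable<R> {
    return new MiniObservable<R>((observer) => {
      this.subscribe(
        (val) => observer.onNext(projFn(val)),
        (e) => observer.onError(e),
        () => observer.onCompleted()
      );
    });
  }

  filter(predicateFn: (value: T) => boolean): MiniObservable<T> {
    return new MiniObservable<T>((observer) => {
      this.subscribe(
        (val) => {
          // only forward values that pass the predicate
          if (predicateFn(val)) {
            observer.onNext(val);
          }
        },
        (e) => observer.onError(e),
        () => observer.onCompleted()
      );
    });
  }
}

const evenTimesTen: number[] = [];
let completedCount = 0;

MiniObservable.of(1, 2, 3, 4, 5, 6)
  .filter((n) => n % 2 === 0) // filter's observer is the one provided by map
  .map((n) => n * 10) // map's observer is the three callbacks below
  .subscribe(
    (n) => evenTimesTen.push(n),
    () => {},
    () => completedCount++
  );
```

Only 2, 4, and 6 make it past filter, so evenTimesTen ends up as 20, 40, 60, and the completion signal travels down the same chain exactly once.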
take
Last but not least, we come to the take operator, which will feel familiar if you’ve ever worked with something like Haskell. The take operator accepts a single number as its argument, and will take up to that many values from a source stream, and emit them in a new stream. Imagine that you have some infinite stream that you are observing, and you say, “I only want at most 5 values from this stream”. That would be take(5).

Let’s build it:
take(count: number): Observable {
  return new Observable((observer) => {
    let currentCount = 0;
    return this.subscribe(
      (val) => {
        if (currentCount < count) {
          observer.onNext(val);
          currentCount++;
          // complete as soon as the last requested value has been emitted,
          // rather than waiting for one more value to arrive
          if (currentCount === count) {
            observer.onCompleted();
          }
        }
      },
      (e) => observer.onError(e),
      () => observer.onCompleted()
    );
  });
}
We maintain an internal counter, and only emit values if currentCount is less than the count parameter. Once we reach the count parameter, we call the observer’s onCompleted method to signal that no more data will be arriving.
In a real implementation, we would also unsubscribe at this point, and do some cleanup.
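To give a rough idea of what that cleanup could look like, here is a sketch of take that unsubscribes from its source once it has enough values. It rests on an assumption the article doesn’t confirm: that subscribe returns a function which tears down the subscription. MiniObservable, Unsubscribe, and the hand-driven numbers source are all hypothetical names for this demo, and this is not how RxJS itself implements take.

```typescript
// Hypothetical stand-in; assumes subscribe returns a teardown function.
interface MiniObserver<T> {
  onNext(value: T): void;
  onError(err: unknown): void;
  onCompleted(): void;
}
type Unsubscribe = () => void;

class MiniObservable<T> {
  private producer: (observer: MiniObserver<T>) => Unsubscribe;

  constructor(producer: (observer: MiniObserver<T>) => Unsubscribe) {
    this.producer = producer;
  }

  subscribe(
    onNext: (value: T) => void,
    onError: (err: unknown) => void = () => {},
    onCompleted: () => void = () => {}
  ): Unsubscribe {
    return this.producer({ onNext, onError, onCompleted });
  }

  take(count: number): MiniObservable<T> {
    return new MiniObservable<T>((observer) => {
      if (count <= 0) {
        // nothing to take: complete without ever touching the source
        observer.onCompleted();
        return () => {};
      }
      let currentCount = 0;
      let done = false;
      let unsubscribe: Unsubscribe = () => {};
      const finish = () => {
        if (done) return; // guard against completing twice
        done = true;
        observer.onCompleted();
        unsubscribe(); // stop listening to the source
      };
      unsubscribe = this.subscribe(
        (val) => {
          if (done) return;
          observer.onNext(val);
          currentCount++;
          if (currentCount === count) finish();
        },
        (e) => {
          if (!done) {
            done = true;
            observer.onError(e);
          }
        },
        finish
      );
      // A synchronous source can finish before `unsubscribe` is assigned.
      if (done) unsubscribe();
      return unsubscribe;
    });
  }
}

// Hand-driven source that records whether anything is still listening.
const state = { listening: false };
let emit: (n: number) => void = () => {};
const numbers = new MiniObservable<number>((observer) => {
  state.listening = true;
  emit = (n) => {
    if (state.listening) observer.onNext(n);
  };
  return () => {
    state.listening = false;
  };
});

const got: number[] = [];
let completions = 0;
numbers.take(2).subscribe((n) => got.push(n), () => {}, () => completions++);

emit(1);
emit(2); // second value: take completes and unsubscribes from the source
emit(3); // dropped: the source is no longer being listened to
```

After the second value, got holds 1 and 2, the observer has completed exactly once, and state.listening is back to false, which is the cleanup the paragraph above is hinting at.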
Summary
That’s all for now. We now have a basic Observable class with some operators. In the next article, we will focus on building operators which can consume multiple input observables, and higher-order observables (observables within observables). Stay tuned!