Welcome back. In the previous article, we looked at RxJS’s take on the Observer pattern, and created our simple Observable class. We also added some functions for creating observables from data streams, such as of, from, and fromEvent. If you’re new to this series, I recommend familiarizing yourself with the previous installments:

Today, we’re going to build some basic operators that will let us compose operations over streams of data, such as map, filter, and take. Operators are a central part of RxJS, since they let us compose new streams of data from our input streams. This lets us do more powerful things with the asynchronous data in our system.

As we’ll see, the first rule of RxJS is that everything can be modeled as a stream, including the solution to our async problem, so composing streams to create new streams is very powerful, as it will let us get to our solution.

Operators

Please note that the examples in this article use the RxJS v5 chaining syntax. I’ll cover v6 pipeable operators in a later article.

Operators are functions which take in some number of input streams, create a new output stream, and then perform some operation over items emitted by the input streams, and emit the processed results on the newly created output stream. You’ll often see them represented using marble diagrams:

Marble diagram for the filter operator. In this case, we emit only circles on our output stream.

The black arrow with the shapes on top represents the input stream. Each of those shapes could be some data arriving over time, like DOM events. The result of applying the filter operator on that input stream is the output stream on the bottom of the diagram. Each time something is emitted on the input stream, the filter operator checks to see if it’s a circle, and if so, emits it on the output stream. Streams are immutable, so the source stream is not affected by this operation.

Let’s pause for a second, and try to develop an intuition for what is going on here. Why all this talk about composing things? Why is that word so popular in RxJS?

Composing streams just means to take a stream, and apply some operator to it, which will produce a new output stream. That stream can then be fed into another operator, to produce another output stream, and so on. Think of it as chaining (or piping) streams.

…again…why do this?

Well, the first rule of RxJS was that anything can be modeled using a stream. Which means that whenever we are trying to solve some async problem, the solution to that problem can also be modeled as a stream.

This becomes our goal when using RxJS: to identify what the solution stream to our problem looks like, to identify what the input streams are in our problem, and then to compose operations over those input streams until we produce our desired output stream.

Sounds a bit abstract and hand-wavey, right? Well, consider the simple example presented in the marble diagram above. We want a stream full of circles, and nothing else. That’s our goal (funny, I know). We have an input stream, which will emit a lot of shapes, some of which are circles. So our task becomes finding a series of operations which can produce a new stream containing only circles, given our input stream. In this case, that happens to be a simple filter operator. Applying the filter operator to our input stream produces the desired output stream, and we’re done.

Still not convinced? Ok, let’s try something a little more concrete. Let’s do the obligatory drag event stream example.

What is a drag event? One way of thinking about it is that it’s a mousedown event, followed by some number of mousemove events, and terminated by a mouseup event. So let’s identify our streams:

  • Inputs: mousedown, mousemove, and mouseup streams
  • Desired output: A stream of drag events

We’ll start by building our input streams using fromEvent.

Our three input streams (mousedown, mousemove, and mouseup), and our soon to be created output stream of drag events.

We said that our drag event starts when we get a mousedown:

A mousedown has occurred, but what operator should we feed it into?

Given a mousedown, we want to start listening for mousemoves and mouseups (we don’t want to listen for these ahead of time, since our drag event starts with a mousedown). There are a few different operators that can be used here. We’ll use concatMap. Don’t worry too much about what it does, as we’ll build it from scratch in a later article:

concatMap lets us start listening for mousemoves and mouseups, once we’ve received a mousedown.

We want to listen for mousemoves until we get an emission from our mouseup stream. The takeUntil operator works perfectly for this. Again, don’t worry if you don’t know exactly how takeUntil works. Just focus on the approach here:

We start listening for mousemoves, until we get a mouseup.

This is starting to look good. We’re now listening to all of the mousemoves. Eventually, a mouseup will come along, and complete the drag event:

A mouseup signals the end of our drag event, so a value is emitted on our drag event stream.

So, to recap: we identified our desired output (a stream of drag events), as well as our inputs (mousedown, mousemove, mouseup), and then composed those input streams together to produce our output stream.

Try to keep this intuition in mind when solving things “The Rx way”.

By the way, the topic of composition comes up a lot in software, especially as you get closer and closer to functional programming. If you’d like to know more about it, I suggest Eric Elliott’s excellent series of articles on the topic. However, those are not a prerequisite for what we will be doing in this article.

Now, with that out of the way, let’s build some operators.

Smooth Operators

As we rattle through this list of operators, we’re going to start to see a common pattern emerge. Let’s think of how an operator should work. Ideally, we’d like to be able to write something like the following:

Observable.fromEvent(document, 'click')
          .map(click => console.log('Got a click!'));

We’ve already seen how fromEvent is implemented in the previous article, so here we want to add operator functions to the Observable prototype which can be composed with other operators, and pipe events through the system. This will be clearer if we look at some code first, and then walk through it.

If you want to play around with the operators implemented in this article, they can be found on this Stackblitz:

map

The map function takes in a projectionFunction, applies it to each element of the source stream as it arrives, and projects the results into a new output stream. Here’s the official marble diagram:

Map applies a projection function to each item in the source stream. In this case, it multiplies the inputs by 10, and projects the results into a new output stream.

Do you recall how we implemented map on arrays back in the first article of this series?

// Overriding properties on Array.prototype is a bad idea. This is given for educational purposes only :D
Array.prototype.map = function(projectionFn) {
  let retVal = [];

  for (let i = 0; i < this.length; i++) {
    retVal.push(projectionFn(this[i]));
  }

  return retVal;
}

Here’s what it looks like as a method on our observable class:

map(projFn): Observable {
  return new Observable((observer) => {
    return this.subscribe(
      (val) => observer.onNext(projFn(val)),
      (e) => observer.onError(e),
      () => observer.onCompleted()
    );
  });
}

Let’s break that down.

  • On line 2, we return a new Observable, which will take in an observer object. This is going to be a very common pattern. We always have to return a new Observable, otherwise, composition with other operators will not be possible.
  • That’s really all this function does when it’s invoked. The newly returned observable doesn’t do anything until its subscribe method is called at a later time. This is why we say observables are lazy.
  • When subscribe is called on the returned observable, some interesting things happen. Namely, this on line 3 references whichever observable called map, not map itself (since we’re using an arrow function on line 2, which preserves the binding of this). So we are subscribing to whichever observable comes directly before map in the chain. In doing this, we are able to pipeline a piece of data through our chain of operators.
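To make the laziness point concrete, here is a minimal sketch. It assumes a stripped-down stand-in for the Observable class from the previous article (the Observer interface, the constructor shape, and the `of` helper shown here are assumptions for illustration, not the exact code from that article). The projection function records every value it sees, so we can check that nothing runs until subscribe is called.

```typescript
// Stand-in for the Observable class from the previous article. This is an
// assumption, reduced to what the example needs.
interface Observer<T> {
  onNext(val: T): void;
  onError(e: unknown): void;
  onCompleted(): void;
}

class Observable<T> {
  private subscribeFn: (observer: Observer<T>) => void;

  constructor(subscribeFn: (observer: Observer<T>) => void) {
    this.subscribeFn = subscribeFn;
  }

  subscribe(
    onNext: (val: T) => void,
    onError: (e: unknown) => void = () => {},
    onCompleted: () => void = () => {}
  ): void {
    this.subscribeFn({ onNext, onError, onCompleted });
  }

  // Emits each argument in order, then completes.
  static of<T>(...values: T[]): Observable<T> {
    return new Observable<T>((observer) => {
      values.forEach((v) => observer.onNext(v));
      observer.onCompleted();
    });
  }

  map<R>(projFn: (val: T) => R): Observable<R> {
    return new Observable<R>((observer) => {
      // The arrow function keeps `this` bound to the observable map was called on.
      this.subscribe(
        (val) => observer.onNext(projFn(val)),
        (e) => observer.onError(e),
        () => observer.onCompleted()
      );
    });
  }
}

const calls: number[] = [];   // every input the projection function has seen
const results: number[] = []; // every value delivered to the final observer

const source = Observable.of(1, 2, 3);
const mapped = source.map((x) => {
  calls.push(x);
  return x * 10;
});

// map has only built a new observable; the projection has not run yet.
const callsBeforeSubscribe = calls.length;

mapped.subscribe((val) => results.push(val));

console.log(callsBeforeSubscribe); // 0
console.log(results);              // the inputs multiplied by 10: 10, 20, 30
```

Calling map only wires things up. The work happens inside the function passed to the new Observable, and that function runs only when someone subscribes.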

Consider the following example:

let obs = Observable.fromEvent(document, 'click')
                    .map(event => event.clientX);
                    
// our three observer functions
obs.subscribe(console.log, (e) => {}, () => {});

When we call map on the first line, and it returns an observable, that is what is stored in our obs variable. Nothing has started listening for events at this point. Then, on the next line, when we call obs.subscribe(), everything starts to fire.

As mentioned before, the this reference on line 3 of our map implementation will refer to the observable returned by fromEvent, so inside of map, we subscribe to that observable, and pass it next, error, and completed handlers (in a sense, the map is observing the fromEvent).

This way, when an event is emitted, our map sees it, calls projFn on it, and then calls the next method of the observer that was passed into map’s subscribe. In this case, that observer is made up of the three functions we pass to obs.subscribe(...) in our example. If there were another operator after map in the pipeline, like a filter, it would be the observer provided by filter.

A diagram of how subscribes and events propagate within an observable pipeline looks like this:

When subscribe is called on line 2 [1], it triggers map to subscribe to fromEvent [2], which sets up the observable [3]. When an event comes in [4], it sets off a cascade of next() calls [5,6]

On line 1, the only thing that has happened is that obs now holds the observable returned by the call to map(event => event.clientX). No observation has been set up yet, so any clicks that happen on document at this point will not be observed.

On line 2, when subscribe is invoked, the following steps happen in the diagram.

  1. subscribe is called, which is really calling subscribe on the observable returned by map.
  2. This causes map to call subscribe on the observable it has a reference to via this. In this example, it’s fromEvent.
  3. This causes fromEvent to set up an observation for click events on the document. Check out the previous article to see exactly how this works.
  4. A click event comes in, which causes fromEvent to call its observer’s next method.
  5. This propagates the event to map. map applies its projection function to the event (in this case, returning its clientX value), then calls its observer’s next method. Here, that is the observer we passed into subscribe, which is a simple console.log.
  6. Our console.log function is called.
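The six steps above can be traced with a small sketch. It assumes a stand-in Observable class, and it replaces fromEvent with a hypothetical hand-driven source (`clicks` plus an `emitClick` function, both invented here) so that it runs without a DOM. The `trace` array is instrumentation only and is not part of the operator.

```typescript
const trace: string[] = []; // instrumentation only: records the order of calls

interface Observer<T> {
  onNext(val: T): void;
  onError(e: unknown): void;
  onCompleted(): void;
}

// Stand-in for the Observable class from the previous article (an assumption).
class Observable<T> {
  private subscribeFn: (observer: Observer<T>) => void;

  constructor(subscribeFn: (observer: Observer<T>) => void) {
    this.subscribeFn = subscribeFn;
  }

  subscribe(
    onNext: (val: T) => void,
    onError: (e: unknown) => void = () => {},
    onCompleted: () => void = () => {}
  ): void {
    this.subscribeFn({ onNext, onError, onCompleted });
  }

  map<R>(projFn: (val: T) => R): Observable<R> {
    return new Observable<R>((observer) => {
      trace.push("2: map subscribes to its source");
      this.subscribe(
        (val) => {
          trace.push("5: map projects the event and calls next");
          observer.onNext(projFn(val));
        },
        (e) => observer.onError(e),
        () => observer.onCompleted()
      );
    });
  }
}

type Click = { clientX: number };

// Hypothetical stand-in for fromEvent(document, 'click'): emitClick lets us
// simulate a click by hand. Until someone subscribes, it does nothing.
let emitClick: (event: Click) => void = () => {};
const clicks = new Observable<Click>((observer) => {
  trace.push("3: source starts listening for clicks");
  emitClick = (event) => {
    trace.push("4: a click arrives, source calls next");
    observer.onNext(event);
  };
});

const obs = clicks.map((event) => event.clientX);
emitClick({ clientX: 1 }); // nobody has subscribed yet, so this click is not observed

trace.push("1: subscribe is called on the observable returned by map");
obs.subscribe((x) => trace.push(`6: observer receives ${x}`));
emitClick({ clientX: 42 });

trace.forEach((line) => console.log(line));
```

The trace comes out in order from 1 to 6. Subscriptions travel up the chain towards the source, and values travel back down towards the final observer.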

This process can be extended to any number of chained operators. Let’s write another operator.

filter

Filter follows a very similar pattern to map. The only difference is that filter takes a predicateFn instead of a projectionFn. Don’t worry too much about the fancy names. A predicateFn just means a function that returns either true or false. Each time a value is emitted by our source, it will pass through filter's predicateFn. If the function returns true, then the value will be forwarded on to the next observer. Otherwise, it will not.

The filter operator only emits the values for which its predicate function returns true. In this example, only the odd numbers pass the filter.

Here’s how we wrote it on Array.prototype back in part 1:

Array.prototype.filter = function(predicateFn) {
  let retVal = [];

  for (let i = 0; i < this.length; i++) {
    // keep only the items that pass the predicate
    if (predicateFn(this[i])) {
      retVal.push(this[i]);
    }
  }

  return retVal;
}

And here it is as a method on the Observable class:

filter(predicateFn): Observable {
    return new Observable((observer) => {
        return this.subscribe(
            (val) => {
                // only emit the value if it passes the filter function
                if (predicateFn(val)) {
                    observer.onNext(val);
                }
            },
            (e) => observer.onError(e),
            () => observer.onCompleted()
        );
    });
}
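Here is a minimal sketch of filter in use, mirroring the odd-numbers marble diagram. It assumes a stand-in Observable class with an `of` helper (the constructor shape and `of` are assumptions for illustration, not the exact code from the previous article).

```typescript
interface Observer<T> {
  onNext(val: T): void;
  onError(e: unknown): void;
  onCompleted(): void;
}

// Stand-in for the Observable class from the previous article (an assumption).
class Observable<T> {
  private subscribeFn: (observer: Observer<T>) => void;

  constructor(subscribeFn: (observer: Observer<T>) => void) {
    this.subscribeFn = subscribeFn;
  }

  subscribe(
    onNext: (val: T) => void,
    onError: (e: unknown) => void = () => {},
    onCompleted: () => void = () => {}
  ): void {
    this.subscribeFn({ onNext, onError, onCompleted });
  }

  // Emits each argument in order, then completes.
  static of<T>(...values: T[]): Observable<T> {
    return new Observable<T>((observer) => {
      values.forEach((v) => observer.onNext(v));
      observer.onCompleted();
    });
  }

  filter(predicateFn: (val: T) => boolean): Observable<T> {
    return new Observable<T>((observer) => {
      this.subscribe(
        (val) => {
          // only forward the value if it passes the predicate
          if (predicateFn(val)) {
            observer.onNext(val);
          }
        },
        (e) => observer.onError(e),
        () => observer.onCompleted()
      );
    });
  }
}

const output: string[] = [];

Observable.of(1, 2, 3, 4, 5)
  .filter((x) => x % 2 === 1)
  .subscribe(
    (x) => output.push(`next ${x}`),
    () => {},
    () => output.push("completed")
  );

output.forEach((line) => console.log(line));
// next 1
// next 3
// next 5
// completed
```

Note that completion is forwarded untouched. filter only decides which values get through; it does not change when the stream ends.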

take

Last but not least, we come to the take operator, which will feel familiar if you’ve ever worked with something like Haskell. The take operator accepts a single number as its argument, and will take up to that many values from a source stream, and emit them in a new stream. Imagine that you have some infinite stream that you are observing, and you say, “I only want at most 5 values from this stream”. That would be take(5).

With take, only the first (2) values will be taken, then the output stream will complete.

Let’s build it:

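take(count: number): Observable {
    return new Observable((observer) => {
        let currentCount = 0;
        return this.subscribe(
            (val) => {
                if (currentCount < count) {
                    observer.onNext(val);
                    currentCount++;
                    // complete as soon as the last requested value has been emitted,
                    // rather than waiting for one more value to arrive
                    if (currentCount === count) {
                        observer.onCompleted();
                    }
                }
            },
            (e) => observer.onError(e),
            () => {
                // don't signal completion twice if take has already completed
                if (currentCount < count) {
                    observer.onCompleted();
                }
            }
        );
    });
}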

We maintain an internal counter, and only emit values if currentCount is less than the count parameter. Once we reach the count parameter, we call the observer’s onCompleted method to signal that no more data will be arriving.
In a real implementation, we would also unsubscribe at this point, and do some cleanup.
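
Here is a minimal sketch of take in use. It assumes a stand-in Observable class and a hypothetical hand-driven source (`numbers` plus an `emitValue` function, both invented here) that plays the role of a stream that never ends on its own. The take method in this sketch signals completion right after emitting the count-th value and ignores anything that arrives later. It does not unsubscribe from the source, which a real implementation would do.

```typescript
interface Observer<T> {
  onNext(val: T): void;
  onError(e: unknown): void;
  onCompleted(): void;
}

// Stand-in for the Observable class from the previous article (an assumption).
class Observable<T> {
  private subscribeFn: (observer: Observer<T>) => void;

  constructor(subscribeFn: (observer: Observer<T>) => void) {
    this.subscribeFn = subscribeFn;
  }

  subscribe(
    onNext: (val: T) => void,
    onError: (e: unknown) => void = () => {},
    onCompleted: () => void = () => {}
  ): void {
    this.subscribeFn({ onNext, onError, onCompleted });
  }

  take(count: number): Observable<T> {
    return new Observable<T>((observer) => {
      let currentCount = 0;
      this.subscribe(
        (val) => {
          if (currentCount >= count) return; // already completed: ignore late values
          observer.onNext(val);
          currentCount++;
          if (currentCount === count) observer.onCompleted();
        },
        (e) => observer.onError(e),
        () => {
          // only forward completion if take has not already completed
          if (currentCount < count) observer.onCompleted();
        }
      );
    });
  }
}

// Hypothetical hand-driven source standing in for an endless stream.
let emitValue: (val: number) => void = () => {};
const numbers = new Observable<number>((observer) => {
  emitValue = (val) => observer.onNext(val);
});

const output: string[] = [];
numbers.take(2).subscribe(
  (val) => output.push(`next ${val}`),
  () => {},
  () => output.push("completed")
);

emitValue(10);
emitValue(20);
const afterTwo = output.slice(); // snapshot taken before the third value arrives
emitValue(30);                   // arrives after completion, so it is ignored

afterTwo.forEach((line) => console.log(line));
// next 10
// next 20
// completed
```

The snapshot shows that the output stream has already completed before the third value is emitted, even though the source itself never completes.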

Summary

That’s all for now. We now have a basic Observable class with some operators. In the next article, we will focus on building operators which can consume multiple input observables, and higher-order observables (observables within observables). Stay tuned!