Welcome to part 3 of the series. In this installment, we’re going to look at RxJS’s take on the Observer pattern, and start implementing an observable from scratch. Our humble Observable will be nowhere near as sophisticated as what’s going on inside of RxJS, but it will give you a good enough understanding of observables so that you can start digging into the RxJS source code to learn more.

Observer pattern

To understand Observables, it’s important to start with the Observer Pattern.

The classical Observer Pattern. Image taken from the original Gang of Four Design Patterns book.

The concept is pretty straightforward. There is some object containing state that will change over time. This is known as the subject in the classical Observer Pattern. All this subject does is accept callback functions from observers, which are objects that want to be notified whenever the subject’s state changes. Whenever such a state change happens, the subject loops through all of the observer callbacks, and invokes them with the new state as an argument.

class Subject {
  constructor() {
    this.callbacks = [];
  }

  subscribe(fn) {
     this.callbacks.push(fn);
  }

  publish(data) {
     this.callbacks.forEach(fn => fn(data));
  }
}

// usage
const subject = new Subject();
const observer1 = (data) => console.log(`Observer1 received data: ${data}`);

subject.subscribe(observer1);

setTimeout(() => {
  subject.publish('test data');
}, 1000);

The classic observer pattern has been a staple of web development for many years. However, it has not been without its shortcomings, or its critics.

For our purposes, it provides no way of containerizing events, meaning we can’t compose streams out of subject events.

RxJS improves upon this classical observer pattern by introducing a more robust interface for observers, one that supports not just a method for publishing data (onNext), but also methods for notifying observers of completion, as well as errors.

The Observable abstracts away the underlying data stream, and supports a simple interface that an observer can use to consume the stream.
As data arrives on the event stream (represented as a circle), the Observable calls the onNext method its observer.
If the stream ever runs out of data forever, the Observable will call the observer’s onCompleted method.
Should anything go wrong with the stream, the Observable will call the observer’s onError method.

Note that an observer’s onNext method can be called many times, but onError and onCompleted indicate that no more data will be arriving from the observable.

As a quick aside, observables and subjects are technically different from one another. In RxJS, subjects are “stateful”, in that they maintain a list of subscribers that they multicast data to, similar to the subject in the classical observer pattern. By contrast, observables are really just functions that set up a context for observation, without holding onto state. Each observer will see its own execution of the observable. More on this in a later article, as it’s not too important right now.


Our simple class must support the Observer interface, which means that any observer that subscribes to our observable will provide the following three methods:

  1. onNext to be called each time our observable emits data
  2. onError to be called if an error happens in our observable
  3. onCompleted to be called when our observable is done producing data

For reference, the official Observer interface can be found here.

A simple scenario looks like this:

const obs = new Observable.of(1,2,3); // creates an synchronous Observable which will emit the values 1,2,3, then complete.

// Observables are lazy, so nothing has been emitted yet. obs just holds a reference to an Observable which will emit 1,2,3 and then complete each time it is subscribed to.

const observer = {
  onNext: (val) => { console.log(`onNext: ${val}`); },
  onError: (err) => { console.log(`onError: ${err}`); },
  onCompleted: () => { console.log(`onCompleted`); }
};

obs.subscribe(observer);

// as soon as obs.subscribe(observer) is executed, the observable will emit its data:
// onNext: 1
// onNext: 2
// onNext: 3
// onCompleted
Note that line 11 returns a subscription object that an observer can use to unsubscribe from the observable.

Don’t concern yourself with the details too much just yet, we will implement everything in that example by hand shortly.

The Observer interface can be thought of as an analog to the Iterator interface. While iterators let a consumer pull data from a source, observers let a source push data to an observer, via onNext. Similar to how Iterators can communicate error and completion information, our Observer has onError and onCompleted . In many ways, the two are symmetric. Jafar Husain has a great talk on the symmetry between the Iterator pattern, and the Observer pattern. He also talks about it here.

Jafar Husain is an excellent resource for understanding reactive programming and observables. He’s also championing the TC39 proposal to add observables to Javascript.

The first thing to do is create the Observable class.

export class Observable<T> {
    /** Internal implementation detail */
    private _subscribe: any;

    /**
      * @constructor
      * @param {Function} subscribe is the function that is called when the 
      * observable is subscribed to. This function is given a subscriber/observer
      * which provides the three methods on the Observer interface:
      * onNext, onError, and onCompleted
    */
    constructor(subscribe?: any) {
        if (subscribe) {
            this._subscribe = subscribe;
        }
    }

    // public api for registering an observer
    subscribe(onNext: any, onError?: any, onCompleted?: any) {
        if (typeof onNext === 'function') {
            return this._subscribe({
                onNext: onNext,
                onError: onError || (() => {}),
                onCompleted: onCompleted || (() => {})
            });
        } else {
            return this._subscribe(onNext);
        }
    }
 }

The class has two functions, a public subscribe, and an internal _subscribe. This is pretty similar to how Observable is actually implemented in RxJS. The public method is a common way for observers to subscribe by providing onNext, onError, and onCompleted functions. The internal _subscribe method decides when to call those methods, and will be dependent on the implementation details of the underlying stream being observed, as we will see shortly when we implement from and fromEvent.

By itself, this Observable doesn’t do anything. It just provides the machinery for setting up observation for observers and their onNext, onError, and onComplete handlers. In order to do that, we need to add some creational methods. We’ll implement three, of, from, and fromEvent.

But before we do that… A quick note on RxJS 6, and pipeable operators

In the old days of RxJS (v5 and below), creational methods and operators were all put on the Observable prototype. Starting with RxJS 6, this has all changed. Operators are no longer placed on the Observable prototype, which leads to better tree-shaking, smaller bundle sizes, and better interoperability with third-party libraries. The chaining syntax for composition, such as:

Observable.of(1,2,3).map(x => x + 1).filter(x => x > 2);

has also been replaced in favor of using the pipe operator for composition:

of(1,2,3).pipe(map(x => x + 1), filter(x => x > 2));

Conceptually, nothing much has really changed, so we’re going to stick to the RxJS 5 style of chaining operators on Observable prototype for composition, just for the sake of simplicity. In a later article, we’ll break things up, and implement a pipe operator a la RxJS 6.
If you’d like to know more about RxJS 6 and pipeable (lettable) operators, head over to the RxJS section of AngularInDepth.

Now, with that out of the way, let’s implement a creational method, and write some tests for it. You can follow along with this repository.

Observable.of

This method takes in some arguments, and simply returns an observable that emits each of those values one by one, then completes. Here’s the code:

class Observable {
  ...
  // add as a static method on Observable so it can be used as
  // Observable.of()
    static of(...args): Observable {
      return new Observable((obs) => {
        args.forEach(val => obs.onNext(val));
        obs.onCompleted();
  
        return {
          unsubscribe: () => {
            // just make sure none of the original subscriber's methods are never called.
            obs = {
              onNext: () => {},
              onError: () => {},
              onCompleted: () => {}
            };
          }
        };
      });
    }
  
  ...
}
Observable.of() calls onNext for each of its arguments, then completes.

Notice how on line 6, we start by returning a new Observable. This is a very common pattern that we will see over and over again when implementing this Observable class. An Observable operation must always return a new Observable, otherwise, composition will not be possible. For instance, if Observable.of did not return an Observable, then we would not be able to do things like Observable.of(5).map(x => x * 2) because map expects an Observable in order to be callable. It’d be like if Array.prototype.filter didn’t return an array, then things like [1,2,3].filter(x => x > 1).map(x => x * 2) would break, because you’d be trying to call map on something other than an array.

Let’s write a couple of unit tests for of and see how it does. As a reference, here are the official RxJS 5 tests.

describe('Observable of', () => {
    it('should emit each input separately and complete', (done: MochaDone) => {
      const x = { foo: 'bar' };
      const expected = [1, 'a', x];
      let i = 0;

      Observable.of(1, 'a', x)
        .subscribe((val) => {
          expect(val).to.equal(expected[i++]);
        }, (err) => {
          done(new Error('should not be called'));
        }, () => {
          done();
      });
    });

    it('should emit one value', (done: MochaDone) => {
      let calls = 0;
    
      Observable.of(42).subscribe((x: number) => {
        expect(++calls).to.equal(1);
        expect(x).to.equal(42);
      }, (err: any) => {
        done(new Error('should not be called'));
      }, () => {
        done();
      });
    });
  });

Of is pretty simple. By default, its execution is going to be synchronous once subscribe is called, so the unsubscribe method will likely never be called. However, we supply one just in case.

Let’s take a look at a similar method, from, which consumes an iterable.

Observable.from

from accepts an iterable, and fires onNext for each element.

static from(iterable): Observable {
    return new Observable((observer) => {
        for (let item of iterable) {
            observer.onNext(item);
        }            

        observer.onCompleted();

        return {
            unsubscribe: () => {
                // just make sure none of the original subscriber's methods are never called.
                observer = {
                    onNext: () => {},
                    onError: () => {},
                    onCompleted: () => {}
                };
            }
        };
    });
}

Nothing too crazy here. We use a for...of loop to consume the iterable. Here are some tests:

describe('Observable fron', () => {
  it('should consume an iterable and complete', (done: MochaDone) => {
    const x = [1, 2, 'three'];
    const expected = [1, 2, 'three'];
    let i = 0;

    Observable.from(x)
      .subscribe((val) => {
        expect(val).to.equal(expected[i++]);
      }, (err) => {
        done(new Error('should not be called'));
      }, () => {
        done();
    });
  });

  it('should consume any iterable, including one from a generator', (done: MochaDone) => {
    function* foo(){
      yield 1;
      yield 2;
    };

    const expected = [1,2];
    let i = 0;

    Observable.from(foo()).subscribe((x: number) => {
      expect(x).to.equal(expected[i++]);
    }, (err: any) => {
      done(new Error('should not be called'));
    }, () => {
      done();
    });
  });
});

Last but certainly not least, let’s build fromEvent.

Observable.fromEvent

fromEvent is exciting since it will be our first asynchronous stream. We’ll implement a simplified version which only deals with DOM events, although RxJS’ implementation supports any event source which has addListener and removeListener style methods.

static fromEvent(source, event): Observable {
    return new Observable((observer) => {
        const callbackFn = (e) => observer.onNext(e);

        source.addEventListener(event, callbackFn);

        return {
            unsubscribe: () => source.removeEventListener(event, callbackFn)
        };
    });
}

We take a source DOM element, as well as the name of an event. We then use the browser’s addEventListener and removeEventListener functions to manage that subscription.

Summary

  • Observables are lazy. They don’t emit values until they are subscribed to (unless they are hot observables. More on that later).
  • RxJS improves on the classical Observer pattern by adding methods for error and completion, as well as enabling composition.
  • Observers have onNext, onError, and onComplete methods

That’s it for this article. In the next article we’ll dig into operators such as map and filter. Continue to part 4: Operators.