{"community_link":"https://github.com/indepth-dev/community/discussions/167"}

Telegraph with RxJS: the power of reactive systems

Explore interesting RxJS code and Angular directives in this fun exercise of recreating an analogue Morse code telegraph using your favorite tools!

Telegraph with RxJS: the power of reactive systems

I was watching a movie the other day. There was a guy using telegraph. He had to remember the whole Morse code by heart and press his single button crazy fast! It got me thinking. With RxJS we can do better! Let's make a telegraph using single fromEvent and lots of cool stuff. We will practice Dependency Injection, other Angular features, and awesome RxJS usage to build a working demo that looks (and sounds) great. Here we go!

Morse code

Telegraph uses Morse code. It's pretty easy for us to understand as it is just a binary sequence. The signal is discrete, meaning it is broken into segments of a particular duration. At every given piece it's either a "bleep" or silence. We can use Boolean or just 1s and 0s for brevity. It follows the rules below:

  • There are two symbols: dash and dot
  • Dash is three 1s in a row
  • Dot is a single 1
  • They are separated by a single 0
  • Letters are separated by three 0s
  • Words are separated by seven 0s (we can see this as space being four 0s + three 0s of char end)

Each letter is represented by the sequence depicted in the table below. You can also see the signal grid, black square means 1, white square is 0.

Let's take letter "H". It maps to 4 dots. Dot is a single 1, which also should be separated with 0, so we'll have 4 of 1s and 3 zeros in between: 1010101. Given that here's what "HELLO WORLD" would look like:

10101010001000101110101000101110101000111011101110000000 = HELLO[end of word]

10111011100011101110111000101110100010111010100011101010000000 = WORLD[end of word]

This is a standard US Morse code table, but there are other versions. If we were to implement this code table in Angular, this would be a good candidate for an InjectionToken. With second parameter having factory function we can provide this implementation as default value. And projects can swap it for their Morse code table:

export const MAP = new InjectionToken<Map<string, readonly (0 | 1)[]>>(
  'Morse code dictionary',
  {
    factory: () => new Map([
      [' ', [0, 0, 0, 0]],
      ['a', [1, 0, 1, 1, 1]],
      // ...
    ])
  }
)

We want to allow users to type on a regular keyboard then encode letters into Morse code. After we pass them over an imaginary telegraph line we will decode them and print them on the screen. It all starts with a single fromEvent(document, 'keydown') and then we will branch the heck out of this Observable!

Encoding

We can encode every letter synchronously. Each letter would turn into an array of 0s and 1s instantly. But since we emulate an analog device which is based on duration of signals — we will designate a certain length to every unit.

Let's make unit duration an InjectionToken as well so it can be customized!

It means that letters are translated into sequences that have duration. Because we want to preserve the order of emissions and never lose a character from previous emission, we will use concatMap operator. It is a member of the Higher Order Observables family of operators and we use it to map every value to an Observable. Whenever a new value comes (a user presses a key) — it waits for the previous Observable to complete (a sequence is played out) before proceeding to create the next sequence Observable. This way we can type really fast and we'll never lose a letter in transition.

For the sake of a demo let's also make a simple service to send those sequences. We will use it to send letter with mouse and virtual on-screen keyboard:

@Injectable({
  providedIn: 'root'
})
export class MorseService extends Subject<readonly (0 | 1)[]> {
  constructor(
    @Inject(MAP) private readonly chars: Map<string, readonly (0 | 1)[]>
  ) {
    super();
  }

  send(char: string) {
    this.next(this.chars.get(char));
  }
}

Now we can make a token that translates letters into Morse code sequence. First we get our mapping table and unit duration from the DI, as well as the service from the snippet above. Next we map all keydown events from document to their corresponding sequences from the table. We merge those key events with emissions from the service and use concatMap to losslessly turn them into a stream of 0s and 1s. We end each character sequence with a space sequence (three 0s in a row):

export const MORSE = new InjectionToken<Observable<0 | 1>(
  'A sequence of Morse code', 
  {
    factory: () => {
      const chars = inject(MAP);
      const duration = inject(UNIT);
      const service$ = inject(MorseService);
      const keydown$ = fromEvent(inject(DOCUMENT), 'keydown').pipe(
        map(({ key }: KeyboardEvent) => chars.get(key)),
        filter(Boolean)
      );
	
      return merge(service$, keydown$).pipe(
        concatMap(sequence =>
          from(sequence).pipe(
            endWith(...SPACE),
            delayEach(duration)
          )
        ),
        share(),
      );
    }
  }
);

Note usage of from instead of of. When used with Arrays it would turn them into sequence, whereas of will make a single emission with an entire array.

There's a custom operator in there — delayEach. It delays every emission from Observable, rather than an entire stream:

export function delayEach<T>(duration: number): MonoTypeOperatorFunction<T> {
  return concatMap(x => of(x).pipe(delay(duration)));
}

Now we can listen to the incoming Morse code by injecting MORSE token and subscribing!

Decoding

Decoding is much more fun. We need to create an operator to compare values with a sequence. It must reset if a value does not equal to the digit in the corresponding sequence position, emit a letter after the whole sequence is finished, followed by three 0s indicating the end of a letter. Then it repeats the same pattern.

We terminate and restart the stream so emission indices match those of our sequence.

We also want to visualize it so let's create a special module for characters. It will consist of a Component, Directive and a Service. Why do we need all those? For a good logic separation. We need to know what character we are working with. But we need it in both Component and Service. To avoid cyclic dependency we will create a Directive with single purpose — let us know the character:

@Directive({
  selector: '[char]'
})
export class CharDirective {
  @Input() char = '';
}

We can use the same selector for a Component. We will use it to send signals with mouse and to visualize our demo:

@Component({
  selector: '[char]',
  templateUrl: 'char.template.html',
  styleUrls: ['char.style.less'],
  providers: [CharService]
})
export class CharComponent {
  constructor(
    @Inject(CharService) readonly service: Observable<number | string>,
    @Inject(CharDirective) readonly directive: CharDirective,
    @Inject(MorseService) private readonly emitter: MorseService
  ) {}

  @HostListener('click')
  onClick() {
    this.emitter.send(this.directive.char);
  }
}

Now we need to implement actual decoding. Notice that CharService has the type of Observable<number | string>. It will emit decoding progress as numbers between 0 and 1 and the letter once the decoding is complete. We will later display decoding progress for each character in the demo app.

Service

Creating this service is the hardest part. I'm not gonna lie, it took me quite some time to figure this RxJS puzzle out. Biggest difficulty came from space character. It looks a lot like a character end sequence which is also a bunch of consequent 0s.

We will start by creating all the helper functions. Our service extends Observable and it will have a private internal stream based on Morse code Observable we created earlier:

@Injectable()
export class CharService extends Observable<number | string> {
  private readonly inner$ = this.morse$.pipe(
    // ...
  )

  constructor(
    @Inject(MORSE) private readonly morse$: Observable<0 | 1>,
    @Inject(MAP) private readonly chars: Map<string, readonly (0 | 1)[]>,
    @Inject(CharDirective) private readonly directive: CharDirective
  ) {
    super(subscriber => this.inner$.subscribe(subscriber));
  }
}

We need to know what sequence are we looking for. It has to be a getter and not a read-only property because at the construction time Directive input has not been processed yet:

private get sequence(): readonly (0 | 1)[] {
  return [...this.chars.get(this.directive.char), ...SPACE];
}

If it was a static string value and not an input we could have changed Directive to @Attribute instead of @Input and get its value in constructor. But then we will not be able to create them in *ngFor.

There's one thing that we do not need and that is — recreating the Array everytime we request it. For this purposes we can use @tuiPure decorator from Taiga UI. We use it a lot in our library for lazy getters. This is a memoization pattern for delayed computation. After the value is requested for the first time we replace getter with a plain property containing the result. All we need to do is add decorator on top of the getter.

It works on methods too. It would check for arguments and if they all match arguments from the previous call it would just return the previous result, without calling the method.

We also need to check that value at particular index of the sequence matches the given value. This way we would check that our Morse code stream matches the sequence we are looking for:

private isValid(value: number, index: number): boolean {
  return this.sequence[index] === value;
}

Next we need a value that the service is going to emit. We want to know the progress of decoding and the decoded letter:

private getValue(index: number): number | string {
  return this.sequence.length === index + 1 
    ? this.char 
    : (index + 2) / this.sequence.length;
}

With all those helpers we can build internal stream:

private readonly inner$ = this.morse$.pipe(
  takeWhile(this.isValid.bind(this)),
  map((_, index) => this.getValue(index)),
  startWith(0),
  endWith(0),
  takeWhile(isNumber, true),
)

We have two takeWhile operators. First one terminates the stream if the sequence didn't match. Second is used to terminate if a letter went through (isNumber is a simple helper to check type).

Note second argument for takeWhile — it lets value that broke the condition though as well.

Restarting the stream

Terminating and restarting the stream makes it easy for us to check that sequences match. It makes emission indices in sync with the sequence we are looking for. Once we matched the whole sequence and a character went through (second takeWhile) we can just restart. But if decoding failed (the sequences didn't match) — we need to wait for the new character. It means that we must wait for three consecutive 0s.

My initial thought on restarting failed decoding was to use repeatWhen. But the trick with repeatWhen is that its factory function is only called once upon first termination. Then it continues to listen to the Observable that it returned:

repeatWhen(() => threeZeroes$),

This means that it would work well for us only for the second and all forthcoming restarts. But at the first one, those three 0s indicating the end of the letter have already passed by the time we start to listen to this restart stream.

First, let's add code to listen for three consecutive 0s. It's a great case for scan which is a lot like reduce in Array but runs on the fly (RxJS operator reduce is the same but it is only triggered upon completion).

export function consecutive<T>(
  value: T,
  amount: number
): OperatorFunction<T, unknown> {
  return pipe(
    startWith(...new Array(amount).fill(value)),
    scan((result, item) => (item === value ? ++result : 0), 0),
    filter(v => v >= amount),
  );
}

We use startWith with three 0s to kickstart the stream. Then we add up the number of consecutive 0s and only let through 3 or more. We cannot use exactly 3 because then space character (which translates to seven 0s) would break all other letters.

Now we need to separate two takeWhile operators. We want first one to terminate some inner stream. This means we need a Higher Order Observable again and concatMap is here again to save the day:

private readonly inner$ = this.morse$.pipe(
  consecutive(0, SPACE.length),
  concatMapTo(this.morse$.pipe(
    takeWhile(this.isValid.bind(this)),
    map((_, index) => this.getValue(index)),
    startWith(0),
    endWith(0),
  )),
  takeWhile(isNumber, true),
  repeat(),
)

Now when inner stream is terminated by failed decoding three consecutive 0s will restart it. Space character causes multiple groups of three consecutive 0s but they all will wait for internal stream to complete and will be dismissed once space character restarts the whole stream through second takeWhile.

Demo

We got everything ready, now it's time to create a demo app! It will listen to keyboard presses and display a virtual keyboard. Demo will also print received message on screen and each letter button will display decoding progress.

Since it's a telegraph with Morse code it just wouldn't feel authentic without the sound. And simple bleeps are easy to create with Web Audio API and OscillatorNode in particular. We can use my wrapper library @ng-web-apis/audio.

It's a part of our open-source initiative Web APIs for Angular. Our goal is to build high quality lightweight wrappers for native Web APIs for idiomatic use with Angular. Check out what we have so far!

With it we can turn declarative Angular directives into Web Audio API node graph:

<ng-container waOscillatorNode autoplay frequency="523">
  <ng-container
    waGainNode
    gain="0"
    [gain]="morse$ | async | waAudioParam : 0.02"
  >
    <ng-container waAudioDestinationNode></ng-container>
  </ng-container>
</ng-container>

It would play a bleep for 1s and be silent for 0s by controlling the volume. Now let's layout our keyboard and decoded output. It also makes sense to add «Clear» button:

<section>
  <button *ngFor="let char of chars" type="button" [char]="char">
    {{char}}
  </button>
</section>
<output>{{ output$ | async }}</output>
<footer>
  <button type="reset" (click)="reset$.next()">Clear</button>
</footer>

But how do we get the output$? By querying services from template! Did you know that Angular queries can reach for services and other instances from node injectors?

@ViewChildren(CharService)
readonly services: QueryList<Observable<string | number[]>>
Keep in mind they are unavailable until ngAfterViewInit hook!

We can compose output$ stream this way:

readonly reset$ = new Subject<void>();

readonly output$ = this.reset$.pipe(
  switchMap(() => merge(...this.services.toArray()).pipe(
    filter(x => !isNumber(x)),
    scan((result, char) => result + char, ''),
    startWith(''),
  )),
  startWith(''),
);

ngAfterViewInit() {
  this.reset$.next();
}

And we also want to indicate decoding progress. To do so we will add a simple span block to our CharComponent and control its height and color with these two streams:

readonly progress$ = this.service.pipe(
  filter(isNumber),
  map(value => value * 100),
);

readonly pending$ = this.service.pipe(
  filter(Boolean),
  map(isNumber),
);

And one last neat trick I wanted to show is controlling CSS transitions with the same token we create for UNIT duration! Since Angular moved to Ivy renderer we are able to bind to CSS variables, so lets use --duration in our styles and add this to our main component:

@Component({
  // ...
  host: {
    '[style.--tui-duration]': 'unit + "ms"',
  },
})
export class AppComponent implements AfterViewInit {
  constructor(
    // ...
    @Inject(UNIT) readonly unit: number,
  ) {}

  // ...
}

Everything is finished and you are welcome to try the final product!