My light JS implementation of RxJS subjects

Fredo Corleone
4 min read · Oct 10, 2021

While working on different apps, I’ve found that RxJS subjects are a good way to handle communication between components and to deal with asynchronicity in general.

What is a subject?

A subject is an object that delivers messages to one or more consumers at once.
This concept is called multicasting.

What I wanted to achieve

I wanted to provide an essential implementation of the following subjects from RxJS:

  1. Subject, doesn’t have an initial value and doesn’t cache any value
  2. BehaviorSubject, requires an initial value and emits its current value to new consumers
  3. ReplaySubject, emits a specified number of the most recently emitted values, in first-in-first-out order, to new consumers

I’ve never needed to complete or error out a subject, so I won’t implement completion and erroring.

The API is compatible with that of RxJS subjects, so I could use them as drop-in replacements and use them with the “| async” pipe in Angular.

A much better naming

I always thought subjects in RxJS are poorly named. Therefore I decided, for my light JS implementation, to rename them:

  1. Subject becomes ObsEmitter, because the concept is similar to an EventEmitter
  2. BehaviorSubject becomes ObsCacher, because it acts as a cache from which consumers can read the latest value when they need it
  3. ReplaySubject becomes ObsReplayer, which stays close to the original name

I decided to throw away the Subject word and replace it with Obs: Obs stands for Observable and the noun right after should convey the fact it’s also an Observer.

The API

The standard API is pretty simple:
- getValue() is used to get the latest value in ObsCacher
- getValues() is used to get the latest values in ObsReplayer
- next() is used to broadcast a new value
- subscribe() is used to add new consumers
- unsubscribe() is used to remove a consumer

One thing to be aware of is that unsubscribe() is not a method of the object itself; it’s a method of the object returned by the subscribe() call:

let emitter$ = new ObsEmitter();

let subscription = emitter$.subscribe(val => console.log(val));

subscription.unsubscribe();

Differences between my interpretation and RxJS one

ObsCacher (BehaviorSubject)

You can initialize it empty, in which case the default value is undefined. In RxJS you have to give it an initial value.

ObsReplayer (ReplaySubject)

The difference is that it provides .getValues(), which returns an array of values; in RxJS there’s no such method. Consumers also receive that whole array on every emission, rather than one value at a time.

ObsEmitter implementation

The concept is similar to an EventEmitter that keeps a registry of multiple listeners. When an event happens, e.g. a new value arrives, it notifies those listeners.

function ObsEmitter() {
  // Registry of consumer callbacks, keyed by a random id
  this.consumers = {};
}

ObsEmitter.prototype.next = function (value) {
  // Notify every registered consumer
  for (let key in this.consumers)
    this.consumers[key](value);
};

ObsEmitter.prototype.subscribe = function (callback) {
  // Accept an observer object ({ next }) as well as a plain function.
  // This is necessary for "| async" in Angular
  if ("next" in callback)
    callback = callback.next;

  // This is a kind of unique id
  let namespace = Math.random().toString(36).slice(2);
  this.consumers[namespace] = callback;

  return {
    unsubscribe: () => delete this.consumers[namespace]
  };
};

It’s very simple: when next() is called, it iterates over all consumers and calls each one with the value as its argument.
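To see the multicasting in action, here is a small self-contained sketch. It is not the code from the package: EmitterSketch is a hypothetical class-based variant that stores consumers in a Map keyed by a Symbol instead of a random string, and the consumer arrays are made up for the demonstration.

```javascript
// Hypothetical class-based variant of the emitter (an assumption, not the author's code).
class EmitterSketch {
  constructor() {
    // A Map keyed by a unique Symbol rules out id collisions.
    this.consumers = new Map();
  }

  next(value) {
    // Deliver the value to every registered consumer.
    for (const consumer of this.consumers.values()) consumer(value);
  }

  subscribe(observer) {
    // Accept either a plain function or an observer object with a next() method.
    const consumer =
      typeof observer === "function" ? observer : (v) => observer.next(v);
    const key = Symbol("consumer");
    this.consumers.set(key, consumer);
    return { unsubscribe: () => this.consumers.delete(key) };
  }
}

const emitter$ = new EmitterSketch();
const seenByA = [];
const seenByB = [];

const subA = emitter$.subscribe((v) => seenByA.push(v));
emitter$.subscribe({ next: (v) => seenByB.push(v) });

emitter$.next(1); // both consumers receive 1
subA.unsubscribe();
emitter$.next(2); // only the second consumer receives 2

console.log(seenByA); // [1]
console.log(seenByB); // [1, 2]
```

The first consumer stops receiving values as soon as its subscription is released, while the second one keeps going.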

ObsCacher implementation

It acts as a cache from which subscribers can read the latest value when they need it.

function ObsCacher(value = undefined) {
  // Latest value, handed to every new consumer
  this.value = value;
  this.consumers = {};
}

ObsCacher.prototype.next = function (value) {
  // Cache the value, then notify every registered consumer
  this.value = value;
  for (let key in this.consumers)
    this.consumers[key](value);
};

ObsCacher.prototype.getValue = function () {
  return this.value;
};

ObsCacher.prototype.subscribe = function (callback) {
  // Accept an observer object ({ next }) as well as a plain function.
  // This is necessary for "| async" in Angular
  if ("next" in callback)
    callback = callback.next;

  // Run it with the current value
  callback(this.value);

  // This is a kind of unique id
  let namespace = Math.random().toString(36).slice(2);
  this.consumers[namespace] = callback;

  return {
    unsubscribe: () => delete this.consumers[namespace]
  };
};

It’s pretty similar to ObsEmitter, but it has an additional field to hold the last value and a method to retrieve it. It also calls each new consumer with the current value as soon as it subscribes.
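The caching behaviour is easiest to see with a late subscriber. The following is only a sketch: CacherSketch is a hypothetical compact variant (not the code from the package), and the user names are made-up sample data.

```javascript
// Hypothetical compact variant of the cacher (an assumption, not the author's code).
class CacherSketch {
  constructor(value = undefined) {
    this.value = value;
    // Simplification: with a Set, the same function can only be registered once.
    this.consumers = new Set();
  }

  getValue() {
    return this.value;
  }

  next(value) {
    this.value = value; // remember the latest value for late subscribers
    for (const consumer of this.consumers) consumer(value);
  }

  subscribe(consumer) {
    consumer(this.value); // hand over the cached value immediately
    this.consumers.add(consumer);
    return { unsubscribe: () => this.consumers.delete(consumer) };
  }
}

const user$ = new CacherSketch("guest");
user$.next("alice"); // nobody is listening yet, but the value is cached

const received = [];
user$.subscribe((name) => received.push(name)); // gets "alice" right away
user$.next("bob");

console.log(received);         // ["alice", "bob"]
console.log(user$.getValue()); // "bob"

// Without an initial value the cache starts out as undefined.
const empty$ = new CacherSketch();
console.log(empty$.getValue()); // undefined
```

The subscriber arrives after "alice" was emitted and still receives it, which is the whole point of caching the latest value.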

ObsReplayer implementation

You can specify how many values to keep in the buffer (bufferSize) and how long a value stays in the buffer before it is removed (expireTime in my code, windowTime in RxJS). Both limits can be set at the same time.

For this one I needed a DoublyLinkedList in order to manage the removal of elements in an efficient way.
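The list itself is not shown in this article. As a rough idea of what it has to offer, here is a minimal sketch that exposes only the three methods ObsReplayer calls: push(), remove() and traverseReverse(). The node shape (data, prev, next) and every implementation detail are assumptions; the actual class in the repository may differ.

```javascript
// Hypothetical minimal list with only the methods ObsReplayer relies on.
class DoublyLinkedList {
  constructor() {
    this.head = null; // oldest node
    this.tail = null; // newest node
  }

  // Append a new node holding `data` at the tail.
  push(data) {
    const node = { data, prev: this.tail, next: null };
    if (this.tail) this.tail.next = node;
    else this.head = node;
    this.tail = node;
    return node;
  }

  // Unlink `node` in constant time by rewiring its neighbours.
  remove(node) {
    if (node.prev) node.prev.next = node.next;
    else this.head = node.next;
    if (node.next) node.next.prev = node.prev;
    else this.tail = node.prev;
    node.prev = node.next = null;
  }

  // Visit nodes from newest (tail) to oldest (head).
  traverseReverse(visit) {
    let node = this.tail;
    while (node) {
      const prev = node.prev; // saved first, so `visit` may remove `node`
      visit(node);
      node = prev;
    }
  }
}

const list = new DoublyLinkedList();
["a", "b", "c", "d"].forEach((x) => list.push(x));

const visited = [];
list.traverseReverse((node) => {
  visited.push(node.data);
  if (node.data === "b") list.remove(node); // remove while traversing
});

const remaining = [];
list.traverseReverse((node) => remaining.push(node.data));

console.log(visited);   // ["d", "c", "b", "a"]
console.log(remaining); // ["d", "c", "a"]
```

Removing a node only touches its two neighbours, so dropping expired values never requires shifting the rest of the buffer.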

function ObsReplayer(bufferSize, expireTime) {
  // When a limit is omitted, it is effectively unlimited
  this.bufferSize = bufferSize || Number.MAX_SAFE_INTEGER;
  this.expireTime = expireTime || Number.MAX_SAFE_INTEGER;
  this.values = new DoublyLinkedList();
  this.consumers = {};
}

ObsReplayer.prototype.next = function (value) {
  // Store the value together with its arrival time
  this.values.push({
    timestamp: Date.now(),
    value
  });

  // Notify every consumer with the current buffer
  for (let key in this.consumers)
    this.consumers[key](this.getValues());
};

ObsReplayer.prototype.getValues = function () {
  let collected = [];
  let count = this.bufferSize;

  // Walk from the newest node to the oldest one
  this.values.traverseReverse((node) => {
    // Buffer is full: drop every older node
    if (!count) {
      this.values.remove(node);
      return;
    }

    // Drop nodes that are older than expireTime
    let dt = Date.now() - node.data.timestamp;
    if (dt > this.expireTime) {
      this.values.remove(node);
      return;
    }

    collected.push(node.data.value);
    count--;
  });

  // Back to chronological order
  return collected.reverse();
};

ObsReplayer.prototype.subscribe = function (callback) {
  // Accept an observer object ({ next }) as well as a plain function.
  // This is necessary for "| async" in Angular
  if ("next" in callback)
    callback = callback.next;

  // Run it with last values
  callback(this.getValues());

  // This is a kind of unique id
  let namespace = Math.random().toString(36).slice(2);
  this.consumers[namespace] = callback;

  return {
    unsubscribe: () => delete this.consumers[namespace]
  };
};

When next() is called, it packs the data along with a timestamp into a node and pushes that node onto the DoublyLinkedList. It then calls every consumer with the result of getValues().

The getValues() method is responsible for the filtering. It loops through the nodes in reverse chronological order and, once bufferSize values have been collected, removes every remaining older node. Nodes that pass this first filter must then pass a second filter by time: if they are older than expireTime, they get removed; otherwise they are added to the collected array, which is returned to the consumer.
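The two filters can also be looked at in isolation. The sketch below applies the same count and time checks to a plain array instead of a linked list. Note the differences from getValues(): collectRecent is a hypothetical helper, it takes the current time as a parameter so the result is reproducible, and it only selects values without pruning the buffer. The timestamps and values are made-up sample data.

```javascript
// Hypothetical helper: the same two filters applied to a plain array.
// `entries` are { timestamp, value } objects ordered oldest to newest.
function collectRecent(entries, bufferSize, expireTime, now) {
  const collected = [];
  // Walk from newest to oldest, like traverseReverse().
  for (let i = entries.length - 1; i >= 0; i--) {
    if (collected.length >= bufferSize) break;             // count filter
    if (now - entries[i].timestamp > expireTime) continue; // time filter
    collected.push(entries[i].value);
  }
  return collected.reverse(); // back to chronological order
}

const entries = [
  { timestamp: 0, value: "a" },
  { timestamp: 100, value: "b" },
  { timestamp: 200, value: "c" },
  { timestamp: 300, value: "d" },
];
const now = 350; // ages are 350, 250, 150 and 50 ms

const byCount = collectRecent(entries, 3, Infinity, now);
const byTime = collectRecent(entries, Infinity, 200, now);
const byBoth = collectRecent(entries, 1, 200, now);

console.log(byCount); // ["b", "c", "d"]
console.log(byTime);  // ["c", "d"]
console.log(byBoth);  // ["d"]
```

When both limits are set, a value has to satisfy both of them, so the stricter limit decides what a new consumer gets to see.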

Conclusion

If you want to fiddle with the code, head over to my GitHub repo:

https://github.com/4skinSkywalker/AsyncUtils

Or go to the corresponding NPM package:

https://www.npmjs.com/package/bada55asyncutils
