Improving the observer pattern with subscription objects

Way back in February I wrote a series of articles on event-driven programming, and of all the patterns I covered, the one I find myself using most is the observer pattern: an object that collects callback functions so that it can notify other parts of a system when interesting things happen. The basic pattern looks something like this:

var Observable = {
  bind: function(eventType, listener, context) {
    this._listeners = this._listeners || {};
    var list = this._listeners[eventType] = this._listeners[eventType] || [];
    list.push([listener, context]);
  },

  trigger: function(eventType, args) {
    if (!this._listeners) return;
    var list = this._listeners[eventType];
    if (!list) return;
    list.forEach(function(listener) {
      listener[0].apply(listener[1], args);
    });
  }
};
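As a quick usage sketch (the `button` and `logger` objects and the `'click'` event are hypothetical), any object can pick up this behaviour by inheriting from `Observable` with `Object.create()`. Because `trigger()` forwards its arguments with `apply()`, they are passed as an array, and the optional context becomes `this` inside the listener:

```javascript
// Observable as defined above, repeated so this snippet runs on its own.
var Observable = {
  bind: function(eventType, listener, context) {
    this._listeners = this._listeners || {};
    var list = this._listeners[eventType] = this._listeners[eventType] || [];
    list.push([listener, context]);
  },

  trigger: function(eventType, args) {
    if (!this._listeners) return;
    var list = this._listeners[eventType];
    if (!list) return;
    list.forEach(function(listener) {
      listener[0].apply(listener[1], args);
    });
  }
};

// A hypothetical observable; its own _listeners table is created on first bind().
var button = Object.create(Observable);
var logger = { prefix: 'clicked at ', lines: [] };

button.bind('click', function(x, y) {
  // `this` is the context object that was passed to bind()
  this.lines.push(this.prefix + x + ',' + y);
}, logger);

button.trigger('click', [10, 20]);
console.log(logger.lines); // [ 'clicked at 10,20' ]
```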

This is pretty simple: we can add callbacks to a list, and call them all when something happens. But what if we want to remove a listener? The approach taken by many DOM interfaces is to provide essentially the opposite of the bind() method:

Observable.unbind = function(eventType, listener, context) {
  if (!this._listeners) return;
  var list = this._listeners[eventType];
  if (!list) return;
  
  var i = list.length;
  while (i--) {
    if (list[i][0] === listener && list[i][1] === context)
      list.splice(i, 1);
  }
};
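To make the awkwardness concrete, here is a small sketch (the `source` object, the `'tick'` event and `onTick` are made up for illustration) showing that unbind() only removes a listener when we pass back the very same function object and the same context we bound it with:

```javascript
// A compact copy of Observable with bind(), trigger() and unbind() from above.
var Observable = {
  bind: function(eventType, listener, context) {
    this._listeners = this._listeners || {};
    var list = this._listeners[eventType] = this._listeners[eventType] || [];
    list.push([listener, context]);
  },
  trigger: function(eventType, args) {
    var list = this._listeners && this._listeners[eventType];
    if (!list) return;
    list.forEach(function(l) { l[0].apply(l[1], args); });
  },
  unbind: function(eventType, listener, context) {
    var list = this._listeners && this._listeners[eventType];
    if (!list) return;
    var i = list.length;
    while (i--) {
      if (list[i][0] === listener && list[i][1] === context) list.splice(i, 1);
    }
  }
};

var source = Object.create(Observable);
var calls = 0;

// Bound with an anonymous function: we keep no reference to pass to unbind().
source.bind('tick', function() { calls += 1; });
// An identical-looking function is a different object, so nothing is removed.
source.unbind('tick', function() { calls += 1; });
source.trigger('tick', []);
console.log(calls); // 1

// Keeping a reference works, but only if we also pass the same context.
function onTick() { calls += 10; }
var ctx = {};
source.bind('tick', onTick, ctx);
source.unbind('tick', onTick);      // no context given: does not match (onTick, ctx)
source.unbind('tick', onTick, ctx); // matches, so the entry is removed
source.trigger('tick', []);
console.log(calls); // 2
```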

The unbind() method takes the same event type, listener function and context that we gave to bind(), and removes the matching entries from that event type’s list of listeners. Internally, that’s exactly what we want to achieve, but the API is hard to use. It reeks of something inspired by Java, where listeners are objects and easier to keep references to. Idiomatic JavaScript rests heavily on anonymous functions, and being forced to keep references to a function and a context object just so you can remove a listener often feels clunky. It would be nicer if the bind() API returned something to represent the subscription so we could cancel it later:

var subscription = object.bind('someEvent', function(event) {
  // Handle the event
});

// Later on
subscription.cancel();

This is easily done: we just need an object that stores the observed object, event type, listener function and context, so that it can call the unbind() method for us when we want to cancel the subscription.

var Subscription = function(target, eventType, listener, context) {
  this._target    = target;
  this._eventType = eventType;
  this._listener  = listener;
  this._context   = context;
  this._active    = true;
};

Subscription.prototype.cancel = function() {
  if (!this._active) return;
  this._target.unbind(this._eventType, this._listener, this._context);
  this._active = false;
};

We can then change the bind() method to return one of these objects:

Observable.bind = function(eventType, listener, context) {
  this._listeners = this._listeners || {};
  var list = this._listeners[eventType] = this._listeners[eventType] || [];
  list.push([listener, context]);
  return new Subscription(this, eventType, listener, context);
};
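Here is a self-contained sketch of the resulting API in use (the `feed` object and `'post'` event are hypothetical). The listener is anonymous, yet we can still remove it, and cancelling twice is harmless because the subscription tracks whether it is still active:

```javascript
// Minimal copies of Subscription and the Subscription-returning Observable.
var Subscription = function(target, eventType, listener, context) {
  this._target    = target;
  this._eventType = eventType;
  this._listener  = listener;
  this._context   = context;
  this._active    = true;
};

Subscription.prototype.cancel = function() {
  if (!this._active) return; // a second cancel() is a no-op
  this._target.unbind(this._eventType, this._listener, this._context);
  this._active = false;
};

var Observable = {
  bind: function(eventType, listener, context) {
    this._listeners = this._listeners || {};
    var list = this._listeners[eventType] = this._listeners[eventType] || [];
    list.push([listener, context]);
    return new Subscription(this, eventType, listener, context);
  },
  trigger: function(eventType, args) {
    var list = this._listeners && this._listeners[eventType];
    if (!list) return;
    list.forEach(function(l) { l[0].apply(l[1], args); });
  },
  unbind: function(eventType, listener, context) {
    var list = this._listeners && this._listeners[eventType];
    if (!list) return;
    var i = list.length;
    while (i--) {
      if (list[i][0] === listener && list[i][1] === context) list.splice(i, 1);
    }
  }
};

var feed = Object.create(Observable);
var received = [];

var subscription = feed.bind('post', function(title) {
  received.push(title);
});

feed.trigger('post', ['First']);
subscription.cancel(); // no need to hold on to the anonymous function
subscription.cancel(); // safe: already inactive
feed.trigger('post', ['Second']);
console.log(received); // [ 'First' ]
```

One caveat of implementing cancel() as sugar over unbind(): if the same listener and context are bound twice, cancelling either subscription removes both entries, because unbind() removes every match.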

There are further refactorings we could make; maybe it would be better to store the subscription objects rather than the listener functions on the observable object. We’d probably then move the listener calling logic into the Subscription class, although this could introduce extra function call overhead while dispatching events. For now I’m happy leaving it as sugar over the unbind() method. (This is cribbed from the Faye client, which is required by the protocol to support an unsubscribe() method that takes a channel name and listener to remove.)
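A minimal sketch of that refactoring, assuming a subscription removes itself from the list by object identity (the `_subscriptions` field and `_fire()` method are made-up names, not Faye's). Each bind() creates a distinct object, so cancel() removes exactly the entry it created; removal is still an O(n) scan, and dispatch pays one extra method call per listener:

```javascript
// Hypothetical variant: the observable stores Subscription objects directly.
var Subscription = function(target, eventType, listener, context) {
  this._target    = target;
  this._eventType = eventType;
  this._listener  = listener;
  this._context   = context;
  this._active    = true;
};

// Dispatch now goes through the subscription object.
Subscription.prototype._fire = function(args) {
  this._listener.apply(this._context, args);
};

Subscription.prototype.cancel = function() {
  if (!this._active) return;
  var list = this._target._subscriptions[this._eventType];
  var index = list.indexOf(this); // O(n) scan, matched by identity
  if (index >= 0) list.splice(index, 1);
  this._active = false;
};

var Observable = {
  bind: function(eventType, listener, context) {
    this._subscriptions = this._subscriptions || {};
    var list = this._subscriptions[eventType] = this._subscriptions[eventType] || [];
    var subscription = new Subscription(this, eventType, listener, context);
    list.push(subscription);
    return subscription;
  },

  trigger: function(eventType, args) {
    var list = this._subscriptions && this._subscriptions[eventType];
    if (!list) return;
    // Iterate over a copy so a listener that cancels mid-dispatch can't skip others.
    list.slice().forEach(function(subscription) { subscription._fire(args); });
  }
};

var target = Object.create(Observable);
var count = 0;
function listener() { count += 1; }

var first  = target.bind('change', listener);
var second = target.bind('change', listener); // same listener, same (undefined) context
first.cancel();
target.trigger('change', []);
console.log(count); // 1: only `first` was removed
```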

So we’ve got an API that we’re happy with, but its internals are sub-optimal: removing a listener takes O(n) time, because unbind() has to scan the whole list, and if we do that a lot at scale we’ll have a performance problem. In Faye, for example, every client that subscribes to a channel must eventually unsubscribe, so we’d like that to be fast. In this situation we have two things working in our favour: every client has a unique ID, and the list of subscribers to a channel must contain no duplicates, since otherwise a client would receive some messages twice.

Putting all this together, we could model a channel’s subscribers using a JavaScript object to produce set-like behaviour:

var Channel = function() {
  this._subscribers = {};
};

Channel.prototype.bind = function(clientId, listener, context) {
  this._subscribers[clientId] = [listener, context];
};

Channel.prototype.unbind = function(clientId) {
  delete this._subscribers[clientId];
};

Channel.prototype.trigger = function(message) {
  for (var clientId in this._subscribers) {
    var client = this._subscribers[clientId];
    client[0].call(client[1], message);
  }
};
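A quick check of the set-like behaviour, using this Channel with `_subscribers` throughout (the client IDs are made up): binding the same client ID twice replaces its listener rather than adding a duplicate, so each client sees each message once, and unbinding is a single property deletion:

```javascript
var Channel = function() {
  this._subscribers = {};
};

Channel.prototype.bind = function(clientId, listener, context) {
  this._subscribers[clientId] = [listener, context]; // overwrites any old entry
};

Channel.prototype.unbind = function(clientId) {
  delete this._subscribers[clientId];
};

Channel.prototype.trigger = function(message) {
  for (var clientId in this._subscribers) {
    var client = this._subscribers[clientId];
    client[0].call(client[1], message);
  }
};

var channel = new Channel();
var inbox = { alice: [], bob: [] };

channel.bind('alice', function(m) { inbox.alice.push(m); });
channel.bind('alice', function(m) { inbox.alice.push(m); }); // replaces, no duplicate
channel.bind('bob',   function(m) { inbox.bob.push(m); });

channel.trigger('hello');
channel.unbind('bob');
channel.trigger('world');

console.log(inbox.alice); // [ 'hello', 'world' ]
console.log(inbox.bob);   // [ 'hello' ]
```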

This makes it easy to add and remove subscriptions, but it slows down message dispatch because of the poor performance of for/in. As I mentioned last week, though, there’s an easy solution to that: keep a sorted list of all the subscribed client IDs. Iterating over an array speeds up message dispatch, while keeping it sorted lets us find a client ID with a binary search in O(log n) time instead of an O(n) scan (the splice() call still has to shift the later elements, but that work happens inside a single built-in array method). Combining this with the Subscription class from above, we get a final draft of our Channel API:

var Channel = function() {
  this._clientIds   = [];
  this._subscribers = {};
};

Channel.prototype.bind = function(clientId, listener, context) {
  if (!this._subscribers.hasOwnProperty(clientId)) {
    var index = this._indexOf(clientId);
    this._clientIds.splice(index, 0, clientId);
  }
  this._subscribers[clientId] = [listener, context];
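  // Subscription keeps clientId in its eventType slot, so cancel() ends up
  // calling this.unbind(clientId); the undefined listener/context are ignored.
  return new Subscription(this, clientId);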
};

Channel.prototype.unbind = function(clientId) {
  if (!this._subscribers.hasOwnProperty(clientId)) return;
  delete this._subscribers[clientId];
  var index = this._indexOf(clientId);
  this._clientIds.splice(index, 1);
};

Channel.prototype.trigger = function(message) {
  var clientIds = this._clientIds,
      i         = clientIds.length,
      client;
  
  while (i--) {
    client = this._subscribers[clientIds[i]];
    client[0].call(client[1], message);
  }
};

Channel.prototype._indexOf = function(key) {
  var keys = this._clientIds,
      n    = keys.length,
      i    = 0,
      d    = n;

  if (n === 0)         return 0;
  if (key < keys[0])   return 0;
  if (key > keys[n-1]) return n;

  while (key !== keys[i] && d > 0.5) {
    d = d / 2;
    i += (key > keys[i] ? 1 : -1) * Math.round(d);
    if (key > keys[i-1] && key < keys[i]) d = 0;
  }
  return i;
};
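If the halving arithmetic in _indexOf() feels fragile, a conventional lower-bound binary search returns the same thing: the key’s index when it is present, otherwise the position where it should be inserted to keep the array sorted. This is a swapped-in alternative, not the SortedTable code from the earlier article, and `lowerBound` is a hypothetical helper name:

```javascript
// Returns the first index i such that keys[i] >= key, or keys.length if none.
// For a sorted array this is the key's own index when present, and its
// insertion point when absent.
function lowerBound(keys, key) {
  var lo = 0, hi = keys.length;
  while (lo < hi) {
    var mid = (lo + hi) >>> 1; // integer midpoint
    if (keys[mid] < key) lo = mid + 1;
    else hi = mid;
  }
  return lo;
}

var ids = ['alice', 'carol', 'erin'];
console.log(lowerBound(ids, 'carol')); // 1 (present: its own index)
console.log(lowerBound(ids, 'dave'));  // 2 (absent: where it would be inserted)

// Hypothetical wiring as a drop-in for the method above:
// Channel.prototype._indexOf = function(key) {
//   return lowerBound(this._clientIds, key);
// };
```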

All we’ve done here is use the implementation of SortedTable from the previous article to model the channel’s set of subscribers. It finds a subscriber’s position in O(log n) time when binding and unbinding, and should iterate over subscribers reasonably quickly. Of course, in practice we would want to measure the total load of binding, dispatching and unbinding under production traffic to see which model has the best trade-off for our use case. For example, in Ruby iterating over a Hash is nowhere near as slow as iterating over a JavaScript object, so there we could use the simpler Channel implementation and get O(1) insertion and removal of clients.
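A rough sketch of such a measurement, assuming any channel class with the bind(clientId, listener), unbind(clientId) and trigger(message) methods used in this article. `measureLoad` and `ObjectChannel` are hypothetical names, and a synthetic loop like this is only a starting point, not a substitute for real traffic:

```javascript
// Bind `clients` subscribers, deliver `messages` messages to all of them,
// then unbind everyone, timing the whole run with Date.now().
function measureLoad(ChannelType, clients, messages) {
  var channel = new ChannelType();
  var delivered = 0;
  var listener = function() { delivered += 1; };
  var start = Date.now();

  for (var i = 0; i < clients; i++) channel.bind('client-' + i, listener);
  for (var m = 0; m < messages; m++) channel.trigger({ seq: m });
  for (var j = 0; j < clients; j++) channel.unbind('client-' + j);

  return { ms: Date.now() - start, delivered: delivered };
}

// The simple object-backed Channel from earlier, as one candidate model.
var ObjectChannel = function() { this._subscribers = {}; };
ObjectChannel.prototype.bind = function(clientId, listener, context) {
  this._subscribers[clientId] = [listener, context];
};
ObjectChannel.prototype.unbind = function(clientId) {
  delete this._subscribers[clientId];
};
ObjectChannel.prototype.trigger = function(message) {
  for (var clientId in this._subscribers) {
    var client = this._subscribers[clientId];
    client[0].call(client[1], message);
  }
};

var result = measureLoad(ObjectChannel, 1000, 100);
console.log(result.delivered); // 100000
console.log(result.ms + 'ms'); // varies by machine and engine
```

Running the same harness against each storage model, with a ratio of bind/unbind calls to trigger calls close to what the server actually sees, gives a like-for-like comparison.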

One final benefit of this API is that all that’s required to unbind from a channel is the clientId, since that uniquely identifies the listener to remove. The use of a Subscription object hides the mechanics of unbinding from the caller so we don’t have to change its code if we change the Channel class’s storage model.
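To illustrate the point, here is a sketch of a channel backed by an ES2015 `Map` instead of the sorted array. This is a swapped-in storage model, not the original Faye code, and `MapChannel` and the client IDs are hypothetical. `Map` is typically implemented as a hash table and iterates in insertion order, and the caller’s use of bind() and cancel() is exactly the same as before:

```javascript
// Subscription as defined earlier: cancel() delegates to the target's unbind().
var Subscription = function(target, eventType, listener, context) {
  this._target    = target;
  this._eventType = eventType;
  this._listener  = listener;
  this._context   = context;
  this._active    = true;
};

Subscription.prototype.cancel = function() {
  if (!this._active) return;
  this._target.unbind(this._eventType, this._listener, this._context);
  this._active = false;
};

// Hypothetical alternative storage: a Map keyed by client ID.
var MapChannel = function() {
  this._subscribers = new Map();
};

MapChannel.prototype.bind = function(clientId, listener, context) {
  this._subscribers.set(clientId, [listener, context]);
  return new Subscription(this, clientId); // clientId rides in the eventType slot
};

MapChannel.prototype.unbind = function(clientId) {
  this._subscribers.delete(clientId);
};

MapChannel.prototype.trigger = function(message) {
  this._subscribers.forEach(function(client) {
    client[0].call(client[1], message);
  });
};

// Caller code only sees bind() and cancel(), so it doesn't change.
var channel = new MapChannel();
var seen = [];
var sub = channel.bind('client-1', function(m) { seen.push('1:' + m); });
channel.bind('client-2', function(m) { seen.push('2:' + m); });

channel.trigger('a');
sub.cancel();
channel.trigger('b');
console.log(seen); // [ '1:a', '2:a', '2:b' ]
```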