Evented programming patterns: Asynchronous pipelines

This post is part of a series on event-driven programming. The complete series is:

In a previous article for this series, I covered the topic of asynchronous methods: methods or functions that “return” a value by passing it to a callback instead of using the return keyword. The problem with these methods is that they are not easily composable in the traditional sense: since they don’t have normal return values, expressions such as f(g(x)) don’t work when g returns a result asynchronously. It is possible to compose these functions but it takes a lot more work:

// Apply g to x, then apply f to the result,
// then use _that_ result for something else

g(x, function(gx) {
  f(gx, function(fgx) {
    // do something with "f(g(x))"
  });
});

Hardly the most pleasing thing to write or read. If we have an arbitrary list of functions to pass a value through, the problem becomes even harder. With synchronous functions this is straightforward:

passThroughFilters = function(filters, value) {
  for (var i = 0, n = filters.length; i < n; i++) {
    value = filters[i](value);
  }
  return value;
};

We just pass the initial value to the first function, and use the return value as the starting point for the next iteration.

You’re probably wondering what practical problem this relates to and hoping against hope that I won’t mention monads (don’t worry). I hinted at it with the name of the above function. Filtering systems pop up in web stacks all the time: Rails’ before_filter and Rack middleware being two obvious examples. In particular both of these have the property that any function in the filter chain can block the rest of the chain: a before_filter can return false to block access to the underlying controller action, and Rack middleware can decide whether it wants to respond to a request or delegate it down the stack.

But both these examples are synchronous and easily implemented using composition, but I’ve had a couple of problems recently that required asynchronous filters: each filter can hold the chain up indefinitely while some async action is run, and the filter may resume the chain at any time using a callback.

The main example I have is the Faye extension system, which allows the user to modify or replace messages as they pass in and out of the server. Each extension method accepts a message and must callback with a message once it’s done with its modifications and filters. I’m going to present a slightly modified and hopefully more generically useful API for this, or at least one that’s more in keeping with the style I’ve used in this series.

server.addExtension('incoming', function(message, callback) {
  someAsyncAuthCall(message, function(allowed) {
    if (allowed) callback(message);
    else callback(null);
  });
});

This simple extension does something asynchronous to figure out whether the client is allowed to send that message, and if it’s not then the message is replaced with null to stop it propagating any further. Any number of extensions can be added to the server and each message is piped through them before reaching the core server code. The server calls the extension internally like this:

Faye.Server.prototype.process = function(message, callback, scope) {
  // various setup steps...
  this.pipeThroughExtensions('incoming', message, function(message) {
    // handle message after extensions have run
  }, this);
};

This just passes the message through the incoming filters, then picks the message up at the other end of the pipeline using an inline callback. These two methods, addExtension() and pipeThroughExtensions() are provided in Faye by a mixin called Extensible. The addExtension() method simply needs to store the extension in a list:

Extensible.addExtension = function(type, handler, scope) {
  this._extensions = this._extensions || {};
  var list = this._extensions[type] = this._extensions[type] || [];
  list.push([handler, scope]);
};

pipeThroughExtensions() is a little more complex. Internally it uses a simple function whose job it is to process a single extension from a list. If we’ve reached the end of the chain, we can run the waiting callback. Otherwise, we call the next extension and pass it the pipe function so the extension can resume the chain when it’s done.

Extensible.pipeThroughExtensions = function(type, input, callback, scope) {
  if (!this._extensions) return callback.call(scope, input);
  
  if (!this._extensions.hasOwnProperty(type))
    return callback.call(scope, input);
  
  var list = this._extensions[type].slice();
  
  var pipe = function(data) {
    var extension = list.shift();
    if (!extension) return callback.call(scope, data);
    extension[0].call(extension[1], data, pipe);
  };
  pipe(input);
};

That call to extension[0].call(extension[1], data, pipe) calls the extension with the current value of the input, passing in the continuation function. When the extension calls its callback, we process the next extension in the list.

This isn’t a pattern I’ve used an awful lot, but it solved a problem on Songkick really nicely last week. We have various buttons on the site that let users start tracking things, for example to start following an artist or say they’re going to a concert. These buttons typically are forms that submit using Ajax. For some of these buttons we just added the ability to auto-publish the concert to your Facebook profile. To do this our server-side code needs a cookie to be set that gives us permission to post to Facebook on that user’s behalf. So before the form is submitted, we need this cookie to be set, which requires a lot of async calls the the Facebook JavaScript SDK to log the user in and ask for publishing permissions.

The initial approach was to try to intercept the DOM events bound to the form submission and insert more logic for certain types of tracking button, but using the above model it was much simpler (again, this has been simplified to illustrate a point):

Trackings = { /* ... */ };
$.extend(Trackings, Extensible);

$('form.tracking').bind('submit', function() {
  var form = $(this);
  Trackings.pipeThroughExtensions('beforesubmit', form, function() {
    form.ajaxSubmit();
  });
  return false;
});

Trackings.addExtension('beforesubmit', function(form, resume) {
  if (!form.hasClass('im-going')) return resume(form);
  FB.login(function(session) {
    // more login and permission logic to set the cookie
    resume(form);
  });
});

This keeps the DOM bindings simple without shoving a lot of business logic into the initial event handler, but allows quite powerful modifications. For example in some contexts you may decide you need to block a submission: just don’t call resume() and you’ll block the filter chain.

One caveat: this pattern needs a better name: Extensible is far too vague and non-descriptive. This is basically the Observable module with the modification that the event listeners are allowed to have side effects on the event publisher. In Faye, Extensible is used in classes that support the extension API, but I’d recommend choosing something more domain-specific depending on how you need to tweak it.