Fargo: a Lisp for asynchronous programming

A couple weeks ago, I tooted:

CoffeeScript ought to take this opportunity to add new features that help with async e.g. tail calls, continuations.

Now there’s a problem with this: it’s a core part of CoffeeScript’s usability and resultant popularity that it compiles to readable JavaScript. If you want to introduce new execution semantics, a straightforward one-to-one syntax transformation is not such a viable option.

But I maintain that, given we’re reinventing the language we work with on the web, now’s as good a time as any to reexamine the tools we use to write programs. We’re all annoyed by working with callbacks, and with good reason. Programming with explicit callbacks introduces race conditions and uncertainty; it’s not the syntactic burden of typing function that causes the real pain, it’s the creation of regions of your program whose order of execution is unpredictable.

In the Ruby world, we’ve been using fibers to great effect to hide the asynchronous nature of code. A fiber is just a special kind of single-use function, whose execution can be suspended and resumed by the user. They are perfect for pausing a block of code while some I/O is done, but they do not block the interpreter: when you pause a fiber the interpreter can carry on doing other work until the fiber is resumed. Ilya’s first example is perfect:

def http_get(url)
  f = Fiber.current
  http = EventMachine::HttpRequest.new(url).get
 
  # resume fiber once http call is done
  http.callback { f.resume(http) }
  http.errback  { f.resume(http) }
 
  return Fiber.yield
end

Calling http_get() initiates an async HTTP request, and just before returning it ‘yields’, i.e. it pauses the current fiber. When the request completes, we resume the fiber with the response. A call to http_get() looks like synchronous code, but behind the scenes the I/O is being done in a way that doesn’t block the interpreter.

I thought it would be fun to try this out in JavaScript, so I hacked up a very basic Scheme-like interpreter to support fibers, and called it Fargo. Scheme usually includes a function called call-with-current-continuation or call/cc, of which fibers are a simplified version. Call/cc allows the current state of a computation (the “continuation”) to be saved and resumed any number of times. This means, at least in a naive implementation, that the saved state of the stack must be copied every time you invoke a continuation, otherwise state would leak between uses. Fibers can only be resumed once from their last yield-point, making running and resuming them cheaper.

Fibers as implemented in Ruby also require the user to explicitly initiate a fiber. Supporting call/cc requires constantly keeping track of the current continuation since it may be captured at any time. Because fibers must be explicitly initiated, when not in a fiber we can use a faster stackless execution engine which doesn’t need to track state so much.

Here’s an example in Fargo of getting a value that’s produced by an async API (set-timeout) without using callbacks in the calling code:

; get-message function: captures the current fiber, yields,
; then resumes after a timeout when we have a message to return
(define (get-message)
  (define f (current-fiber))
  (set-timeout (lambda ()
    (f "Hello, world!")
  ) 5000)
  (yield))

; main program fiber
(define main (fiber ()
  ; get the message using a sync-looking API
  (define msg (get-message))
  ; alert the message
  (alert msg)
))

; run the main fiber
(main)

Because it’s tail-recursive, you can use recursion to implement infinite streams:

> (define stream (fiber ()
    (let loop ((i 0))
      (yield i)
      (loop (+ i 1)))))

> (stream)
; => 0
> (stream)
; => 1
> (stream)
; => 2
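For readers more at home in JavaScript, the get-message example above can be roughly approximated with generators. This is only a sketch, not how Fargo works internally: it assumes an engine with ES2015 generators, and runFiber, fakeTimeout and pending are hypothetical names invented for the illustration. fakeTimeout stands in for set-timeout so the timer can be fired by hand.

```javascript
// Hypothetical helper: runs a generator as a "fiber". Each yielded value
// must be a function that accepts a resume callback; the generator is
// resumed with whatever value that callback receives.
function runFiber(generatorFn) {
  var gen = generatorFn();
  function step(value) {
    var result = gen.next(value);
    if (result.done) return;
    result.value(step);
  }
  step();
}

// Stand-in for an async API: stores callbacks so we can fire them manually.
var pending = [];
function fakeTimeout(callback) { pending.push(callback); }

// Looks synchronous to the caller, but hands control back while waiting.
function getMessage() {
  return function(resume) {
    fakeTimeout(function() { resume('Hello, world!'); });
  };
}

var log = [];

runFiber(function* () {
  log.push('before');
  var msg = yield getMessage();   // the fiber pauses here
  log.push(msg);
});

log.push('main continues');       // runs while the fiber is paused
pending.shift()();                // fire the "timer", resuming the fiber

// log is now ['before', 'main continues', 'Hello, world!']
```

The important property is the same as in the Fargo version: the code inside the fiber reads top to bottom, while the rest of the program carries on during the pause.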

So where is Fargo going? Right now, with my current ban on committing myself to any new large side-projects, it remains a quick (12 days old) experiment. Maybe someone will like the idea enough to turn it into a production language, but for now I’m happy to let people try it out and let me know what they think. I’ve been making small improvements to it over the last few days to get it to run as much of Heist’s core library as possible, and it now runs all the macros and the list and numeric libraries.

I’m going to keep thinking about how we can make async programming easier and I’d love your ideas – the nice thing about working on a Lisp is that changing the interface it provides is really simple.

Promises are the monad of asynchronous programming

In my introduction to monads in JavaScript we saw a couple of monads and examined the commonality between them to expose an underlying design pattern. Before I get on to how that applies to asynchronous programming we need to take one final diversion and discuss polymorphism.

Consider the list monad that we implemented before:

var compose = function(f, g) {
  return function(x) { return f(g(x)) };
};

// unit :: a -> [a]
var unit = function(x) { return [x] };

// bind :: (a -> [a]) -> ([a] -> [a])
var bind = function(f) {
  return function(list) {
    var output = [];
    for (var i = 0, n = list.length; i < n; i++) {
      output = output.concat(f(list[i]));
    }
    return output;
  };
};

In our previous example, we just had functions that accepted an HTMLElement and returned a list of HTMLElements. Notice the type signature of bind above: (a -> [a]) -> ([a] -> [a]). That ‘a’ just means we can put any type in its place, but the signature a -> [a] implies that the functions must return a list of things of the same type as the input. This is actually not the case, and the correct signature is:

bind :: (a -> [b]) -> ([a] -> [b])

For example, bind may take a function that maps a single String to a list of HTMLElements, and return a function that maps a list of Strings to a list of HTMLElements. Consider these two functions: the first takes a string and returns a list of nodes with that tag name, and the second takes a node and returns a list of all the class names it has.

// byTagName :: String -> [HTMLElement]
var byTagName = function(name) {
  var nodes = document.getElementsByTagName(name);
  return Array.prototype.slice.call(nodes);
};

// classNames :: HTMLElement -> [String]
var classNames = function(node) {
  return node.className.split(/\s+/);
};

If we ignore the ‘returns a list of’ aspect, we’d expect to be able to compose these to get all the class names of all the links in a document:

var classNamesByTag = compose(classNames, byTagName);

Of course, we do have lists to contend with, but because the monad just cares about the lists, not what’s in the lists, we can use it to compose the functions:

// classNamesByTag :: [String] -> [String]
var classNamesByTag = compose(bind(classNames), bind(byTagName));

classNamesByTag(unit('a'))
// -> ['profile-links', 'signout-button', ...]
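If you want to try this outside a browser, here is a small sketch of the same idea with no DOM involved. The functions words and wordLength are made up for the example; unit, bind and compose are the same definitions as above, repeated so the snippet runs on its own.

```javascript
// unit :: a -> [a]
var unit = function(x) { return [x] };

// bind :: (a -> [b]) -> ([a] -> [b])
var bind = function(f) {
  return function(list) {
    var output = [];
    for (var i = 0, n = list.length; i < n; i++) {
      output = output.concat(f(list[i]));
    }
    return output;
  };
};

var compose = function(f, g) {
  return function(x) { return f(g(x)) };
};

// words :: String -> [String]      (hypothetical example function)
var words = function(sentence) { return sentence.split(' ') };

// wordLength :: String -> [Number] (hypothetical example function)
var wordLength = function(word) { return [word.length] };

// wordLengths :: [String] -> [Number]
var wordLengths = compose(bind(wordLength), bind(words));

var result = wordLengths(unit('promises are monads'));
// result is [8, 3, 6]
```

The input type (String) and the output type (Number) differ, which is exactly what the a -> [b] signature permits.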

The monad just cares about the ‘list of’ part, not the contents of the list. Just as a reminder, let’s recast this using the pipeline syntax we developed in the previous article:

// bind :: [a] -> (a -> [b]) -> [b]
var bind = function(list, f) {
  var result = [];
  for (var i = 0, n = list.length; i < n; i++) {
    result = result.concat(f(list[i]));
  }
  return result;
};

// pipe :: [a] -> [a -> [b]] -> [b]
var pipe = function(x, functions) {
  for (var i = 0, n = functions.length; i < n; i++) {
    x = bind(x, functions[i]);
  }
  return x;
};

// for example
pipe(unit('a'), [byTagName, classNames])
// -> ['profile-links', 'signout-button', ...]

The pipe function doesn’t actually care that bind deals with lists, and we can write its signature in a more generic way:

pipe :: m a -> [a -> m b] -> m b

In this notation, m a means ‘a monadic wrapper around a’. For example, if the input is a list of strings, under the List monad the m refers to ‘list of’, and a refers to ‘string’. This concept of generic containers becomes extremely important when we consider how we can apply these ideas to asynchronous programming.
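One way to make that genericity concrete is to pass bind in as a parameter. The sketch below is an illustration rather than code from the earlier article: makePipe, listBind and maybeBind are hypothetical names, and the ‘Maybe-like’ wrapper is a loose one that simply uses null to mean ‘no value’.

```javascript
// makePipe :: (m a -> (a -> m b) -> m b) -> (m a -> [a -> m b] -> m b)
var makePipe = function(bind) {
  return function(x, functions) {
    for (var i = 0, n = functions.length; i < n; i++) {
      x = bind(x, functions[i]);
    }
    return x;
  };
};

// bind for lists: apply f to every item and flatten the results
var listBind = function(list, f) {
  var result = [];
  for (var i = 0, n = list.length; i < n; i++) {
    result = result.concat(f(list[i]));
  }
  return result;
};

// bind for a Maybe-like wrapper: null short-circuits the rest of the pipe
var maybeBind = function(value, f) {
  return (value === null) ? null : f(value);
};

var listPipe  = makePipe(listBind);
var maybePipe = makePipe(maybeBind);

// List example: each number expands to its two neighbours
var neighbours = function(n) { return [n - 1, n + 1] };
var listResult = listPipe([10], [neighbours, neighbours]);
// [10] -> [9, 11] -> [8, 10, 10, 12]

// Maybe example: either step may fail by returning null
var parseNumber = function(s) { var n = parseFloat(s); return isNaN(n) ? null : n };
var reciprocal  = function(n) { return n === 0 ? null : 1 / n };

var quarter = maybePipe('4',    [parseNumber, reciprocal]);  // 0.25
var nothing = maybePipe('zero', [parseNumber, reciprocal]);  // null
```

The loop inside makePipe never inspects the values it is threading through; everything specific to the container lives in bind.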

One common complaint levelled against Node.js is that it is easy to become mired in ‘callback hell’, where callbacks become nested so deeply it’s impossible to maintain the resulting code. There are various ways to solve this, but I think monads provide an interesting approach that highlights the need to separate business logic from code that simply glues data together.

Let’s consider a fairly contrived example: we have a file called urls.json, that contains a JSON document that contains a URL. We want to read the file, extract this URL, request the URL from the web, and print the response body. Monads encourage us to think in terms of types of data, and how they flow through a pipeline. We can model this problem using our pipeline syntax:

pipe(unit(__dirname + '/urls.json'),
        [ readFile,
          getUrl,
          httpGet,
          responseBody,
          print         ]);

Beginning with the pathname (a String), we can trace the data as it flows through this pipe:

  • readFile takes a String (pathname) and returns a String (file contents)
  • getUrl takes a String (a JSON document) and returns a URI object
  • httpGet takes a URI and returns a Response
  • responseBody takes a Response and returns a String
  • print takes a String and returns nothing

So each link in the chain produces a data type that is consumed by the next. But in Node, many of these operations are asynchronous. Rather than use continuation-passing style and deeply nested callbacks, how can we modify the return types of these functions to indicate the result might not be known yet? By wrapping them in Promises:

readFile      :: String   -> Promise String
getUrl        :: String   -> Promise URI
httpGet       :: URI      -> Promise Response
responseBody  :: Response -> Promise String
print         :: String   -> Promise null

The Promise monad needs three things: a wrapper object, a unit function to wrap a value in this object, and a bind function to help us compose the above functions. We can implement promises using the Deferrable module from JS.Class:

var Promise = new JS.Class({
  include: JS.Deferrable,
  
  initialize: function(value) {
    // if value is already known, succeed immediately
    if (value !== undefined) this.succeed(value);
  }
});

With this class, we can then implement the functions we need to solve our problem. The functions that can return a value immediately just return a value wrapped in a Promise object:

// readFile :: String -> Promise String
var readFile = function(path) {
  var promise = new Promise();
  fs.readFile(path, function(err, content) {
    promise.succeed(content);
  });
  return promise;
};

// getUrl :: String -> Promise URI
var getUrl = function(json) {
  var uri = url.parse(JSON.parse(json).url);
  return new Promise(uri);
};

// httpGet :: URI -> Promise Response
var httpGet = function(uri) {
  var client  = http.createClient(80, uri.hostname),
      request = client.request('GET', uri.pathname, {'Host': uri.hostname}),
      promise = new Promise();
  
  request.addListener('response', function(response) {
    promise.succeed(response);
  });
  request.end();
  return promise;
};

// responseBody :: Response -> Promise String
var responseBody = function(response) {
  var promise = new Promise(),
      body    = '';
  
  response.addListener('data', function(c) { body += c });
  response.addListener('end', function() {
    promise.succeed(body);
  });
  return promise;
};

// print :: String -> Promise null
var print = function(string) {
  return new Promise(sys.puts(string));
};

So we’ve now got a way to represent a deferred value simply using data types, no nested callbacks required. The next step is to implement unit and bind in such a way that the callback glue is contained within these functions. unit is very simple, it just needs to wrap a value in a Promise. bind is more complex: it accepts a Promise and a function. It must extract the value of the Promise, give this value to the function, which returns another Promise, then wait for this final Promise to complete.

// unit :: a -> Promise a
var unit = function(x) {
  return new Promise(x);
};

// bind :: Promise a -> (a -> Promise b) -> Promise b
var bind = function(input, f) {
  var output = new Promise();
  input.callback(function(x) {
    f(x).callback(function(y) {
      output.succeed(y);
    });
  });
  return output;
};

With these definitions, you should find that the pipeline shown above just works, for example if I’ve placed my GitHub API URL in the urls.json file:

pipe(unit(__dirname + '/urls.json'),
        [ readFile,
          getUrl,
          httpGet,
          responseBody,
          print         ]);

// prints:
// {"user":{"name":"James Coglan","location":"London, UK", ...
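If you don’t have JS.Class or a network connection to hand, the same plumbing can be exercised with a tiny stand-in. This is a sketch under assumptions, not the code from the article: SimplePromise is a hypothetical minimal replacement for the Deferrable-based class (success path only, no error handling), and parseJson, lookupName, record and the later queue are invented so the ‘async’ step can be triggered by hand.

```javascript
// SimplePromise: hypothetical minimal stand-in for the Promise class above
var SimplePromise = function(value) {
  this._callbacks = [];
  this._done = false;
  // if value is already known, succeed immediately
  if (value !== undefined) this.succeed(value);
};

SimplePromise.prototype.callback = function(fn) {
  if (this._done) fn(this._value);
  else this._callbacks.push(fn);
};

SimplePromise.prototype.succeed = function(value) {
  if (this._done) return;
  this._done  = true;
  this._value = value;
  var callbacks = this._callbacks;
  this._callbacks = [];
  for (var i = 0; i < callbacks.length; i++) callbacks[i](value);
};

// unit :: a -> SimplePromise a
var unit = function(x) { return new SimplePromise(x) };

// bind :: SimplePromise a -> (a -> SimplePromise b) -> SimplePromise b
var bind = function(input, f) {
  var output = new SimplePromise();
  input.callback(function(x) {
    f(x).callback(function(y) { output.succeed(y) });
  });
  return output;
};

var pipe = function(x, functions) {
  for (var i = 0, n = functions.length; i < n; i++) x = bind(x, functions[i]);
  return x;
};

// 'later' holds deferred work so we control when the async step completes
var later = [];

var parseJson = function(json) { return new SimplePromise(JSON.parse(json)) };

var lookupName = function(doc) {
  var promise = new SimplePromise();
  later.push(function() { promise.succeed(doc.name.toUpperCase()) });
  return promise;
};

var printed = [];
var record = function(s) { printed.push(s); return new SimplePromise(null) };

pipe(unit('{"name":"fargo"}'), [parseJson, lookupName, record]);

var printedBefore = printed.length;  // 0: the pipe is waiting on lookupName
later.shift()();                     // complete the async step
// printed is now ['FARGO']
```

Note that the calling code never mentions a callback; all the waiting is hidden inside bind.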

I hope this has shown why Promises (also known as Deferreds in jQuery and Dojo) are valuable, how they can help you pipe data through your code, and how this piping glue can be contained in quite a neat way. On the client side, you’ll find this pattern applies equally well to Ajax and animation; in fact MethodChain (which I’ve been using as async glue for years) is just the Promise monad applied to method calls instead of function calls.

The full working code for this article is on GitHub if you’d like to tinker with it. After that, you can dive into jQuery’s Deferred API. There’s also talk of promises making a return to Node, after they were banished in favour of continuation-passing style, so you may find you can do this without writing so much plumbing in future.

Primer: the cache that knows too much

Every so often at Songkick we have what we call innovation days – a day or two where everyone can work on whatever they like to make our product, technology or workplace better. We did one this last Friday, and I made a start on something I’ve been wanting to do for a long time: it’s called Primer.

This year a couple of development platforms have sprung up that make doing real-time applications considerably easier: LunaScript and Fun. They both aim to let you write a web app by writing your templates in the usual way, but then they deal with updating the client and synching state for you. From the Fun intro:

When you say "Hello " user.name, you don’t mean just “render the value of user.name right now”. Rather, you mean “display the value of user.name here, and any time user.name changes update the UI.”

Fun will update your UI in real-time, keystroke by keystroke. I can’t do that yet, but I can do something similar: automatically update caches when your ActiveRecord model changes. Consider:

<% primer '/users/1/name' do %>
  <p><%= @user.full_name %></p>
<% end %>

This is just wrapping part of a template in a caching block, much like Rails fragment caching. Now, surely your development tools ought to be able to figure out that the cache key /users/1/name depends on the full_name attribute of @user, right? It says so right there. So this is what Primer does: it caches your HTML fragments and figures out which bits of your database they depend on. When your database changes, it updates the cache. No sweeper code, no combing through call stacks, no dealing with message buses.

Now it turns out that using paths as cache keys has two really nice side effects. Firstly, you can use a declarative routing scheme to tell Primer how to calculate values:

Primer.cache.routes do
  get '/users/:id/name' do
    user = User.find_by_id(params[:id])
    user.full_name
  end
end

This means that, by moving some rendering logic out of your templates, you can tell Primer how to regenerate cache keys, so it can update the cache with new data instead of just invalidating it. This can be useful for long-tail pages that don’t get much user traffic but need to be fast for the Googlebots.

Secondly, it means that all your cache keys are valid Faye channel names, and can be used to update the client in real time. If you put this in your view:

<%= primer '/users/1/name' %>

Primer will use the '/users/:id/name' route to generate the value, cache it, and also will update any clients viewing the page when that cache slot is updated. You don’t need to write any of the network code for this, it’s all done for you, so that template fragment gets you real-time updates for free.
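To give a feel for the dependency-tracking idea, here is a toy sketch in JavaScript. It is emphatically not Primer’s implementation or API (Primer is Ruby and hooks into ActiveRecord); makeModel, routes, compute and regenerate are all hypothetical names. The trick it illustrates is recording which attributes are read while a cache key is being computed, then recomputing that key when one of those attributes changes.

```javascript
var currentKey   = null;  // cache key currently being computed, if any
var dependencies = {};    // 'model.attribute' -> set of cache keys that read it
var cache        = {};    // cache key -> computed value
var routes       = {};    // cache key -> function that computes the value

function regenerate(key) {
  currentKey = key;
  try { cache[key] = routes[key](); }
  finally { currentKey = null; }
}

function compute(key) {
  if (!cache.hasOwnProperty(key)) regenerate(key);
  return cache[key];
}

// A model whose reads are recorded and whose writes refresh dependent keys
function makeModel(name, attrs) {
  return {
    get: function(attr) {
      if (currentKey) {
        var dep = name + '.' + attr;
        dependencies[dep] = dependencies[dep] || {};
        dependencies[dep][currentKey] = true;
      }
      return attrs[attr];
    },
    set: function(attr, value) {
      attrs[attr] = value;
      var keys = dependencies[name + '.' + attr] || {};
      for (var key in keys) regenerate(key);
    }
  };
}

var user = makeModel('users/1', {first: 'Ada', last: 'Lovelace'});

routes['/users/1/name'] = function() {
  return user.get('first') + ' ' + user.get('last');
};

var firstRender = compute('/users/1/name');  // 'Ada Lovelace'
user.set('first', 'Augusta');                // the cache slot is regenerated
var secondRender = cache['/users/1/name'];   // 'Augusta Lovelace'
```

In a real system the regenerate step is also the natural place to publish the new value to any connected clients.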

Right now it’s very much a proof-of-concept, just to show that this sort of thing is possible. It needs to store a lot of data about your model to do its job, and needs to perform a lot of ugly reflection on ActiveRecord to propagate changes correctly. It probably still screws up a lot of edge cases; most glaringly, it can’t figure out when a has_and_belongs_to_many collection changes, although it can figure out a lot of common has_many/belongs_to/has_many :through relationships. I have taken some steps to ease the load by making it really easy to create background workers to update your caches, but it’s definitely not for production.

I’ve rambled on enough, so if you want to poke at a half-baked week-old project I’d really like feedback on whether this can be turned into a viable bit of production software. Just gem install primer or get it on GitHub.

Terminus: a client-side Capybara driver

Last week the Capybara project released version 0.4. Since the 0.3.9 release, which added support for third-party drivers, I’ve been working on turning Terminus into a fully compatible driver for it. It’s still an experiment but it’s in the sort of semi-useful state that means I’m okay throwing it out to see if anyone’s interested in it.

So what is it? Terminus is a Capybara driver where most of the driver functions are implemented in client-side JavaScript. It lets you script any browser on any machine using the Capybara API, without any browser plugins or extensions. ‘Any browser’ really means ‘any browser that supports the document.evaluate() XPath API’ for now, so you can’t use IE, but the ‘any machine’ part is true: any browser that can see your development machine can be controlled using Terminus, including any phone, iPad or desktop machine on your local network.

This is very much still an experiment. Because every little Capybara call has to go over the network (it uses Faye to send commands to browsers) it’s very slow, and the connection sometimes flakes out. Under ideal circumstances though it does pass most of the Capybara test suite. It supports JavaScript, cookies and window switching (assuming the connected browser has these features enabled). The major omissions are:

  • attach_file does not work since file uploads cannot be controlled by JavaScript for security reasons.
  • A few selectors and HTML5 features in the specs don’t work due to browser inconsistencies.
  • Redirects work but infinite redirection cannot be detected.

The original intention was for Terminus to be a tool for automating cross-browser testing, so the lack of IE support is kind of a problem. I actually went some way towards fixing this; I wrote a pretty stupid document.evaluate() replacement called Pathology, and to support that a PEG parser compiler called Canopy so I could write a declarative XPath grammar. Unfortunately these don’t yet perform well enough to support the workload that Capybara throws at a browser.

I also want to add an API for switching browsers based on vendor and OS versions, but this release is just enough to support the required Capybara API.

You can find out more on the project’s GitHub page, and installation is the usual gem install terminus command – I’d love if a few of you could kick its tyres a bit and figure out if it’s actually useful for anything.

Improving the observer pattern with subscription objects

Way back in February I wrote a series of articles on event-driven programming; the pattern I find myself using the most out of all those I covered is the observer pattern: an object that collects callback functions in order to notify other parts of a system when interesting things happen. The basic pattern looks something like this:

Observable = {
  bind: function(eventType, listener, context) {
    this._listeners = this._listeners || {};
    var list = this._listeners[eventType] = this._listeners[eventType] || [];
    list.push([listener, context]);
  },

  trigger: function(eventType, args) {
    if (!this._listeners) return;
    var list = this._listeners[eventType];
    if (!list) return;
    list.forEach(function(listener) {
      listener[0].apply(listener[1], args);
    });
  }
};

This is pretty simple: we can add callbacks to a list, and call them all when something happens. But what if we want to remove a listener? The approach taken by many DOM interfaces is to provide essentially the opposite of the bind() method:

Observable.unbind = function(eventType, listener, context) {
  if (!this._listeners) return;
  var list = this._listeners[eventType];
  if (!list) return;
  
  var i = list.length;
  while (i--) {
    if (list[i][0] === listener && list[i][1] === context)
      list.splice(i, 1);
  }
};

This just takes the original event type and listener function that we gave to bind(), and removes the listener from that event type’s list of listeners. Internally, that’s exactly what we want to achieve, but the API is hard to use. It reeks of something inspired by Java, where listeners are objects and easier to keep references to. Idiomatic JavaScript rests heavily on anonymous functions, and being forced to keep references to a function and a context object just so you can remove a listener often feels clunky. It would be nicer if the bind() API returned something to represent the subscription so we could cancel it later:

var subscription = object.bind('someEvent', function(event) {
  // Handle the event
});

// Later on
subscription.cancel();

This is easily done, we just need an object to store the event type, listener function and observed object so that it can call the unbind() method for us when we want to cancel the subscription.

Subscription = function(target, eventType, listener, context) {
  this._target    = target;
  this._eventType = eventType;
  this._listener  = listener;
  this._context   = context;
  this._active    = true;
};

Subscription.prototype.cancel = function() {
  if (!this._active) return;
  this._target.unbind(this._eventType, this._listener, this._context);
  this._active = false;
};

We can then change the bind() method to return one of these objects:

Observable.bind = function(eventType, listener, context) {
  this._listeners = this._listeners || {};
  var list = this._listeners[eventType] = this._listeners[eventType] || [];
  list.push([listener, context]);
  return new Subscription(this, eventType, listener, context);
};
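Putting the pieces together, here is a self-contained sketch you can run as-is. The Observable and Subscription code is the same as above, gathered into one place; the counter object, the 'tick' event and the seen array are just made up for the demonstration.

```javascript
var Subscription = function(target, eventType, listener, context) {
  this._target    = target;
  this._eventType = eventType;
  this._listener  = listener;
  this._context   = context;
  this._active    = true;
};

Subscription.prototype.cancel = function() {
  if (!this._active) return;
  this._target.unbind(this._eventType, this._listener, this._context);
  this._active = false;
};

var Observable = {
  bind: function(eventType, listener, context) {
    this._listeners = this._listeners || {};
    var list = this._listeners[eventType] = this._listeners[eventType] || [];
    list.push([listener, context]);
    return new Subscription(this, eventType, listener, context);
  },

  unbind: function(eventType, listener, context) {
    if (!this._listeners) return;
    var list = this._listeners[eventType];
    if (!list) return;
    var i = list.length;
    while (i--) {
      if (list[i][0] === listener && list[i][1] === context)
        list.splice(i, 1);
    }
  },

  trigger: function(eventType, args) {
    if (!this._listeners) return;
    var list = this._listeners[eventType];
    if (!list) return;
    list.forEach(function(listener) {
      listener[0].apply(listener[1], args);
    });
  }
};

// Hypothetical observed object that inherits the Observable methods
var counter = Object.create(Observable);
var seen = [];

// An anonymous listener: we never need to keep a reference to it
var subscription = counter.bind('tick', function(n) { seen.push(n) });

counter.trigger('tick', [1]);   // listener runs
subscription.cancel();          // listener removed
counter.trigger('tick', [2]);   // nothing happens
subscription.cancel();          // cancelling twice is a harmless no-op

// seen is [1]
```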

There are further refactorings we could make; maybe it would be better to store the subscription objects rather than the listener functions on the observable object. We’d probably then move the listener calling logic into the Subscription class, although this could introduce extra function call overhead while dispatching events. For now I’m happy leaving it as sugar over the unbind() method. (This is cribbed from the Faye client, which is required by the protocol to support an unsubscribe() method that takes a channel name and listener to remove.)

So we’ve got an API that we’re happy with but its internals are sub-optimal: it takes O(n) time to remove a listener from an object, and if we need to do this operation a lot at scale we’ll have a performance problem. In Faye, for example, every client that subscribes to a channel must eventually unsubscribe, so we’d like that to be fast. In this situation we have two things we can use to make this faster: every client has a unique ID, and the list of subscribers to a channel must contain no duplicates – otherwise a client would receive some messages twice.

Putting all this together, we could model a channel’s subscribers using a JavaScript object to produce set-like behaviour:

var Channel = function() {
  this._subscribers = {};
};

Channel.prototype.bind = function(clientId, listener, context) {
  this._subscribers[clientId] = [listener, context];
};

Channel.prototype.unbind = function(clientId) {
  delete this._subscribers[clientId];
};

Channel.prototype.trigger = function(message) {
  for (var clientId in this._subscribers) {
    var client = this._subscribers[clientId];
    client[0].call(client[1], message);
  }
};

This would make it easy for us to add and remove subscriptions from a channel, but it means that message dispatch is slowed down due to the poor performance of for/in. But as I mentioned last week, there’s an easy solution to that: keep a sorted list of all the subscribed client IDs. The list speeds up iteration and message dispatch, while sorting the list makes removal O(log n) rather than O(n). Combining this with the Subscription class from above, we get a final draft of our Channel API:

Channel = function() {
  this._clientIds   = [];
  this._subscribers = {};
};

Channel.prototype.bind = function(clientId, listener, context) {
  if (!this._subscribers.hasOwnProperty(clientId)) {
    var index = this._indexOf(clientId);
    this._clientIds.splice(index, 0, clientId);
  }
  this._subscribers[clientId] = [listener, context];
  return new Subscription(this, clientId);
};

Channel.prototype.unbind = function(clientId) {
  if (!this._subscribers.hasOwnProperty(clientId)) return;
  delete this._subscribers[clientId];
  var index = this._indexOf(clientId);
  this._clientIds.splice(index, 1);
};

Channel.prototype.trigger = function(message) {
  var clientIds = this._clientIds,
      i         = clientIds.length,
      client;
  
  while (i--) {
    client = this._subscribers[clientIds[i]];
    client[0].call(client[1], message);
  }
};

Channel.prototype._indexOf = function(key) {
  var keys = this._clientIds,
      n    = keys.length,
      i    = 0,
      d    = n;

  if (n === 0)         return 0;
  if (key < keys[0])   return 0;
  if (key > keys[n-1]) return n;

  while (key !== keys[i] && d > 0.5) {
    d = d / 2;
    i += (key > keys[i] ? 1 : -1) * Math.round(d);
    if (key > keys[i-1] && key < keys[i]) d = 0;
  }
  return i;
};

All we’ve done here is use the implementation of SortedTable from the previous article to model the channel’s set of subscribers. It supports O(log n) insertion and removal of subscribers and should iterate over them reasonably quickly. Of course in practice we would want to measure the total load of binding, dispatching and unbinding under production traffic to see which model has the best trade-off for our use case. For example, in Ruby the iteration performance of a Hash is nowhere near as bad as for JavaScript objects, so we could use the simpler Channel implementation and get O(1) insertion and removal of clients.
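If the _indexOf() method above is hard to follow, the same job can be done with a conventional ‘lower bound’ binary search. To be clear, this is a different technique swapped in for illustration, not the SortedTable code; lowerBound, insertSorted and removeSorted are hypothetical names. It is also worth remembering that the O(log n) figure describes finding the index; splice() may still need to shift the elements after it.

```javascript
// Returns the first index at which `key` could be inserted while keeping
// `keys` sorted; if `key` is present, this is the index of `key` itself.
function lowerBound(keys, key) {
  var low = 0, high = keys.length;
  while (low < high) {
    var mid = (low + high) >>> 1;
    if (keys[mid] < key) low = mid + 1;
    else high = mid;
  }
  return low;
}

// Insert `key` unless it is already present (set-like behaviour)
function insertSorted(keys, key) {
  var index = lowerBound(keys, key);
  if (keys[index] !== key) keys.splice(index, 0, key);
}

// Remove `key` if it is present
function removeSorted(keys, key) {
  var index = lowerBound(keys, key);
  if (keys[index] === key) keys.splice(index, 1);
}

var ids = [];
['m', 'c', 'x', 'a', 'm'].forEach(function(id) { insertSorted(ids, id) });
var afterInsert = ids.slice();   // ['a', 'c', 'm', 'x'], duplicate 'm' ignored

removeSorted(ids, 'c');
removeSorted(ids, 'zzz');        // not present, so nothing changes
// ids is ['a', 'm', 'x']
```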

One final benefit of this API is that all that’s required to unbind from a channel is the clientId, since that uniquely identifies the listener to remove. The use of a Subscription object hides the mechanics of unbinding from the caller so we don’t have to change its code if we change the Channel class’s storage model.

Speaking at RubyConf 2010

A quick bit of self-promotional guff: I’m thrilled to announce I’ll be presenting at RubyConf 2010 in New Orleans this November. I’ll be speaking about the Ruby incarnation of Faye, and more broadly about doing asynchronous programming and testing using Ruby and EventMachine.

I gave a similar talk based on the JavaScript/Node version at the London Ajax User Group a couple months back, if you want a preview.

You can grab a ticket from the conference site, assuming they’ve not sold out by the time you read this. It’s a pretty daunting prospect sharing a bill with Matz and DHH, so fingers-crossed my first proper conference spot goes well.

The potentially asynchronous loop

If you write a lot of asynchronous or event-driven code, you’re probably going to end up needing an asynchronous for loop. That is, a loop that runs each iteration sequentially but those iterations may contain non-blocking logic that must halt the loop until the async action resumes. In my case, I need the main loop of JS.Test, the testing tool to be bundled with JS.Class 3.0, to run each test in sequence but each test has to support a suspend/resume system for async tests.

I’ll use a slightly more contrived example here: fetching a series of responses over Ajax in sequence using jQuery. It should be apparent that this will not do the job:

listOfUrls.forEach(function(url) {
  $.get(url, function(response) {
    // handle response
  });
});

The requests are made in sequence, but they overlap because each async request does not block the loop. We want each iteration to hold up the loop until the request it opens has completed. To do this, I’m going to introduce a resume callback as an argument to the iterator; each iteration must call this to continue the loop.

listOfUrls.asyncEach(function(url, resume) {
  $.get(url, function(response) {
    // handle response
    resume();
  });
});

An initial stab at implementing asyncEach() might look like this. The method takes an iterator function, and creates an internal function that moves a counter forward one index. As long as we’ve not reached the end of the list, we call iterator with the current element and the internal function as the resume callback.

Array.prototype.asyncEach = function(iterator) {
  var list = this,
      n    = list.length,
      i    = -1;
  
  var resume = function() {
    i += 1;
    if (i === n) return;
    iterator(list[i], resume);
  };
  resume();
};

This works just fine as long as every iteration contains an async action. Async code allows us to empty the call stack and start again when the async logic resumes. In JS.Test, each test might contain async code, but probably won’t, at least for my uses. If too many tests in a row don’t contain any async code, we get a stack overflow because of resume calling itself indirectly without giving the stack a break.

So we need a looping construct for iterations that might contain async code, and it must run on all JavaScript platforms, only some of which have async functions built in. Initially we can get rid of the stack overflow problem by scheduling the next iteration using setTimeout() instead of calling directly and growing the stack:

Array.prototype.asyncEach = function(iterator) {
  var list = this,
      n    = list.length,
      i    = -1;
  
  var iterate = function() {
    i += 1;
    if (i === n) return;
    iterator(list[i], resume);
  };
  
  var resume = function() {
    setTimeout(iterate, 1);
  };
  resume();
};

We should now be able to handle a large list of tests because the resume callback uses setTimeout() to essentially clear the call stack between iterations. But we now have the problem that this won’t run on platforms without setTimeout(). How do we turn it into a simple loop on such platforms without blowing the stack?

The trick is to have resume() act as a scheduling device, but implement the scheduling differently. On non-async platforms, we can do this by keeping a count of iterations left to run: calling resume() adds to this count, while calling iterate() decrements it. We keep a single loop running that runs the iteration as long as there are calls remaining. The finished code looks like this:

Array.prototype.asyncEach = function(iterator) {
  var list    = this,
      n       = list.length,
      i       = -1,
      calls   = 0,
      looping = false;

  var iterate = function() {
    calls -= 1;
    i += 1;
    if (i === n) return;
    iterator(list[i], resume);
  };

  var loop = function() {
    if (looping) return;
    looping = true;
    while (calls > 0) iterate();
    looping = false;
  };

  var resume = function() {
    calls += 1;
    if (typeof setTimeout === 'undefined') loop();
    else setTimeout(iterate, 1);
  };
  resume();
};

Notice how the looping flag blocks any more than one loop running at a time. After the first call to loop(), the effect of each iteration calling the resume() function is simply to increment calls and keep the loop going. This setup lets us iterate over a list on non-async platforms, and allows async code to exist within iterations on compatible platforms, keeping all the complexity in one place. Anywhere we want to iterate, we just call

list.asyncEach(function(item, resume) {
  // handle item
  resume();
});

and asyncEach figures out the best way to handle the loop.
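To check that the loop path really does keep the stack flat, here is a small sketch that runs the same logic as a standalone function. The forceLoop argument is my own addition for illustration (it is not part of the method above); it just lets us exercise the non-timer branch on a platform that does have setTimeout().

```javascript
// Standalone variant of asyncEach. `forceLoop` is an assumption added for
// this demo so the loop-based scheduling can be exercised anywhere.
function asyncEach(list, iterator, forceLoop) {
  var n = list.length, i = -1, calls = 0, looping = false;

  var iterate = function() {
    calls -= 1;
    i += 1;
    if (i === n) return;
    iterator(list[i], resume);
  };

  var loop = function() {
    if (looping) return;
    looping = true;
    while (calls > 0) iterate();
    looping = false;
  };

  var resume = function() {
    calls += 1;
    if (forceLoop || typeof setTimeout === 'undefined') loop();
    else setTimeout(iterate, 1);
  };
  resume();
}

// A list long enough that naive recursion per item would be risky
var bigList = [];
for (var k = 0; k < 100000; k++) bigList.push(k);

var visited = 0;
asyncEach(bigList, function(item, resume) {
  visited += 1;
  resume();   // returns straight away: the outer while loop does the work
}, true);
```

Each call to resume() inside the iterator only bumps the counter and returns, so the stack never gets deeper than loop → iterate → iterator → resume.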

Evented programming patterns: Testing event-driven apps

This post is part of a series on event-driven programming.

Thus far all the articles in this series have focused on methods for structuring applications to make them more modular and maintainable. They all help in their own way when correctly applied, but all of them leave one major area with something to be desired: scripting.

To be clear, it’s not that these techniques make code unscriptable. The problem is that even though the components may be well-designed and conceptually easy to plug together, writing quick scripts to manipulate an event-driven application can require a lot of boilerplate. One place this really shows up is in tests. Tests should, nay, must be easy to read. If they’re not, then their role as specifications and design/debugging tools is lost. And if tests are easy to write, you end up writing more tests, and you make better software. Wins all around.

But have you tried integration-testing a heavily event-driven app? I don’t care how fancy your test framework is, when every step in a test scenario needs some async work you’re going to end up with this:

ClientSpec = JS.Test.describe(Faye.Client, function() {
  before(function(resume) {
    var server = new Faye.NodeAdapter({mount: '/'})
    server.listen(8000)
    setTimeout(function() {
      resume(function() {
        var endpoint = 'http://0.0.0.0:8000'
        this.clientA = new Faye.Client(endpoint)
        this.clientB = new Faye.Client(endpoint)
      })
    }, 500)
  })
  
  it('sends a message from A to B', function(resume) {
    clientA.subscribe('/channel', function(message) {
      this.message = message
    }, this)
    setTimeout(function() {
      clientB.publish('/channel', {hello: 'world'})
      setTimeout(function() {
        resume(function() {
          assertEqual( {hello: 'world'}, message )
        })
      }, 250)
    }, 100)
  })
})

All this test does is make sure one client can send a message to another using a Faye messaging server. While working on Faye, I’ve ended up getting the tests into a state where that test reads like this:

Scenario.run('Two clients, single message send',
function() { with(this) {
  server( 8000 )
  client( 'A', ['/channel'] )
  client( 'B', [] )
  publish( 'B', '/channel', {hello: 'world'} )
  checkInbox( 'A', [{hello: 'world'}] )
  checkInbox( 'B', [] )
}})

This should be pretty self-explanatory: start a server on port 8000, make client A subscribe to /channel, make client B with no subscriptions, make B publish the message {hello: 'world'} to /channel, and make sure A and only A received the message.

Now the problem is, executing all those steps synchronously won’t work. There are network delays and other timeouts that need to happen between steps to make the test work. We need an abstraction that lets us write scripts at a very high level like this and hides all the inconsequential (and possibly volatile) glue code between the lines. This is pretty easy to do by breaking the script into two components, which here I’ll call a Scenario and a CommandQueue.

The job of the Scenario is to implement all the script steps involved in running the test. It should have a method for each type of command that accepts the right arguments, and also accepts a callback that it should call when the scenario is ready to continue running steps. For example, let’s get our Faye scenario class started:

Scenario = function() {
  this._clients = {};
  this._inboxes = {};
};

Scenario.prototype.server = function(port, resume) {
  this._port = port;
  var server = new Faye.NodeAdapter({mount: '/'});
  server.listen(port);
  setTimeout(resume, 500);
};

The server() command in our test just takes a port number, so our scenario method needs to take the port number and the resume callback function. It runs resume after a delay to let the server spin up so it’s ready to take requests before we continue our test.

Next up we need the client() method. This takes a name and a list of channels to subscribe to, and the callback as before.

Scenario.prototype.client = function(name, channels, resume) {
  var endpoint = 'http://0.0.0.0:' + this._port,
      client   = new Faye.Client(endpoint);
  
  channels.forEach(function(channel) {
    client.subscribe(channel, function(message) {
      this._inboxes[name].push(message);
    }, this);
  }, this);
  
  this._clients[name] = client;
  this._inboxes[name] = [];
  setTimeout(resume, 100);
};

A similar pattern here: we create a client, set up its subscriptions, make somewhere to store the messages it receives, then wait a little for the subscriptions to have time to register with the server. By now the publish() and checkInbox() methods should be fairly predictable:

Scenario.prototype.publish = function(name, channel, message, resume) {
  var client = this._clients[name];
  client.publish(channel, message);
  setTimeout(resume, 250);
};

Scenario.prototype.checkInbox = function(name, messages, resume) {
  assert.deepEqual(messages, this._inboxes[name]);
  resume();
};

Note how in the final method we still accept the resume callback even though the method is synchronous and calls the callback immediately. The job of this component is to provide a convention: in general, you pass each method a callback and the method decides when the program should continue. Here we’ve used timeouts, but it could be after an Ajax call, an animation, a user-triggered event, anything at all.
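As a rough illustration of a step that resumes on an event rather than a timeout, here is a sketch using Node’s EventEmitter. The EventScenario class, its waitFor() method and the 'connected' event name are all made up for this example; none of them are part of Faye or the scenario above.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical scenario with a single event-driven step
function EventScenario(emitter) {
  this._emitter = emitter;
}

// Resume the script the first time `eventName` fires on the emitter
EventScenario.prototype.waitFor = function(eventName, resume) {
  this._emitter.once(eventName, function() { resume(); });
};

var emitter  = new EventEmitter(),
    scenario = new EventScenario(emitter),
    resumed  = 0;

scenario.waitFor('connected', function() { resumed += 1; });
var resumedBeforeEvent = resumed;   // the step is still waiting here

emitter.emit('connected');          // the step resumes now
emitter.emit('connected');          // once() means later events are ignored
```

The calling convention is unchanged: the step takes its arguments plus resume, and it alone decides when the script carries on.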

Now we’ve implemented all the steps, we need something that can run the test as we’ve written it, without callbacks. I’ll call this the CommandQueue: rather than executing commands immediately it stores them in a queue.

CommandQueue = function() {
  this._scenario = new Scenario();
  this._commands = [];
};

CommandQueue.prototype = {
  server: function(port) {
    this.enqueue(['server', port]);
  },
  client: function(name, channels) {
    this.enqueue(['client', name, channels]);
  },
  publish: function(name, channel, message) {
    this.enqueue(['publish', name, channel, message]);
  },
  checkInbox: function(name, messages) {
    this.enqueue(['checkInbox', name, messages]);
  }
};

This implements the API that we want to use in our test, but so far the script is inert: nothing actually gets run. We need a method to run the next command in the queue. This takes the next command off the queue, and adds a callback to the argument list that will run the next command once called. The methods in the Scenario will use that callback to resume the test when ready.

CommandQueue.prototype.runNext = function() {
  // Stop once the queue is empty: the last command's resume() lands here
  if (this._commands.length === 0) return;
  
  var command = this._commands.shift().slice(),
      method  = command.shift(),
      self    = this;
  
  var resume = function() { self.runNext() };
  command.push(resume);
  
  this._scenario[method].apply(this._scenario, command);
};

We also need the first command addition to trigger the execution of the queue, so we’ll implement enqueue() to deal with this. We start the execution with a timeout, since if we do it synchronously no more commands will have been added by the time the first command returns.

CommandQueue.prototype.enqueue = function(command) {
  this._commands.push(command);
  if (this._started) return;
  
  this._started = true;

  var self = this;
  setTimeout(function() { self.runNext() }, 100);
};

As the final piece of glue, we need the Scenario.run() function, which takes our test script and executes it using a CommandQueue to do the work.

Scenario.run = function(testName, block) {
  var commandQueue = new CommandQueue();
  block.call(commandQueue);
};

If you have a lot of test scenarios, you can use another command queue to help sequence them into a single test suite without too much bother. For a more complete example of some of these patterns, you can read through the Faye test suite, which also includes a variation on the above done in Ruby with EventMachine and Test::Unit.
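To give an idea of what sequencing scenarios might look like, here is a minimal sketch. It assumes each scenario is wrapped in a function that takes a completion callback, which the Scenario.run() above does not provide; SuiteQueue, add() and the scenario names are hypothetical and this is not how the Faye test suite does it.

```javascript
// Hypothetical queue that runs one scenario at a time
function SuiteQueue() {
  this._pending = [];
  this._running = false;
}

// `runScenario` must accept a callback and call it when the scenario is over
SuiteQueue.prototype.add = function(name, runScenario) {
  this._pending.push({name: name, run: runScenario});
  if (!this._running) this._runNext();
};

SuiteQueue.prototype._runNext = function() {
  var next = this._pending.shift(),
      self = this;

  if (!next) {
    this._running = false;
    return;
  }
  this._running = true;
  next.run(function() { self._runNext(); });
};

// Demo: each fake scenario records that it started and hands back its
// completion callback so we can finish it by hand.
var log = [], finish = {};
var suite = new SuiteQueue();

suite.add('first', function(done) {
  log.push('start first');
  finish.first = done;
});
suite.add('second', function(done) {
  log.push('start second');
  finish.second = done;
});

var startedBeforeFirstFinished = log.slice();  // only 'start first' so far
finish.first();                                // now the second scenario starts
finish.second();                               // queue is empty again
```

The shape is the same as the CommandQueue: take the next item off the list and hand it a callback that runs the one after.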

Evented programming patterns: Asynchronous pipelines

This post is part of a series on event-driven programming.

In a previous article for this series, I covered the topic of asynchronous methods: methods or functions that “return” a value by passing it to a callback instead of using the return keyword. The problem with these methods is that they are not easily composable in the traditional sense: since they don’t have normal return values, expressions such as f(g(x)) don’t work when g returns a result asynchronously. It is possible to compose these functions but it takes a lot more work:

// Apply g to x, then apply f to the result,
// then use _that_ result for something else

g(x, function(gx) {
  f(gx, function(fgx) {
    // do something with "f(g(x))"
  });
});

Hardly the most pleasing thing to write or read. If we have an arbitrary list of functions to pass a value through, the problem becomes even harder. With synchronous functions this is straightforward:

passThroughFilters = function(filters, value) {
  for (var i = 0, n = filters.length; i < n; i++) {
    value = filters[i](value);
  }
  return value;
};

We just pass the initial value to the first function, and use the return value as the starting point for the next iteration.
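For instance, with a few throwaway string filters (trim, upcase and exclaim are invented purely for this example):

```javascript
var passThroughFilters = function(filters, value) {
  for (var i = 0, n = filters.length; i < n; i++) {
    value = filters[i](value);
  }
  return value;
};

// Hypothetical filters: each takes a value and returns the next one
var trim    = function(s) { return s.replace(/^\s+|\s+$/g, ''); };
var upcase  = function(s) { return s.toUpperCase(); };
var exclaim = function(s) { return s + '!'; };

var output = passThroughFilters([trim, upcase, exclaim], '  hello ');
// output is 'HELLO!'
```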

You’re probably wondering what practical problem this relates to and hoping against hope that I won’t mention monads (don’t worry). I hinted at it with the name of the above function. Filtering systems pop up in web stacks all the time: Rails’ before_filter and Rack middleware being two obvious examples. In particular both of these have the property that any function in the filter chain can block the rest of the chain: a before_filter can return false to block access to the underlying controller action, and Rack middleware can decide whether it wants to respond to a request or delegate it down the stack.

Both these examples are synchronous and easily implemented using composition, but I’ve had a couple of problems recently that required asynchronous filters: each filter can hold the chain up indefinitely while some async action is run, and the filter may resume the chain at any time using a callback.

The main example I have is the Faye extension system, which allows the user to modify or replace messages as they pass in and out of the server. Each extension method accepts a message and must call back with a message once it’s done with its modifications and filters. I’m going to present a slightly modified and hopefully more generically useful API for this, or at least one that’s more in keeping with the style I’ve used in this series.

server.addExtension('incoming', function(message, callback) {
  someAsyncAuthCall(message, function(allowed) {
    if (allowed) callback(message);
    else callback(null);
  });
});

This simple extension does something asynchronous to figure out whether the client is allowed to send that message, and if it’s not then the message is replaced with null to stop it propagating any further. Any number of extensions can be added to the server and each message is piped through them before reaching the core server code. The server calls the extension internally like this:

Faye.Server.prototype.process = function(message, callback, scope) {
  // various setup steps...
  this.pipeThroughExtensions('incoming', message, function(message) {
    // handle message after extensions have run
  }, this);
};

This just passes the message through the incoming filters, then picks the message up at the other end of the pipeline using an inline callback. These two methods, addExtension() and pipeThroughExtensions(), are provided in Faye by a mixin called Extensible. The addExtension() method simply needs to store the extension in a list:

Extensible.addExtension = function(type, handler, scope) {
  this._extensions = this._extensions || {};
  var list = this._extensions[type] = this._extensions[type] || [];
  list.push([handler, scope]);
};

pipeThroughExtensions() is a little more complex. Internally it uses a simple function whose job it is to process a single extension from a list. If we’ve reached the end of the chain, we can run the waiting callback. Otherwise, we call the next extension and pass it the pipe function so the extension can resume the chain when it’s done.

Extensible.pipeThroughExtensions = function(type, input, callback, scope) {
  if (!this._extensions) return callback.call(scope, input);
  
  if (!this._extensions.hasOwnProperty(type))
    return callback.call(scope, input);
  
  var list = this._extensions[type].slice();
  
  var pipe = function(data) {
    var extension = list.shift();
    if (!extension) return callback.call(scope, data);
    extension[0].call(extension[1], data, pipe);
  };
  pipe(input);
};

That call to extension[0].call(extension[1], data, pipe) calls the extension with the current value of the input, passing in the continuation function. When the extension calls its callback, we process the next extension in the list.
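Here is a small standalone run of the two methods, to show the chain being held up and resumed. The pipeline object, the two extensions and the held variable are invented for the demo; the Extensible code is repeated only so the snippet runs on its own.

```javascript
var Extensible = {
  addExtension: function(type, handler, scope) {
    this._extensions = this._extensions || {};
    var list = this._extensions[type] = this._extensions[type] || [];
    list.push([handler, scope]);
  },

  pipeThroughExtensions: function(type, input, callback, scope) {
    if (!this._extensions) return callback.call(scope, input);
    if (!this._extensions.hasOwnProperty(type))
      return callback.call(scope, input);

    var list = this._extensions[type].slice();

    var pipe = function(data) {
      var extension = list.shift();
      if (!extension) return callback.call(scope, data);
      extension[0].call(extension[1], data, pipe);
    };
    pipe(input);
  }
};

// Hypothetical object that mixes in Extensible
var pipeline = Object.create(Extensible);
var held = null;

// First extension: modifies the message and continues immediately
pipeline.addExtension('incoming', function(message, callback) {
  message.seen = true;
  callback(message);
});

// Second extension: holds the chain until someone calls held()
pipeline.addExtension('incoming', function(message, callback) {
  held = function() {
    message.text = message.text.toUpperCase();
    callback(message);
  };
});

var result = null;
pipeline.pipeThroughExtensions('incoming', {text: 'hello'}, function(message) {
  result = message;
});

var resultWhileHeld = result;   // still null: the second extension is waiting
held();                         // resume the chain; result is now set
```

Until held() is called the final callback simply never runs, which is what lets an extension wait on I/O for as long as it needs.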

This isn’t a pattern I’ve used an awful lot, but it solved a problem on Songkick really nicely last week. We have various buttons on the site that let users start tracking things, for example to start following an artist or say they’re going to a concert. These buttons typically are forms that submit using Ajax. For some of these buttons we just added the ability to auto-publish the concert to your Facebook profile. To do this our server-side code needs a cookie to be set that gives us permission to post to Facebook on that user’s behalf. So before the form is submitted, we need this cookie to be set, which requires a lot of async calls to the Facebook JavaScript SDK to log the user in and ask for publishing permissions.

The initial approach was to try to intercept the DOM events bound to the form submission and insert more logic for certain types of tracking button, but using the above model it was much simpler (again, this has been simplified to illustrate a point):

Trackings = { /* ... */ };
$.extend(Trackings, Extensible);

$('form.tracking').bind('submit', function() {
  var form = $(this);
  Trackings.pipeThroughExtensions('beforesubmit', form, function() {
    form.ajaxSubmit();
  });
  return false;
});

Trackings.addExtension('beforesubmit', function(form, resume) {
  if (!form.hasClass('im-going')) return resume(form);
  FB.login(function(session) {
    // more login and permission logic to set the cookie
    resume(form);
  });
});

This keeps the DOM bindings simple without shoving a lot of business logic into the initial event handler, but allows quite powerful modifications. For example in some contexts you may decide you need to block a submission: just don’t call resume() and you’ll block the filter chain.

One caveat: this pattern needs a better name, since Extensible is far too vague and non-descriptive. This is basically the Observable module with the modification that the event listeners are allowed to have side effects on the event publisher. In Faye, Extensible is used in classes that support the extension API, but I’d recommend choosing something more domain-specific depending on how you need to tweak it.

Evented programming patterns: Object lifecycle

This post is part of a series on event-driven programming.

Earlier in this series I covered a very common pattern in event-driven programming: the Observable object. This technique lets one object notify many others when interesting things happen. JavaScript developers will be very familiar with this: it’s the same pattern that underlies the DOM event model.

A while ago I rewrote the JS.Class package loader and noticed a variation of this pattern emerge, which I’m going to call the object lifecycle. The typical use case is when some part of your code needs to execute once, as soon as some condition becomes true. In the package loader, this looks something like:

thePackage.when('loaded', function() {
  // Run code that relies on thePackage
});

This says: if thePackage is already loaded, then run this callback immediately. Otherwise, wait until thePackage is loaded and then run the callback. The implication is that the package will become loaded, only once, at some point in its life, and as soon as that happens we want to be notified. (I tend to use when for one-shot lifecycle events, and on for multi-fire events.) The implementation is quite similar to the Observable pattern, so you might want to revisit that before reading on.

Obviously, our lifecycle object is going to need to store lists of callbacks, indexed by event name as before. But in this case, if we know the event has already been triggered on that object, we can run the callback immediately and forget about it. When we trigger events, we also want to remove all the old pending callbacks after running them, since they don’t need to be called again.

LifeCycle = {
  when: function(eventType, listener, scope) {
    this._firedEvents = this._firedEvents || {};
    if (this._firedEvents.hasOwnProperty(eventType))
      return listener.call(scope);

    this._listeners = this._listeners || {};
    var list = this._listeners[eventType] = this._listeners[eventType] || [];
    list.push([listener, scope]);
  },
  
  trigger: function(eventType) {
    // Any arguments after the event name are passed on to the listeners
    var args = Array.prototype.slice.call(arguments, 1);
    this._firedEvents = this._firedEvents || {};
    
    if (this._firedEvents.hasOwnProperty(eventType)) return false;
    this._firedEvents[eventType] = true;
    
    if (!this._listeners) return true;
    var list = this._listeners[eventType];
    if (!list) return true;
    list.forEach(function(listener) {
      listener[0].apply(listener[1], args);
    });
    delete this._listeners[eventType];
    return true;
  }
};

Note how the trigger() method checks to see if the event has already been fired: we don’t want the same stage in the lifecycle to be triggered multiple times. It also removes the listeners from the object after calling them, and returns true or false to indicate whether the event fired. This makes it easy to tell whether some action that should only be done once has already happened; for example in my package system I do something like this:

JS.Package = new JS.Class({
  include: LifeCycle,
  
  // various methods
  
  load: function() {
    if (!this.trigger('request')) return;
    // perform download logic...
  }
});

This kills two birds with one stone: it lets other listeners know that the package has been requested and checks whether it’s already been requested, so we don’t try to download the package multiple times.
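To see the one-shot behaviour in isolation, here is a compact restatement of the mixin with a quick run-through. fakePackage and the log array are stand-ins invented for the demo; this is not the real JS.Package.

```javascript
// Compact version of the LifeCycle mixin so the snippet runs on its own
var LifeCycle = {
  when: function(eventType, listener, scope) {
    this._firedEvents = this._firedEvents || {};
    if (this._firedEvents.hasOwnProperty(eventType))
      return listener.call(scope);

    this._listeners = this._listeners || {};
    var list = this._listeners[eventType] = this._listeners[eventType] || [];
    list.push([listener, scope]);
  },

  trigger: function(eventType) {
    this._firedEvents = this._firedEvents || {};
    if (this._firedEvents.hasOwnProperty(eventType)) return false;
    this._firedEvents[eventType] = true;

    var list = this._listeners && this._listeners[eventType];
    if (!list) return true;
    delete this._listeners[eventType];
    list.forEach(function(listener) { listener[0].call(listener[1]); });
    return true;
  }
};

// Hypothetical object standing in for a package
var fakePackage = Object.create(LifeCycle);
var log = [];

// Registered before the event: stored until trigger() is called
fakePackage.when('request', function() { log.push('early listener'); });

var firstTrigger  = fakePackage.trigger('request');  // true: event fires
var secondTrigger = fakePackage.trigger('request');  // false: already fired

// Registered after the event: runs straight away
fakePackage.when('request', function() { log.push('late listener'); });
```

The false from the second trigger() is the value that load() uses to bail out early.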

Naturally, one thing a package system has to deal with is dependencies. Dependencies are just prerequisites: you can’t load a package until all its dependencies are loaded. More precisely, a package is loaded once the browser has downloaded its source code, and it is complete once it is loaded and all its dependencies are complete. To fill in some more of the load() method, this is easily expressed as:

JS.Package.prototype.load = function() {
  if (!this.trigger('request')) return;
  
  when({complete: this._dependencies, load: [this]}, function() {
    this.trigger('complete');
  }, this);

  when({load: this._dependencies}, function() {
    loadFile(this._path, function() { this.trigger('load') }, this);
  }, this);
};

This reads quite naturally: if the package has already been requested, do nothing. When the dependencies are complete and this package is loaded, then this package is complete. When the dependencies are loaded, load this package and then trigger its load event. Note how the load event will trigger a complete event if there are no dependencies, and this will ripple down the tree and trigger dependent packages to load.

I’ve used when() above to express groups of prerequisites in a natural way, but we don’t have an implementation for that yet – we only have the when() method for individual objects. So let’s write one. This when() function will need to gather up the list of preconditions, keep a tally of how many have triggered, and when they’re all done we can fire our callback. The first step in the function converts preconditions, which maps event names to lists of objects, into a simple list of object-event pairs. That is, it turns {complete: [foo, bar], load: [this]} into [[foo, 'complete'], [bar, 'complete'], [this, 'load']].

var when = function(preconditions, listener, scope) {
  var eventList = [];
  for (var eventType in preconditions) {
    for (var i = 0, n = preconditions[eventType].length; i < n; i++) {
      var object = preconditions[eventType][i];
      eventList.push([object, eventType]);
    }
  }
  
  var pending = eventList.length;
  if (pending === 0) return listener.call(scope);
  
  for (var i = 0, n = pending; i < n; i++) {
    eventList[i][0].when(eventList[i][1], function() {
      pending -= 1;
      if (pending === 0) listener.call(scope);
    });
  }
};

If there are no pending events, we can just call the listener immediately. Otherwise, we set up listeners for all the events, and when each one fires (and remember: some of them may have fired already) we count down how many events we’re waiting for. When this reaches zero, we can carry on with the work we wanted to do.
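Here is a quick sketch of the countdown in action. makeLifecycleObject() is a cut-down stand-in I’ve written just for this demo (anything with compatible when() and trigger() methods would do), and foo and bar are placeholder objects.

```javascript
// Minimal stand-in for an object with one-shot lifecycle events
function makeLifecycleObject() {
  var fired = Object.create(null), listeners = Object.create(null);
  return {
    when: function(eventType, listener, scope) {
      if (fired[eventType]) return listener.call(scope);
      (listeners[eventType] = listeners[eventType] || []).push([listener, scope]);
    },
    trigger: function(eventType) {
      if (fired[eventType]) return false;
      fired[eventType] = true;
      var list = listeners[eventType] || [];
      delete listeners[eventType];
      list.forEach(function(entry) { entry[0].call(entry[1]); });
      return true;
    }
  };
}

// The grouping function from above, unchanged
var when = function(preconditions, listener, scope) {
  var eventList = [];
  for (var eventType in preconditions) {
    for (var i = 0, n = preconditions[eventType].length; i < n; i++) {
      var object = preconditions[eventType][i];
      eventList.push([object, eventType]);
    }
  }

  var pending = eventList.length;
  if (pending === 0) return listener.call(scope);

  for (var i = 0, n = pending; i < n; i++) {
    eventList[i][0].when(eventList[i][1], function() {
      pending -= 1;
      if (pending === 0) listener.call(scope);
    });
  }
};

var foo = makeLifecycleObject(),
    bar = makeLifecycleObject(),
    log = [];

foo.trigger('load');   // this one has already fired before we start waiting

when({load: [foo, bar], complete: [foo]}, function() {
  log.push('all done');
});

var afterSetup = log.length;     // 0: two of the three events are outstanding
bar.trigger('load');
var afterBarLoad = log.length;   // 0: still waiting on foo's complete event
foo.trigger('complete');         // last one in: the listener runs now
```

Note that foo’s load event had already fired, so its listener ran during setup and the count dropped to two before when() even returned.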

This pattern is essentially a cross between Observable and Deferrable: we’re deferring an action, but the deferred items – the events – aren’t complex enough to merit their own objects so the implementation is closer to an observable object. The technique lends itself really well to expressing prerequisites in a natural way, even if the work you’re doing is not asynchronous.

I’ll have a couple more articles on event-driven programming next week, and you can catch me speaking at the London Ajax User Group on August 10th where I’ll be talking about Faye, event-driven code and testing.