Faye 0.6: it’s all about clustering

After a longer-than-I-would-have-liked gestation period that began around six months ago, Faye 0.6 is finally out. The major feature in this release is the introduction of ‘engines’, a mechanism that gives us a great deal of freedom in how the back-end is implemented, and lets us do useful things like clustering the HTTP front-end.

In previous releases, the Bayeux protocol and the pub/sub logic backing it up were part of the same class, Faye.Server. All the application state – clients and their subscriptions – was stored in memory by this class. In the new release, the protocol layer and the core pub/sub logic have been separated – the protocol still lives in Faye.Server but the messaging logic has been factored out into several classes called ‘engines’. This means we can change how the message distribution layer is implemented without affecting the protocol.

The first big advantage of this is that we can write engines that store state in an external service, meaning we can distribute a single Faye service across many web servers if connection volume becomes a problem. The new Redis engine does just that:

var bayeux = new Faye.NodeAdapter({
  mount:   '/bayeux',
  timeout: 60,
  engine:  {
    type: 'redis',
    host: 'localhost',
    port: 6379
  }
});

You can start up a cluster of Faye web servers that all connect to the same Redis database, and clients can connect to any one of these web servers to interact with the service. The default engine is an in-memory one, so your existing code will continue to store state in memory. The in-memory engine has lower latency because it doesn’t perform external I/O, but obviously it cannot be clustered.

This engine system also opens up the possibility to turn Faye into a web front-end for whatever messaging technology you use internally. Because all the protocol validation is handled higher up the stack, the engines themselves are very small classes. Here are the in-memory and Redis engines, and the unit tests that are run against both:

So you can see it’s not a lot of work to implement a new engine and register it. Once you’ve called

Faye.Engine.register('my-engine', MyEngine)

you can use it by setting engine.type when creating your NodeAdapter.
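The registration idea can be made concrete with a minimal sketch of a registry. It maps a type name to an engine constructor, then builds an engine from the adapter’s engine options. Engine, MemoryEngine, subscribe() and subscribers() are hypothetical names chosen for this illustration. This is not Faye’s real engine interface, which lives in Faye’s own source.

```javascript
// Hypothetical engine registry (not Faye's code): type name -> constructor
var Engine = {
  _registry: {},

  // Make an engine class available under a name, e.g. 'memory' or 'redis'
  register: function(type, klass) {
    this._registry[type] = klass;
  },

  // Build an engine from the adapter's `engine` options, defaulting to memory
  get: function(options) {
    var type  = (options && options.type) || 'memory',
        klass = this._registry[type];
    if (!klass) throw new Error('Unknown engine type: ' + type);
    return new klass(options);
  }
};

// A toy in-memory engine that tracks channel -> [clientId] subscriptions
var MemoryEngine = function(options) {
  this._subscriptions = {};
};

MemoryEngine.prototype.subscribe = function(clientId, channel) {
  var list = this._subscriptions[channel] || (this._subscriptions[channel] = []);
  if (list.indexOf(clientId) < 0) list.push(clientId);
};

MemoryEngine.prototype.subscribers = function(channel) {
  return (this._subscriptions[channel] || []).slice();
};

Engine.register('memory', MemoryEngine);

var engine = Engine.get({type: 'memory'});
engine.subscribe('client-1', '/chat');
engine.subscribers('/chat');   // returns ['client-1']
```

Because the lookup is keyed on a plain string, the adapter’s configuration stays declarative. Swapping backends becomes a one-word change to engine.type.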

The only API change resulting from this is that it’s no longer possible to publish on wildcard channels like /foo/*. Other Bayeux servers prohibit this already but previous versions of Faye allowed it. In order to keep the engines simple and fast I’ve decided to remove this feature. Subscribing to wildcard channels is still supported.
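One way to see why wildcard publishing is awkward for an engine comes from Bayeux’s globbing rules. Under those rules, * matches exactly one channel segment and ** matches one or more, so only a small, fixed set of patterns can match a concrete channel like /foo/bar. An engine can find every interested client by looking up those patterns, whereas a wildcard publish would mean scanning every channel. The sketch below lists the matching patterns using a hypothetical expandChannel() helper; it is an illustration, not Faye’s code.

```javascript
// Hypothetical helper: every subscription pattern that matches a concrete
// channel under Bayeux globbing ('*' = one segment, '**' = one or more).
function expandChannel(name) {
  var segments = name.split('/').slice(1),   // '/foo/bar' -> ['foo', 'bar']
      patterns = [name];

  // '/foo/*': same parent, last segment replaced by a single-level wildcard
  patterns.push('/' + segments.slice(0, -1).concat('*').join('/'));

  // '/**', '/foo/**', ...: every proper prefix followed by a deep wildcard
  for (var i = 0; i < segments.length; i++) {
    patterns.push('/' + segments.slice(0, i).concat('**').join('/'));
  }
  return patterns;
}

expandChannel('/foo/bar');   // returns ['/foo/bar', '/foo/*', '/**', '/foo/**']
```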

The other major change in this release comes from Matthijs Langenberg, who’s added a CORS transport to the client. This will now be used instead of the JSON-P transport for cross-domain communication in browsers that do not support WebSockets. It will particularly help in Firefox, which shipped without WebSocket in version 4.0 and guarantees execution order of injected scripts. This meant the long-polling JSON-P request from Faye would block calls made to other APIs like Google Maps, making UIs unresponsive. This should now be a thing of the past.

Finally I just want to thank Martyn Loughran, whose em-hiredis powers the Ruby Redis engine in Faye. Martyn works on Pusher, a really polished hosted push service that uses Redis internally, and it’s really nice of them to open-source their client for others to use. I tried three other Redis clients in Ruby and none of them do pub/sub as nicely as em-hiredis.

As usual, install through gem or npm and hop on the mailing list if you have any problems. I’m on holiday at the moment but I’ll get onto your feedback when I’m home next week.

36 laptops = 1 musical instrument

Video by Scott Schiller

Not a lot of people know this, but the first thing I ever built with Faye was at the first Music Hack Day in London back in 2009. It was also the first time I’d demo’d at a hack day, and needless to say some combination of Faye’s immaturity, a hasty deployment and flaky wi-fi meant the demo failed. The demo was called Outcast, and let you push what you were listening to in iTunes to other machines over HTTP. It was a bit like iTunes library sharing, with a push instead of a pull model, and over the Internet rather than just LAN. You can still get the code, though I doubt it works.

Since then I’ve had an impressive track record of failed demos so for the recent San Francisco MHD (thank you to my lovely employers at Songkick for sending me – we’re hiring, don’t you know) I thought I’d embrace failure and try to use as many risky technologies as possible. Against all the odds the result is what you see in the video above: 36 laptops playing a buzzy ambient version of ‘Happy Birthday’ for MHD organiser Dave Haynes.

The cocktail of technologies included the venue’s wi-fi, Faye, the Mozilla audio API, and 36 audience members. (I wanted to throw Fargo in there but I think the performance overhead would have killed the sound generation completely.) I’m not open-sourcing the code for the time being, but there’s not an awful lot of it and I thought it’d be fun to explain what I learned over the weekend. I think mass remote control is an interesting application of WebSocket-based messaging and I’d like to see more experiments like this.

So, where to start? Like the hack itself, this is going to be a bit of a brain-dump, but hopefully it more or less hangs together. The basic functionality of the app is that it’s a free-form synth. It’s not a sequencer and doesn’t do any recording or looping; it just lets you make tones and play them. To drive the app we need a clock that runs all the tones we’ve created. It doesn’t need to be very fancy: since we’re not building a sequencer we don’t need explicit beats and bars. A simple timer loop that iterates over all the registered tones and tells them to emit a segment of audio will suffice.

Clock = {
  SAMPLE_RATE:    44100,
  TICK_INTERVAL:  10,
  
  // Storage for all the tones currently in use
  _tones: [],
  
  // Register tones with the application clock
  bind: function(tone) {
    this._tones.push(tone);
  },
  
  // Main timer loop, tells all the tones to emit a slice
  // of audio using the current time and timer interval
  start: function() {
    var self     = this,
        interval = this.TICK_INTERVAL,
        rate     = this.SAMPLE_RATE,
        time     = new Date().getTime();
    
    setInterval(function() {
      var tones = self._tones,
          i     = tones.length;
      
      time += interval;
      while (i--)
        tones[i].outputTimeSlice(time / 1000, interval / 1000, rate);
        
    }, interval);
  }
};

Clock.start();

The SAMPLE_RATE describes how we generate audio data. Running a synth basically means calculating a lot of data points from sound waves, and to do this we need to know the interval between data points that the audio output device expects. The Mozilla audio API uses the common sample rate of 44.1kHz, meaning each tone must generate 44,100 data points per second for each channel.
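With the Clock’s constants, the arithmetic for each tick works out as follows. This is only a worked check of the numbers; the variable names are illustrative.

```javascript
// How much audio one clock tick produces, using the Clock's constants
var SAMPLE_RATE   = 44100,  // samples per second, per channel
    TICK_INTERVAL = 10;     // milliseconds between ticks

var samplesPerTick = Math.ceil((TICK_INTERVAL / 1000) * SAMPLE_RATE);
var valuesPerTick  = samplesPerTick * 2;                 // left + right interleaved
var valuesPerSec   = valuesPerTick * (1000 / TICK_INTERVAL);

console.log(samplesPerTick, valuesPerTick, valuesPerSec);  // prints 441 882 88200
```

Each tick therefore asks every tone for 441 samples. Because outputTimeSlice() pushes each value once per channel, that means 882 numbers per write.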

In the main timer loop we tell each tone in the system to emit some audio using outputTimeSlice(). This method will take the current time and an interval (both in seconds) and produce sound wave data over the given time slice.

The tone objects are responsible for modelling the tones we’re playing and generating their audio. A tone is produced by a set of parameters that describe the sound’s properties – its frequency, amplitude, waveform (sine, sawtooth, or square wave) as well as modulations like amplitude and frequency modulation. To emit sound I used Ben Firshman’s DynamicAudio, which is a thin wrapper around the Mozilla audio API that provides a Flash-based shim in other browsers.

Tone = function(options) {
  this.generateWave(options);
  this._audio = new DynamicAudio({swf: '/path/to/dynamicaudio.swf'});
};

Tone.prototype.outputTimeSlice = function(time, interval, rate) {
  var samples = Math.ceil(interval * rate),
      dt      = 1 / rate,
      data    = [];
  
  for (var i = 0; i < samples; i++) {
    var value = this.valueAt(time + i * dt);
    data.push(value); // left channel
    data.push(value); // right channel
  }
  this._audio.write(data);
};

Here we see the outputTimeSlice() method. Using the SAMPLE_RATE, it figures out how many data points we need to generate in the given time interval, and then builds an array of values by stepping through the time interval and calculating the value of the sound wave at each step. At each time step we need to provide values for both the left and right channel, so in this case I just push each value twice to produce evenly balanced output. Once we’ve built the array, we call DynamicAudio#write() which takes all those numbers and turns them into sound for us.

The next step down the stack is the Tone#valueAt(time) method, which calculates the wave value for the tone at any point in time. A tone is formed by combining a set of waves: there’s the basic wave that produces the note you’re playing, and then there may be modulations of amplitude or frequency applied, which themselves are described by waves. We need a model for waves:

Wave = function(options) {
  options = options || {};
  this.frequency = options.frequency || 1;
  this.amplitude = options.amplitude || 0;
  this.waveform  = options.waveform  || 'sine';
};

A wave has three properties: its frequency in Hz, its amplitude between 0 and 1, and a waveform, which describes the shape of the wave: sine, square or sawtooth. Let’s make some functions to represent all the waveforms; each takes a value in the range 0 to 1 that represents how far through a cycle we are, and returns a value between -1 and 1.

Wave.sine = function(x) {
  return Math.sin(2*Math.PI * x);
};

Wave.square = function(x) {
  return x < 0.5 ? 1 : -1;
};

Wave.sawtooth = function(x) {
  return -1 + x*2;
};

With these defined we can add a valueAt(time) method to Wave so we can calculate how it changes as our clock ticks.

Wave.prototype.valueAt = function(time) {
  var form = Wave[this.waveform],     // a function
      x    = this.frequency * time,   // a number
      y    = x - Math.floor(x),       // a number in [0,1]
      A    = this.amplitude;
  
  return A * form(y);
};
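The phase arithmetic can be checked with a self-contained copy of the Wave model and a few evaluated points. The frequencies and times are arbitrary choices for illustration.

```javascript
// Self-contained copy of the Wave model above
var Wave = function(options) {
  options = options || {};
  this.frequency = options.frequency || 1;
  this.amplitude = options.amplitude || 0;
  this.waveform  = options.waveform  || 'sine';
};

Wave.sine     = function(x) { return Math.sin(2 * Math.PI * x); };
Wave.square   = function(x) { return x < 0.5 ? 1 : -1; };
Wave.sawtooth = function(x) { return -1 + x * 2; };

Wave.prototype.valueAt = function(time) {
  var x = this.frequency * time,   // cycles elapsed
      y = x - Math.floor(x);       // position within the current cycle
  return this.amplitude * Wave[this.waveform](y);
};

// A 2Hz wave has a period of 0.5s, so t = 0.125s is a quarter-cycle in
var saw = new Wave({frequency: 2, amplitude: 0.5, waveform: 'sawtooth'});
console.log(saw.valueAt(0.125));   // 0.5 * (-1 + 0.25 * 2) = -0.25
console.log(saw.valueAt(0.625));   // one period later: -0.25 again

// With no options the amplitude defaults to 0, so the wave is silent
console.log(new Wave().valueAt(0.3));   // 0
```

The silent default matters later. A Tone built without an am field gets an AM wave that always returns 0, so the note plays unmodulated.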

Now that we can model waves we can go back and fill in the generateWave() and valueAt() methods on Tone. We’ll construct a tone like this:

new Tone({
  note:       'C',
  octave:     3,
  amplitude:  0.8,
  waveform:   'square',
  am: {
    amplitude: 0.3,
    frequency: 2,
    waveform:  'sine'
  }
})

This data describes the basic note we want to play, its amplitude and waveform, and a wave to use as amplitude modulation (the am field). We turn this data into a couple of Waves, one to represent the base note and one to represent the AM wave:

Tone.prototype.generateWave = function(options) {
  this._note = new Wave({
    amplitude: options.amplitude,
    waveform:  options.waveform,
    frequency: Wave.noteFrequency(options.note, options.octave)
  });
  this._am = new Wave(options.am);
};

We convert the note and octave into a frequency using a little bit of maths. The details aren’t that important; what matters is that we can manipulate sound in terms of musical notes rather than frequencies.

Wave.noteFrequency = function(note, octave) {
  if (octave === undefined) octave = 4;
  
  var semitones = Wave.NOTES[note],
      frequency = Wave.MIDDLE_C * Math.pow(2, semitones/12),
      shift     = octave - 4;
  
  return frequency * Math.pow(2, shift);
};

Wave.MIDDLE_C = 261.626; // Hz

Wave.NOTES = {
  'C' : 0,
  'C#': 1,  'Db': 1,
  'D' : 2,
  'D#': 3,  'Eb': 3,
  'E' : 4,
  'F' : 5,
  'F#': 6,  'Gb': 6,
  'G' : 7,
  'G#': 8,  'Ab': 8,
  'A' : 9,
  'A#': 10, 'Bb': 10,
  'B' : 11
};
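Written out, noteFrequency() computes the following, where n is the semitone offset from Wave.NOTES and o is the octave (defaulting to 4). The two evaluated examples are A4, the usual tuning reference, and the C3 used in the example tone above.

```latex
f(n, o) = 261.626\ \text{Hz} \times 2^{n/12} \times 2^{\,o-4}

f(9, 4) = 261.626 \times 2^{9/12} \approx 261.626 \times 1.68179 \approx 440.0\ \text{Hz} \quad (\text{A}4)

f(0, 3) = 261.626 \times 2^{0} \times 2^{-1} = 130.813\ \text{Hz} \quad (\text{C}3)
```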

The Tone#valueAt(time) method simply composes the note and the amplitude modulation to produce the final sound:

Tone.prototype.valueAt = function(time) {
  var base = this._note.valueAt(time),
      am   = this._am.valueAt(time);
  
  return base * (1 + am);
};
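The AM wave scales the note by (1 + am), so the loudest point of a tone is its amplitude multiplied by (1 + the AM amplitude). For the example tone above that is 0.8 × 1.3 = 1.04, slightly over 1. Assuming the audio output expects samples between -1 and 1, as floating-point audio APIs generally do, those peaks would clip. A hypothetical clamp() applied before writing is one simple guard; lowering the amplitudes is the cleaner fix.

```javascript
// Loudest possible value of base * (1 + am), given the two amplitudes
function peakAmplitude(noteAmplitude, amAmplitude) {
  return noteAmplitude * (1 + amAmplitude);
}

// Hypothetical guard: keep a sample inside [-1, 1] before writing it out
function clamp(value) {
  return Math.max(-1, Math.min(1, value));
}

var peak = peakAmplitude(0.8, 0.3);   // the example tone above
console.log(peak.toFixed(2));          // prints "1.04"
console.log(clamp(peak));              // prints 1
```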

So that’s all the sound generation taken care of, and we now need a way to interact with the tones and change them. For example you could implement a way to change the note of a tone:

Tone.prototype.setNote = function(note, octave) {
  this._note.frequency = Wave.noteFrequency(note, octave);
};
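The other controls can follow the same pattern as setNote(). Each one mutates a field on one of the tone’s Wave objects, and the next clock tick picks up the change. setAmplitude(), setWaveform() and setModulation() are hypothetical names. The Tone constructor here is a stripped-down stand-in, with no DynamicAudio and no note lookup, so the sketch runs outside a browser.

```javascript
// Stripped-down stand-ins so the sketch runs outside the browser
var Wave = function(options) {
  options = options || {};
  this.frequency = options.frequency || 1;
  this.amplitude = options.amplitude || 0;
  this.waveform  = options.waveform  || 'sine';
};

// Simplified Tone: takes a frequency directly instead of note/octave
var Tone = function(options) {
  this._note = new Wave(options);
  this._am   = new Wave(options.am);
};

// Hypothetical setters; valueAt() reads these fields on the next tick
Tone.prototype.setAmplitude = function(amplitude) {
  this._note.amplitude = amplitude;
};

Tone.prototype.setWaveform = function(waveform) {
  this._note.waveform = waveform;   // 'sine', 'square' or 'sawtooth'
};

// Adjust the AM wave; fields left out of `options` are unchanged
Tone.prototype.setModulation = function(options) {
  if (options.amplitude !== undefined) this._am.amplitude = options.amplitude;
  if (options.frequency !== undefined) this._am.frequency = options.frequency;
};

var tone = new Tone({
  frequency: 130.813, amplitude: 0.8, waveform: 'square',
  am: {amplitude: 0.3, frequency: 2}
});
tone.setWaveform('sawtooth');
tone.setModulation({frequency: 4});
```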

Similarly you can add interfaces for changing any of the parameters of the tone: changing its amplitude or waveform, adjusting the frequency of the amplitude modulation, adding further modulators, etc. After I’d implemented the methods I needed to manipulate the tones, I hacked together a GUI using jQuery UI:

Photo by Martyn Davies

To play to the audience I set up a bunch of initial tones that produced a buzzing drone-like sound – these are the tones you see in the photo above. The UI had controls for changing the note’s waveform and amplitude, and changing the amplitude and frequency of the AM and FM modulations. The final control, the one that let me play the tune, was a set of keybindings mapping letters on the keyboard to musical notes: I mapped the bottom row (Z, X, C, V, B, N, M) to the notes G, A, B, C, D, E and F. But this control had a twist: I wanted to change the note on everyone else’s laptop, so I needed to send note change instructions to them. To achieve this I gave each tone a name, stored the tones in an object called TONES, and used Faye to send the tone name and the note to switch to over the wire:

KEY_BINDINGS = {
  90: ['G', 3],
  88: ['A', 3],
  67: ['B', 3],
  86: ['C', 4],
  66: ['D', 4],
  78: ['E', 4],
  77: ['F', 4]
};

$(document).keydown(function(event) {
  var note = KEY_BINDINGS[event.keyCode];
  if (!note) return; // ignore keys that aren't mapped to a note

  faye.publish('/notes', {
    name:   'melody',
    note:   note[0],
    octave: note[1]
  });
});

faye.subscribe('/notes', function(message) {
  var tone   = TONES[message.name],
      note   = message.note,
      octave = message.octave;
  
  tone.setNote(note, octave);
});

And that pretty much covers it. If you want to dig into audio generation, I highly recommend the DynamicAudio library, as well as this LRUG talk by Chris Lowis on audio generation and analysis in JavaScript.

Fargo: a Lisp for asynchronous programming

A couple weeks ago, I tooted:

CoffeeScript ought to take this opportunity to add new features that help with async e.g. tail calls, continuations.

Now there’s a problem with this: it’s a core part of CoffeeScript’s usability and resultant popularity that it compiles to readable JavaScript. If you want to introduce new execution semantics, a straightforward one-to-one syntax transformation is not such a viable option.

But I maintain that, given we’re reinventing the language we work with on the web, now’s as good a time as any to reexamine the tools we use to write programs. We’re all annoyed by working with callbacks, and with good reason. Programming with explicit callbacks introduces race conditions and uncertainty; it’s not the syntactic burden of typing function that causes the real pain, it’s the creation of regions of your program whose order of execution is unpredictable.

In the Ruby world, we’ve been using fibers to great effect to hide the asynchronous nature of code. A fiber is just a special kind of single-use function, whose execution can be suspended and resumed by the user. They are perfect for pausing a block of code while some I/O is done, but they do not block the interpreter: when you pause a fiber the interpreter can carry on doing other work until the fiber is resumed. Ilya’s first example is perfect:

def http_get(url)
  f = Fiber.current
  http = EventMachine::HttpRequest.new(url).get
 
  # resume fiber once http call is done
  http.callback { f.resume(http) }
  http.errback  { f.resume(http) }
 
  return Fiber.yield
end

Calling http_get() initiates an async HTTP request, and just before returning it ‘yields’, i.e. it pauses the current fiber. When the request completes, we resume the fiber with the response. A call to http_get() looks like synchronous code, but behind the scenes the I/O is being done in a way that doesn’t block the interpreter.

I thought it would be fun to try this out in JavaScript, so I hacked up a very basic Scheme-like interpreter to support fibers, and called it Fargo. Scheme usually includes a function called call-with-current-continuation or call/cc, of which fibers are a simplified version. Call/cc allows the current state of a computation (the “continuation”) to be saved and resumed any number of times. This means, at least in a naive implementation, that the saved state of the stack must be copied every time you invoke a continuation, otherwise state would leak between uses. Fibers can only be resumed once from their last yield-point, making running and resuming them cheaper.

Fibers as implemented in Ruby also require the user to explicitly initiate a fiber. Supporting call/cc requires constantly keeping track of the current continuation since it may be captured at any time. Because fibers must be explicitly initiated, when not in a fiber we can use a faster stackless execution engine which doesn’t need to track state so much.

Here’s an example in Fargo of getting a value that’s produced by an async API (set-timeout) without using callbacks in the calling code:

; get-message function: captures the current fiber, yields,
; then resumes after a timeout when we have a message to return
(define (get-message)
  (define f (current-fiber))
  (set-timeout (lambda ()
    (f "Hello, world!")
  ) 5000)
  (yield))

; main program fiber
(define main (fiber ()
  ; get the message using a sync-looking API
  (define msg (get-message))
  ; alert the message
  (alert msg)
))

; run the main fiber
(main)
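For comparison, JavaScript later gained generators (ES2015), which offer a similar single-use, pause-and-resume shape. The sketch below swaps in a generator for Fargo’s fiber. A hypothetical run() driver starts it, and each yielded value is a function taking a Node-style callback, which resumes the generator with the result. It illustrates the same idea and is not how Fargo works internally.

```javascript
// run() plays the role of starting a fiber: each yielded value is a function
// that accepts a Node-style callback, and the generator is resumed with
// whatever that callback receives.
function run(generatorFn) {
  var gen = generatorFn();

  function step(err, value) {
    // resume the paused generator, re-raising errors at the yield point
    var next = err ? gen.throw(err) : gen.next(value);
    if (next.done) return;
    next.value(step);
  }
  step(null, undefined);
}

// Hypothetical async API playing the role of get-message: delivers a
// message after `ms` milliseconds
function getMessage(ms) {
  return function(callback) {
    setTimeout(function() { callback(null, 'Hello, world!'); }, ms);
  };
}

run(function*() {
  var msg = yield getMessage(10);  // reads like a synchronous call
  console.log(msg);                // prints "Hello, world!"
});
```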

Because Fargo supports proper tail calls, you can use recursion to implement infinite streams:

> (define stream (fiber ()
    (let loop ((i 0))
      (yield i)
      (loop (+ i 1)))))

> (stream)
; => 0
> (stream)
; => 1
> (stream)
; => 2
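The same stream written as a JavaScript generator shows the single-resume behaviour: each next() call runs the loop up to the following yield. This is again a comparison, not Fargo code.

```javascript
// Generator analogue of the Fargo stream (a comparison, not Fargo code)
function* counter() {
  for (var i = 0; ; i++) yield i;   // suspends here on every iteration
}

var stream = counter();
console.log(stream.next().value);   // 0
console.log(stream.next().value);   // 1
console.log(stream.next().value);   // 2
```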

So where is Fargo going? Right now, with my current ban on committing myself to any new large side-projects, it remains a quick (12-day-old) experiment. Maybe someone will like the idea enough to turn it into a production language, but for now I’m happy to let people try it out and let me know what they think. I’ve been making small improvements to it over the last few days to get it to run as much of Heist’s core library as possible, and it now runs all the macros and the list and numeric libraries.

I’m going to keep thinking about how we can make async programming easier and I’d love your ideas – the nice thing about working on a Lisp is that changing the interface it provides is really simple.

Promises are the monad of asynchronous programming

In my introduction to monads in JavaScript we saw a couple of monads and examined the commonality between them to expose an underlying design pattern. Before I get on to how that applies to asynchronous programming we need to take one final diversion and discuss polymorphism.

Consider the list monad that we implemented before:

var compose = function(f, g) {
  return function(x) { return f(g(x)) };
};

// unit :: a -> [a]
var unit = function(x) { return [x] };

// bind :: (a -> [a]) -> ([a] -> [a])
var bind = function(f) {
  return function(list) {
    var output = [];
    for (var i = 0, n = list.length; i < n; i++) {
      output = output.concat(f(list[i]));
    }
    return output;
  };
};

In our previous example, we just had functions that accepted an HTMLElement and returned a list of HTMLElements. Notice the type signature of bind above: (a -> [a]) -> ([a] -> [a]). That ‘a’ just means we can put any type in its place, but the signature a -> [a] implies that the functions must return a list of things of the same type as the input. That restriction is unnecessary, and the correct signature is:

bind :: (a -> [b]) -> ([a] -> [b])

For example, bind may take a function that maps a single String to a list of HTMLElements, and return a function that maps a list of Strings to a list of HTMLElements. Consider these two functions: the first takes a string and returns a list of nodes with that tag name, and the second takes a node and returns a list of all the class names it has.

// byTagName :: String -> [HTMLElement]
var byTagName = function(name) {
  var nodes = document.getElementsByTagName(name);
  return Array.prototype.slice.call(nodes);
};

// classNames :: HTMLElement -> [String]
var classNames = function(node) {
  return node.className.split(/\s+/);
};

If we ignore the ‘returns a list of’ aspect, we’d expect to be able to compose these to get all the class names of all the links in a document:

var classNamesByTag = compose(classNames, byTagName);

Of course, we do have lists to contend with, but because the monad just cares about the lists, not what’s in the lists, we can use it to compose the functions:

// classNamesByTag :: [String] -> [String]
var classNamesByTag = compose(bind(classNames), bind(byTagName));

classNamesByTag(unit('a'))
// -> ['profile-links', 'signout-button', ...]

The monad just cares about the ‘list of’ part, not the contents of the list. Just as a reminder, let’s recast this using the pipeline syntax we developed in the previous article:

// bind :: [a] -> (a -> [b]) -> [b]
var bind = function(list, f) {
  var result = [];
  for (var i = 0, n = list.length; i < n; i++) {
    result = result.concat(f(list[i]));
  }
  return result;
};

// pipe :: [a] -> [a -> [b]] -> [b]
var pipe = function(x, functions) {
  for (var i = 0, n = functions.length; i < n; i++) {
    x = bind(x, functions[i]);
  }
  return x;
};

// for example
pipe(unit('a'), [byTagName, classNames])
// -> ['profile-links', 'signout-button', ...]

The pipe function doesn’t actually care that bind deals with lists, and we can write its signature in a more generic way:

pipe :: m a -> [a -> m b] -> m b

In this notation, m a means ‘a monadic wrapper around a’. For example, if the input is a list of strings, under the List monad the m refers to ‘list of’, and a refers to ‘string’. This concept of generic containers becomes extremely important when we consider how we can apply these ideas to asynchronous programming.
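DOM functions like byTagName need a browser, so here is the same List monad exercised with plain strings so that it runs in Node. The helpers words() and charCodes() are illustrative. The point is that charCodes changes the element type from String to Number, and pipe does not care.

```javascript
// unit :: a -> [a]
var unit = function(x) { return [x]; };

// bind :: [a] -> (a -> [b]) -> [b]
var bind = function(list, f) {
  var result = [];
  for (var i = 0, n = list.length; i < n; i++) {
    result = result.concat(f(list[i]));
  }
  return result;
};

// pipe :: m a -> [a -> m b] -> m b
var pipe = function(x, functions) {
  for (var i = 0, n = functions.length; i < n; i++) {
    x = bind(x, functions[i]);
  }
  return x;
};

// words :: String -> [String]
var words = function(text) { return text.split(/\s+/); };

// charCodes :: String -> [Number]
var charCodes = function(word) {
  var codes = [];
  for (var i = 0; i < word.length; i++) codes.push(word.charCodeAt(i));
  return codes;
};

pipe(unit('hi yo'), [words, charCodes]);   // returns [104, 105, 121, 111]
```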

One common complaint levelled against Node.js is that it is easy to become mired in ‘callback hell’, where callbacks become nested so deeply it’s impossible to maintain the resulting code. There are various ways to solve this, but I think monads provide an interesting approach that highlights the need to separate business logic from code that simply glues data together.

Let’s consider a fairly contrived example: we have a file called urls.json containing a JSON document with a URL in it. We want to read the file, extract this URL, request the URL from the web, and print the response body. Monads encourage us to think in terms of types of data and how they flow through a pipeline, so we can model this problem using our pipeline syntax:

pipe(unit(__dirname + '/urls.json'),
        [ readFile,
          getUrl,
          httpGet,
          responseBody,
          print         ]);

Beginning with the pathname (a String), we can trace the data as it flows through this pipe:

  • readFile takes a String (pathname) and returns a String (file contents)
  • getUrl takes a String (a JSON document) and returns a URI object
  • httpGet takes a URI and returns a Response
  • responseBody takes a Response and returns a String
  • print takes a String and returns nothing

So each link in the chain produces a data type that is consumed by the next. But in Node, many of these operations are asynchronous. Rather than use continuation-passing style and deeply nested callbacks, how can we modify the return types of these functions to indicate the result might not be known yet? By wrapping them in Promises:

readFile      :: String   -> Promise String
getUrl        :: String   -> Promise URI
httpGet       :: URI      -> Promise Response
responseBody  :: Response -> Promise String
print         :: String   -> Promise null

The Promise monad needs three things: a wrapper object, a unit function to wrap a value in this object, and a bind function to help us compose the above functions. We can implement promises using the Deferrable module from JS.Class:

var Promise = new JS.Class({
  include: JS.Deferrable,
  
  initialize: function(value) {
    // if value is already known, succeed immediately
    if (value !== undefined) this.succeed(value);
  }
});

With this class, we can then implement the functions we need to solve our problem. The functions that can return a value immediately just return a value wrapped in a Promise object:

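// readFile :: String -> Promise String
var readFile = function(path) {
  var promise = new Promise();
  // pass an encoding so we get a String rather than a Buffer;
  // errors are ignored to keep the example short
  fs.readFile(path, 'utf8', function(err, content) {
    promise.succeed(content);
  });
  return promise;
};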

// getUrl :: String -> Promise URI
var getUrl = function(json) {
  var uri = url.parse(JSON.parse(json).url);
  return new Promise(uri);
};

// httpGet :: URI -> Promise Response
var httpGet = function(uri) {
  var client  = http.createClient(80, uri.hostname),
      request = client.request('GET', uri.pathname, {'Host': uri.hostname}),
      promise = new Promise();
  
  request.addListener('response', function(response) {
    promise.succeed(response);
  });
  request.end();
  return promise;
};

// responseBody :: Response -> Promise String
var responseBody = function(response) {
  var promise = new Promise(),
      body    = '';
  
  response.addListener('data', function(c) { body += c });
  response.addListener('end', function() {
    promise.succeed(body);
  });
  return promise;
};

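// print :: String -> Promise null
var print = function(string) {
  sys.puts(string);
  // sys.puts() returns undefined, and new Promise(undefined) would never
  // succeed, so resolve with null explicitly
  return new Promise(null);
};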

So we’ve now got a way to represent a deferred value simply using data types, no nested callbacks required. The next step is to implement unit and bind in such a way that the callback glue is contained within these functions. unit is very simple; it just needs to wrap a value in a Promise. bind is more complex: it accepts a Promise and a function. It must extract the value of the Promise and give this value to the function, which returns another Promise. It then waits for that Promise to complete and passes its value on to the Promise that bind returns.

// unit :: a -> Promise a
var unit = function(x) {
  return new Promise(x);
};

// bind :: Promise a -> (a -> Promise b) -> Promise b
var bind = function(input, f) {
  var output = new Promise();
  input.callback(function(x) {
    f(x).callback(function(y) {
      output.succeed(y);
    });
  });
  return output;
};

With these definitions, the pipeline shown above should just work. For example, if I’ve placed my GitHub API URL in the urls.json file:

pipe(unit(__dirname + '/urls.json'),
        [ readFile,
          getUrl,
          httpGet,
          responseBody,
          print         ]);

// prints:
// {"user":{"name":"James Coglan","location":"London, UK", ...
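The pattern can be tried without JS.Class using the self-contained sketch below. It swaps in a tiny hypothetical Deferred that covers only the succeed()/callback() success path used above, with no errors and no timeouts. unit, bind and pipe are the same as in the article; doubleLater and label are made-up pipeline steps.

```javascript
// Hypothetical stand-in for JS.Deferrable: success path only
var Deferred = function(value) {
  this._callbacks = [];
  this._done      = false;
  // as with the article's Promise class, a known value succeeds immediately
  if (value !== undefined) this.succeed(value);
};

// Run fn now if we've already succeeded, otherwise queue it
Deferred.prototype.callback = function(fn) {
  if (this._done) fn(this._value);
  else this._callbacks.push(fn);
};

// Record the value and flush any queued callbacks
Deferred.prototype.succeed = function(value) {
  this._done  = true;
  this._value = value;
  var queued = this._callbacks;
  this._callbacks = [];
  for (var i = 0; i < queued.length; i++) queued[i](value);
};

// unit :: a -> Promise a
var unit = function(x) { return new Deferred(x); };

// bind :: Promise a -> (a -> Promise b) -> Promise b
var bind = function(input, f) {
  var output = new Deferred();
  input.callback(function(x) {
    f(x).callback(function(y) { output.succeed(y); });
  });
  return output;
};

// pipe :: Promise a -> [a -> Promise b] -> Promise b
var pipe = function(x, functions) {
  for (var i = 0, n = functions.length; i < n; i++) x = bind(x, functions[i]);
  return x;
};

// Made-up steps: one resolves asynchronously, one immediately
var doubleLater = function(n) {
  var d = new Deferred();
  setTimeout(function() { d.succeed(n * 2); }, 10);
  return d;
};
var label = function(n) { return unit('result: ' + n); };

pipe(unit(21), [doubleLater, label]).callback(function(s) {
  console.log(s);   // prints "result: 42" after roughly 10ms
});
```

Native ES2015 promises have this glue built in. then() flattens a promise returned from its callback, which is the same job bind does here.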

I hope this has shown why Promises (also known as Deferreds in jQuery and Dojo) are valuable, how they can help you pipe data through your code, and how this piping glue can be contained in quite a neat way. On the client side, you’ll find this pattern applies equally well to Ajax and animation; in fact MethodChain (which I’ve been using as async glue for years) is just the Promise monad applied to method calls instead of function calls.

The full working code for this article is on GitHub if you’d like to tinker with it. After that, you can dive into jQuery’s Deferred API. There’s also talk of promises making a return to Node, after they were banished in favour of continuation-passing style, so you may find you can do this without writing so much plumbing in future.

Primer: the cache that knows too much

Every so often at Songkick we have what we call innovation days – a day or two where everyone can work on whatever they like to make our product, technology or workplace better. We did one this last Friday, and I made a start on something I’ve been wanting to do for a long time: it’s called Primer.

This year a couple of development platforms have sprung up that make doing real-time applications considerably easier: LunaScript and Fun. They both aim to let you write a web app by writing your templates in the usual way, but then they deal with updating the client and syncing state for you. From the Fun intro:

When you say "Hello " user.name, you don’t mean just “render the value of user.name right now”. Rather, you mean “display the value of user.name here, and any time user.name changes update the UI”.

Fun will update your UI in real-time, keystroke by keystroke. I can’t do that yet, but I can do something similar: automatically update caches when your ActiveRecord model changes. Consider:

<% primer '/users/1/name' do %>
  <p><%= @user.full_name %></p>
<% end %>

This is just wrapping part of a template in a caching block, much like Rails fragment caching. Now, surely your development tools ought to be able to figure out that the cache key /users/1/name depends on the full_name attribute of @user, right? It says so right there. So this is what Primer does: it caches your HTML fragments and figures out which bits of your database they depend on. When your database changes, it updates the cache. No sweeper code, no combing through call stacks, no dealing with message buses.
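The sketch below is a toy model of that idea. It records which model attributes are read while a cache entry is computed, then recomputes the entry when one of those attributes is set. Model, Cache and fetch() are hypothetical names. This is not Primer’s implementation, which works by reflecting on ActiveRecord.

```javascript
// Toy dependency-tracking cache (hypothetical, not Primer's code)
var currentReads = null;   // collects [model, attribute] pairs during a fetch

function Model(attrs) {
  this._attrs    = attrs;
  this._watchers = {};     // attribute -> {cacheKey: recompute function}
}

// Read an attribute, recording the read if a cache entry is being computed
Model.prototype.get = function(attr) {
  if (currentReads) currentReads.push([this, attr]);
  return this._attrs[attr];
};

// Write an attribute and recompute every cache entry that depends on it
Model.prototype.set = function(attr, value) {
  this._attrs[attr] = value;
  var watchers = this._watchers[attr] || {};
  Object.keys(watchers).forEach(function(key) { watchers[key](); });
};

function Cache() { this.store = {}; }

// Compute and store a value, then subscribe it to every attribute it read
Cache.prototype.fetch = function(key, compute) {
  var self = this, outer = currentReads, reads = [];
  currentReads = reads;
  try {
    this.store[key] = compute();
  } finally {
    currentReads = outer;  // restore any enclosing tracker
  }
  reads.forEach(function(read) {
    var model = read[0], attr = read[1];
    model._watchers[attr] = model._watchers[attr] || {};
    // keyed by cache key, so recomputing replaces rather than duplicates
    model._watchers[attr][key] = function() { self.fetch(key, compute); };
  });
  return this.store[key];
};

var user  = new Model({first: 'Ada', last: 'Lovelace'});
var cache = new Cache();

cache.fetch('/users/1/name', function() {
  return '<p>' + user.get('first') + ' ' + user.get('last') + '</p>';
});

user.set('last', 'King');
console.log(cache.store['/users/1/name']);   // prints "<p>Ada King</p>"
```

A real system would also need to forget dependencies that a recomputation no longer reads, and to keep the dependency map somewhere that outlives a single process.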

Now it turns out that using paths as cache keys has two really nice side effects. Firstly, you can use a declarative routing scheme to tell Primer how to calculate values:

Primer.cache.routes do
  get '/users/:id/name' do
    user = User.find_by_id(params[:id])
    user.full_name
  end
end

This means that, by moving some rendering logic out of your templates, you can tell Primer how to regenerate cached values, so it can update the cache with new data instead of just invalidating it. This can be useful for long-tail pages that don’t get much user traffic but need to be fast for the Googlebots.
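Matching a cache key against a route pattern works much as it does in a web framework. The following hypothetical sketch compiles patterns like /users/:id/name into regular expressions and runs the matching handler with the extracted params. It is not Primer’s code, and the users object stands in for User.find_by_id.

```javascript
function Router() { this._routes = []; }

// Register a pattern such as '/users/:id/name'. Each :name segment becomes a
// capture group; other characters are used as-is (no regex escaping).
Router.prototype.get = function(pattern, handler) {
  var names  = [],
      source = pattern.replace(/:(\w+)/g, function(_, name) {
        names.push(name);
        return '([^/]+)';
      });
  this._routes.push({
    regex:   new RegExp('^' + source + '$'),
    names:   names,
    handler: handler
  });
};

// Find the first route matching a cache key and run its handler with params
Router.prototype.compute = function(path) {
  for (var i = 0; i < this._routes.length; i++) {
    var route = this._routes[i],
        match = route.regex.exec(path);
    if (!match) continue;

    var params = {};
    for (var j = 0; j < route.names.length; j++) {
      params[route.names[j]] = match[j + 1];
    }
    return route.handler(params);
  }
  throw new Error('No route for ' + path);
};

var users  = {'1': 'Ada Lovelace'};   // stand-in for User.find_by_id
var router = new Router();

router.get('/users/:id/name', function(params) {
  return users[params.id];
});

console.log(router.compute('/users/1/name'));   // prints "Ada Lovelace"
```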

Secondly, it means that all your cache keys are valid Faye channel names, and can be used to update the client in real time. If you put this in your view:

<%= primer '/users/1/name' %>

Primer will use the '/users/:id/name' route to generate the value, cache it, and also update any clients viewing the page when that cache slot is updated. You don’t need to write any of the network code for this; it’s all done for you, so that template fragment gets you real-time updates for free.

Right now it’s very much a proof of concept, just to show that this sort of thing is possible. It needs to store a lot of data about your model to do its job, and needs to perform a lot of ugly reflection on ActiveRecord to propagate changes correctly. It probably still screws up a lot of edge cases; most glaringly, it can’t figure out when a has_and_belongs_to_many collection changes, though it can handle a lot of common has_many, belongs_to and has_many :through relationships. I have taken some steps to ease the load by making it really easy to create background workers to update your caches, but it’s definitely not for production.

I’ve rambled on enough, so if you want to poke at a half-baked week-old project I’d really like feedback on whether this can be turned into a viable bit of production software. Just gem install primer or get it on GitHub.

Terminus: a client-side Capybara driver

Last week the Capybara project released version 0.4. Since the 0.3.9 release, which added support for third-party drivers, I’ve been working on turning Terminus into a fully compatible driver for it. It’s still an experiment, but it’s in the sort of semi-useful state that means I’m okay throwing it out to see if anyone’s interested in it.

So what is it? Terminus is a Capybara driver where most of the driver functions are implemented in client-side JavaScript. It lets you script any browser on any machine using the Capybara API, without any browser plugins or extensions. ‘Any browser’ really means ‘any browser that supports the document.evaluate() XPath API’ for now, so you can’t use IE, but the ‘any machine’ part is true: any browser that can see your development machine can be controlled using Terminus, including any phone, iPad or desktop machine on your local network.

This is very much still an experiment. Because every little Capybara call has to go over the network (it uses Faye to send commands to browsers), it’s very slow, and the connection sometimes flakes out. Under ideal circumstances, though, it does pass most of the Capybara test suite. It supports JavaScript, cookies and window switching (assuming the connected browser has these features enabled). The major omissions are:

  • attach_file does not work since file uploads cannot be controlled by JavaScript for security reasons.
  • A few selectors and HTML5 features in the specs don’t work due to browser inconsistencies.
  • Redirects work but infinite redirection cannot be detected.

The original intention was for Terminus to be a tool for automating cross-browser testing, so the lack of IE support is kind of a problem. I actually went some way towards fixing this: I wrote a pretty stupid document.evaluate() replacement called Pathology and, to support that, a PEG parser compiler called Canopy so I could write a declarative XPath grammar. Unfortunately these don’t yet perform well enough to handle the workload that Capybara throws at a browser.

I also want to add an API for switching browsers based on vendor and OS versions, but this release is just enough to support the required Capybara API.

You can find out more on the project’s GitHub page, and installation is the usual gem install terminus command. I’d love it if a few of you could kick its tyres a bit and figure out whether it’s actually useful for anything.

Improving the observer pattern with subscription objects

Way back in February I wrote a series of articles on event-driven programming. Of all the patterns I covered, the one I find myself using most is the observer pattern: an object that collects callback functions in order to notify other parts of a system when interesting things happen. The basic pattern looks something like this:

Observable = {
  bind: function(eventType, listener, context) {
    this._listeners = this._listeners || {};
    var list = this._listeners[eventType] = this._listeners[eventType] || [];
    list.push([listener, context]);
  },

  trigger: function(eventType, args) {
    if (!this._listeners) return;
    var list = this._listeners[eventType];
    if (!list) return;
    list.forEach(function(listener) {
      listener[0].apply(listener[1], args);
    });
  }
};

This is pretty simple: we can add callbacks to a list, and call them all when something happens. But what if we want to remove a listener? The approach taken by many DOM interfaces is to provide essentially the opposite of the bind() method:

Observable.unbind = function(eventType, listener, context) {
  if (!this._listeners) return;
  var list = this._listeners[eventType];
  if (!list) return;
  
  var i = list.length;
  while (i--) {
    if (list[i][0] === listener && list[i][1] === context)
      list.splice(i, 1);
  }
};

This just takes the original event type and listener function that we gave to bind(), and removes the listener from that event type’s list of listeners. Internally, that’s exactly what we want to achieve, but the API is hard to use. It reeks of something inspired by Java, where listeners are objects and easier to keep references to. Idiomatic JavaScript relies heavily on anonymous functions, and being forced to keep references to a function and a context object just so you can remove a listener often feels clunky. It would be nicer if the bind() API returned something to represent the subscription so we could cancel it later:

var subscription = object.bind('someEvent', function(event) {
  // Handle the event
});

// Later on
subscription.cancel();

This is easily done: we just need an object that stores the observed object, event type, listener function and context, so that it can call the unbind() method for us when we want to cancel the subscription.

Subscription = function(target, eventType, listener, context) {
  this._target    = target;
  this._eventType = eventType;
  this._listener  = listener;
  this._context   = context;
  this._active    = true;
};

Subscription.prototype.cancel = function() {
  if (!this._active) return;
  this._target.unbind(this._eventType, this._listener, this._context);
  this._active = false;
};

We can then change the bind() method to return one of these objects:

Observable.bind = function(eventType, listener, context) {
  this._listeners = this._listeners || {};
  var list = this._listeners[eventType] = this._listeners[eventType] || [];
  list.push([listener, context]);
  return new Subscription(this, eventType, listener, context);
};

There are further refactorings we could make; maybe it would be better to store the subscription objects rather than the listener functions on the observable object. We’d probably then move the listener calling logic into the Subscription class, although this could introduce extra function call overhead while dispatching events. For now I’m happy leaving it as sugar over the unbind() method. (This is cribbed from the Faye client, which is required by the protocol to support an unsubscribe() method that takes a channel name and listener to remove.)
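The refactoring mentioned above, storing Subscription objects and letting each one call its own listener, might look something like this. It is a minimal sketch assuming an ES5 environment (for Array.prototype.indexOf and Object.create); note that unbind() here takes the Subscription itself, which changes its signature from the version earlier in this post, and that every dispatch pays for one extra method call (the hypothetical dispatch()) per listener. Removal is still O(n).

```javascript
// Sketch of the alternative storage model: the observable keeps Subscription
// objects, and each Subscription knows how to call its own listener.
var Subscription = function(target, eventType, listener, context) {
  this._target    = target;
  this._eventType = eventType;
  this._listener  = listener;
  this._context   = context;
  this._active    = true;
};

// Dispatch now lives on the subscription rather than in trigger().
Subscription.prototype.dispatch = function(args) {
  this._listener.apply(this._context, args);
};

Subscription.prototype.cancel = function() {
  if (!this._active) return;
  this._target.unbind(this);
  this._active = false;
};

var Observable = {
  bind: function(eventType, listener, context) {
    this._listeners = this._listeners || {};
    var list = this._listeners[eventType] = this._listeners[eventType] || [];
    var subscription = new Subscription(this, eventType, listener, context);
    list.push(subscription);
    return subscription;
  },

  // Removal compares subscription identity, so callers no longer need to
  // keep the listener function and context around.
  unbind: function(subscription) {
    var list = this._listeners && this._listeners[subscription._eventType];
    if (!list) return;
    var index = list.indexOf(subscription);
    if (index >= 0) list.splice(index, 1);
  },

  trigger: function(eventType, args) {
    var list = this._listeners && this._listeners[eventType];
    if (!list) return;
    // Iterate over a copy so a listener that cancels itself mid-dispatch
    // does not cause the next listener to be skipped.
    list.slice().forEach(function(subscription) {
      subscription.dispatch(args);
    });
  }
};
```

Objects pick up the behaviour by inheriting from the mixin, for example with var emitter = Object.create(Observable).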

So we’ve got an API that we’re happy with but its internals are sub-optimal: it takes O(n) time to remove a listener from an object, and if we need to do this operation a lot at scale we’ll have a performance problem. In Faye, for example, every client that subscribes to a channel must eventually unsubscribe, so we’d like that to be fast. In this situation we have two things we can use to make this faster: every client has a unique ID, and the list of subscribers to a channel must contain no duplicates – otherwise a client would receive some messages twice.

Putting all this together, we could model a channel’s subscribers using a JavaScript object to produce set-like behaviour:

var Channel = function() {
  this._subscribers = {};
};

Channel.prototype.bind = function(clientId, listener, context) {
  this._subscribers[clientId] = [listener, context];
};

Channel.prototype.unbind = function(clientId) {
  delete this._subscribers[clientId];
};

Channel.prototype.trigger = function(message) {
  for (var clientId in this._subscribers) {
    var client = this._subscribers[clientId];
    client[0].call(client[1], message);
  }
};

This would make it easy for us to add and remove subscriptions from a channel, but message dispatch would be slowed down by the poor performance of for/in. But as I mentioned last week, there’s an easy solution to that: keep a sorted list of all the subscribed client IDs. The list speeds up iteration and message dispatch, while keeping it sorted means we can find a client’s position with a binary search in O(log n) time rather than O(n). Combining this with the Subscription class from above, we get a final draft of our Channel API:

Channel = function() {
  this._clientIds   = [];
  this._subscribers = {};
};

Channel.prototype.bind = function(clientId, listener, context) {
  if (!this._subscribers.hasOwnProperty(clientId)) {
    var index = this._indexOf(clientId);
    this._clientIds.splice(index, 0, clientId);
  }
  this._subscribers[clientId] = [listener, context];
  return new Subscription(this, clientId);
};

Channel.prototype.unbind = function(clientId) {
  if (!this._subscribers.hasOwnProperty(clientId)) return;
  delete this._subscribers[clientId];
  var index = this._indexOf(clientId);
  this._clientIds.splice(index, 1);
};

Channel.prototype.trigger = function(message) {
  var clientIds = this._clientIds,
      i         = clientIds.length,
      client;
  
  while (i--) {
    client = this._subscribers[clientIds[i]];
    client[0].call(client[1], message);
  }
};

Channel.prototype._indexOf = function(key) {
  var keys = this._clientIds,
      n    = keys.length,
      i    = 0,
      d    = n;

  if (n === 0)         return 0;
  if (key < keys[0])   return 0;
  if (key > keys[n-1]) return n;

  while (key !== keys[i] && d > 0.5) {
    d = d / 2;
    i += (key > keys[i] ? 1 : -1) * Math.round(d);
    if (key > keys[i-1] && key < keys[i]) d = 0;
  }
  return i;
};

All we’ve done here is use the implementation of SortedTable from the previous article to model the channel’s set of subscribers. Finding a subscriber’s position takes O(log n) time on insertion and removal (the splice() that follows still has to shift the elements after that position), and iterating over the subscribers should be reasonably quick. Of course, in practice we would want to measure the total load of binding, dispatching and unbinding under production traffic to see which model has the best trade-off for our use case. For example, in Ruby iterating over a Hash is nowhere near as slow as iterating over a JavaScript object with for/in, so we could use the simpler Channel implementation and get O(1) insertion and removal of clients.
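The _indexOf() method above halves a floating-point step and rounds it on each pass, which is harder to check by eye. For comparison, here is a conventional lower-bound binary search that could serve as the body of _indexOf(); lowerBound is a hypothetical helper, not part of the original code. It returns the position of an existing client ID, or the position at which a new one should be spliced in to keep the list sorted.

```javascript
// Conventional lower-bound binary search: returns the first index at which
// keys[index] >= key, i.e. where key already is, or where it should be
// inserted to keep keys sorted. Assumes keys is sorted ascending by <.
function lowerBound(keys, key) {
  var low  = 0,
      high = keys.length;   // search the half-open range [low, high)

  while (low < high) {
    var mid = (low + high) >>> 1;   // integer midpoint
    if (keys[mid] < key) low = mid + 1;
    else high = mid;
  }
  return low;
}
```

Each pass halves the range [low, high), so the loop runs at most about log2(n) + 1 times and always terminates.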

One final benefit of this API is that all that’s required to unbind from a channel is the clientId, since that uniquely identifies the listener to remove. The use of a Subscription object hides the mechanics of unbinding from the caller so we don’t have to change its code if we change the Channel class’s storage model.
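Putting the pieces together, here is a self-contained sketch of the sorted Channel and its Subscription, using a conventional binary search (a hypothetical lowerBound() helper) in place of _indexOf(). Unlike the four-argument Subscription earlier in the post, this one is assumed to store only the channel and the client ID, since that is all cancel() needs.

```javascript
// Conventional binary search used in place of _indexOf().
function lowerBound(keys, key) {
  var low = 0, high = keys.length;
  while (low < high) {
    var mid = (low + high) >>> 1;
    if (keys[mid] < key) low = mid + 1;
    else high = mid;
  }
  return low;
}

// A subscription only needs to know which channel and which client.
var Subscription = function(channel, clientId) {
  this._channel  = channel;
  this._clientId = clientId;
  this._active   = true;
};

Subscription.prototype.cancel = function() {
  if (!this._active) return;
  this._channel.unbind(this._clientId);
  this._active = false;
};

var Channel = function() {
  this._clientIds   = [];   // sorted list of IDs, for fast iteration
  this._subscribers = {};   // clientId -> [listener, context]
};

Channel.prototype.bind = function(clientId, listener, context) {
  if (!this._subscribers.hasOwnProperty(clientId))
    this._clientIds.splice(lowerBound(this._clientIds, clientId), 0, clientId);
  this._subscribers[clientId] = [listener, context];
  return new Subscription(this, clientId);
};

Channel.prototype.unbind = function(clientId) {
  if (!this._subscribers.hasOwnProperty(clientId)) return;
  delete this._subscribers[clientId];
  this._clientIds.splice(lowerBound(this._clientIds, clientId), 1);
};

Channel.prototype.trigger = function(message) {
  var clientIds = this._clientIds,
      i         = clientIds.length,
      client;

  while (i--) {
    client = this._subscribers[clientIds[i]];
    client[0].call(client[1], message);
  }
};
```

Callers never see the client ID bookkeeping: they keep the returned object and call cancel() on it.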

Speaking at RubyConf 2010

A quick bit of self-promotional guff: I’m thrilled to announce I’ll be presenting at RubyConf 2010 in New Orleans this November. I’ll be speaking about the Ruby incarnation of Faye, and more broadly about doing asynchronous programming and testing using Ruby and EventMachine.

I gave a similar talk based on the JavaScript/Node version at the London Ajax User Group a couple of months back, if you want a preview.

You can grab a ticket from the conference site, assuming they’ve not sold out by the time you read this. It’s a pretty daunting prospect sharing a bill with Matz and DHH, so fingers crossed my first proper conference spot goes well.

The potentially asynchronous loop

If you write a lot of asynchronous or event-driven code, you’re probably going to end up needing an asynchronous for loop. That is, a loop that runs each iteration in sequence, where an iteration may contain non-blocking logic that must hold up the loop until the async action completes. In my case, I need the main loop of JS.Test, the testing tool to be bundled with JS.Class 3.0, to run each test in sequence, but each test has to support a suspend/resume system for async tests.

I’ll use a slightly more contrived example here: fetching a series of responses over Ajax in sequence using jQuery. It should be apparent that this will not do the job:

listOfUrls.forEach(function(url) {
  $.get(url, function(response) {
    // handle response
  });
});

The requests are made in sequence, but they overlap because each async request does not block the loop. We want each iteration to hold up the loop until the request it opens has completed. To do this, I’m going to introduce a resume callback as an argument to the iterator; each iteration must call this to continue the loop.

listOfUrls.asyncEach(function(url, resume) {
  $.get(url, function(response) {
    // handle response
    resume();
  });
});

An initial stab at implementing asyncEach() might look like this. The method takes an iterator function, and creates an internal function that moves a counter forward one index. As long as we’ve not reached the end of the list, we call iterator with the current element and the internal function as the resume callback.

Array.prototype.asyncEach = function(iterator) {
  var list = this,
      n    = list.length,
      i    = -1;
  
  var resume = function() {
    i += 1;
    if (i === n) return;
    iterator(list[i], resume);
  };
  resume();
};

This works just fine as long as every iteration contains an async action. Async code allows us to empty the call stack and start again when the async logic resumes. In JS.Test, each test might contain async code, but probably won’t, at least for my uses. If too many tests in a row don’t contain any async code, we get a stack overflow, because resume() keeps calling itself indirectly without ever letting the stack unwind.
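To see the failure mode concretely, here is the first version as a standalone function (naiveAsyncEach and overflows are hypothetical names) run with an iterator that calls resume() synchronously. The exact error depends on the engine; V8, as used by Node, throws a RangeError, which is what this sketch assumes.

```javascript
// Standalone copy of the first asyncEach() above.
function naiveAsyncEach(list, iterator) {
  var n = list.length,
      i = -1;

  var resume = function() {
    i += 1;
    if (i === n) return;
    iterator(list[i], resume);   // a synchronous iterator re-enters resume here
  };
  resume();
}

// Returns true if iterating synchronously over a list of the given length
// exhausts the call stack. Every iteration adds frames that are only
// unwound once the whole list has been processed.
function overflows(length) {
  var list = new Array(length);
  try {
    naiveAsyncEach(list, function(item, resume) { resume(); });
    return false;
  } catch (error) {
    if (error instanceof RangeError) return true;   // V8's stack overflow error
    throw error;
  }
}
```

A handful of synchronous iterations is harmless; it is a long run of them that blows the stack.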

So we need a looping construct for iterations that might contain async code, and it must work on all JavaScript platforms, only some of which have async functions like setTimeout() built in. First, we can get rid of the stack overflow problem by scheduling the next iteration using setTimeout() instead of calling it directly and growing the stack:

Array.prototype.asyncEach = function(iterator) {
  var list = this,
      n    = list.length,
      i    = -1;
  
  var iterate = function() {
    i += 1;
    if (i === n) return;
    iterator(list[i], resume);
  };
  
  var resume = function() {
    setTimeout(iterate, 1);
  };
  resume();
};

We should now be able to handle a large list of tests because the resume callback uses setTimeout() to essentially clear the call stack between iterations. But we now have the problem that this won’t run on platforms without setTimeout(). How do we turn it into a simple loop on such platforms without blowing the stack?

The trick is to have resume() act as a scheduling device, but implement the scheduling differently. On non-async platforms, we can do this by keeping a count of iterations left to run: calling resume() adds to this count, while calling iterate() decrements it. We keep a single loop running that runs the iteration as long as there are calls remaining. The finished code looks like this:

Array.prototype.asyncEach = function(iterator) {
  var list    = this,
      n       = list.length,
      i       = -1,
      calls   = 0,
      looping = false;

  var iterate = function() {
    calls -= 1;
    i += 1;
    if (i === n) return;
    iterator(list[i], resume);
  };

  var loop = function() {
    if (looping) return;
    looping = true;
    while (calls > 0) iterate();
    looping = false;
  };

  var resume = function() {
    calls += 1;
    if (typeof setTimeout === 'undefined') loop();
    else setTimeout(iterate, 1);
  };
  resume();
};

Notice how the looping flag prevents more than one loop from running at a time. After the first call to loop(), the effect of each iteration calling the resume() function is simply to increment calls and keep the loop going. This setup lets us iterate over a list on non-async platforms, and allows async code to exist within iterations on compatible platforms, keeping all the complexity in one place. Anywhere we want to iterate, we just call

list.asyncEach(function(item, resume) {
  // handle item
  resume();
});

and asyncEach figures out the best way to handle the loop.
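One possible variation, sketched under the assumption that callers can choose the scheduler: instead of testing typeof setTimeout, asyncEach() takes an optional schedule function. With no scheduler it falls back to the counter-driven loop described above, which makes that path easy to exercise even in an environment that does have setTimeout(). The standalone asyncEach(list, iterator, schedule) signature is hypothetical.

```javascript
// Variation: the scheduler is passed in rather than detected.
function asyncEach(list, iterator, schedule) {
  var n       = list.length,
      i       = -1,
      calls   = 0,
      looping = false;

  var iterate = function() {
    calls -= 1;
    i += 1;
    if (i === n) return;
    iterator(list[i], resume);
  };

  // Only the outermost loop() call runs iterations; nested calls return
  // straight away, so the stack depth stays constant however long the list.
  var loop = function() {
    if (looping) return;
    looping = true;
    while (calls > 0) iterate();
    looping = false;
  };

  var resume = function() {
    calls += 1;
    if (schedule) schedule(iterate);   // async: let the scheduler call back
    else loop();                       // sync: counter-driven loop
  };
  resume();
}
```

Injecting the scheduler also makes it easy to substitute something other than setTimeout(), such as setImmediate() in Node.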

Evented programming patterns: Testing event-driven apps

This post is part of a series on event-driven programming.

Thus far all the articles in this series have focused on methods for structuring applications to make them more modular and maintainable. They all help in their own way when correctly applied, but all of them leave something to be desired in one major area: scripting.

To be clear, it’s not that these techniques make code unscriptable. The problem is that even though the components may be well-designed and conceptually easy to plug together, writing quick scripts to manipulate an event-driven application can require a lot of boilerplate. One place this really shows up is in tests. Tests should, nay, must be easy to read. If they’re not, then their role as specifications and design/debugging tools is lost. And if tests are easy to write, you end up writing more tests, and you make better software. Wins all around.

But have you tried integration-testing a heavily event-driven app? I don’t care how fancy your test framework is, when every step in a test scenario needs some async work you’re going to end up with this:

ClientSpec = JS.Test.describe(Faye.Client, function() {
  before(function(resume) {
    var server = new Faye.NodeAdapter({mount: '/'})
    server.listen(8000)
    setTimeout(function() {
      resume(function() {
        var endpoint = 'http://0.0.0.0:8000'
        this.clientA = new Faye.Client(endpoint)
        this.clientB = new Faye.Client(endpoint)
      })
    }, 500)
  })
  
  it('sends a message from A to B', function(resume) {
    clientA.subscribe('/channel', function(message) {
      this.message = message
    }, this)
    setTimeout(function() {
      clientB.publish('/channel', {hello: 'world'})
      setTimeout(function() {
        resume(function() {
          assertEqual( {hello: 'world'}, message )
        })
      }, 250)
    }, 100)
  })
})

All this test does is make sure one client can send a message to another using a Faye messaging server. While working on Faye, I’ve ended up getting the tests into a state where that test reads like this:

Scenario.run('Two clients, single message send',
function() { with(this) {
  server( 8000 )
  client( 'A', ['/channel'] )
  client( 'B', [] )
  publish( 'B', '/channel', {hello: 'world'} )
  checkInbox( 'A', [{hello: 'world'}] )
  checkInbox( 'B', [] )
}})

This should be pretty self-explanatory: start a server on port 8000, make client A subscribe to /channel, make client B with no subscriptions, make B publish the message {hello: 'world'} to /channel, and make sure A and only A received the message.

Now the problem is, executing all those steps synchronously won’t work. There are network delays and other timeouts that need to happen between steps to make the test work. We need an abstraction that lets us write scripts at a very high level like this and hides all the inconsequential (and possibly volatile) glue code between the lines. This is pretty easy to do by breaking the script into two components, which here I’ll call a Scenario and a CommandQueue.

The job of the Scenario is to implement all the script steps involved in running the test. It should have a method for each type of command that accepts the right arguments, and also accepts a callback that it should call when the scenario is ready to continue running steps. For example, let’s get our Faye scenario class started:

Scenario = function() {
  this._clients = {};
  this._inboxes = {};
};

Scenario.prototype.server = function(port, resume) {
  this._port = port;
  var server = new Faye.NodeAdapter({mount: '/'});
  server.listen(port);
  setTimeout(resume, 500);
};

The server() command in our test just takes a port number, so our scenario method needs to take the port number and the resume callback function. It runs resume after a delay to let the server spin up so it’s ready to take requests before we continue our test.

Next up we need the client() method. This takes a name and a list of channels to subscribe to, and the callback as before.

Scenario.prototype.client = function(name, channels, resume) {
  var endpoint = 'http://0.0.0.0:' + this._port,
      client   = new Faye.Client(endpoint);
  
  channels.forEach(function(channel) {
    client.subscribe(channel, function(message) {
      this._inboxes[name].push(message);
    }, this);
  }, this);
  
  this._clients[name] = client;
  this._inboxes[name] = [];
  setTimeout(resume, 100);
};

A similar pattern here: we create a client, make somewhere to store the messages it receives, set up subscriptions, then wait a little for the subscriptions to have time to register with the server. By now the publish() and checkInbox() methods should be fairly predictable:

Scenario.prototype.publish = function(name, channel, message, resume) {
  var client = this._clients[name];
  client.publish(channel, message);
  setTimeout(resume, 250);
};

// Node's built-in assertion module
var assert = require('assert');

Scenario.prototype.checkInbox = function(name, messages, resume) {
  assert.deepEqual(this._inboxes[name], messages);   // actual, expected
  resume();
};

Note how in the final method we still accept the resume callback even though the method is synchronous and calls the callback immediately. The point of this component is to establish a convention: you pass each method a callback, and the method decides when the program should continue. Here we’ve used timeouts, but it could be after an Ajax call, an animation, a user-triggered event, anything at all.

Now we’ve implemented all the steps, we need something that can run the test as we’ve written it, without callbacks. I’ll call this the CommandQueue: rather than executing commands immediately it stores them in a queue.

CommandQueue = function() {
  this._scenario = new Scenario();
  this._commands = [];
};

CommandQueue.prototype = {
  server: function(port) {
    this.enqueue(['server', port]);
  },
  client: function(name, channels) {
    this.enqueue(['client', name, channels]);
  },
  publish: function(name, channel, message) {
    this.enqueue(['publish', name, channel, message]);
  },
  checkInbox: function(name, messages) {
    this.enqueue(['checkInbox', name, messages]);
  },
};

This implements the API that we want to use in our test, but so far the script is inert: nothing actually gets run. We need a method to run the next command in the queue. This takes the next command off the queue, and adds a callback to the argument list that will run the next command once called. The methods in the Scenario will use that callback to resume the test when ready.

CommandQueue.prototype.runNext = function() {
  var next = this._commands.shift();
  if (!next) return;   // queue drained: the scenario has finished

  var command = next.slice(),
      method  = command.shift(),
      self    = this;
  
  var resume = function() { self.runNext() };
  command.push(resume);
  
  this._scenario[method].apply(this._scenario, command);
};

We also want the first command added to the queue to start it running, so we’ll handle this in enqueue(). We start the execution with a timeout, since if we started synchronously, the first command would run before the rest of the script had been queued.

CommandQueue.prototype.enqueue = function(command) {
  this._commands.push(command);
  if (this._started) return;
  
  this._started = true;

  var self = this;
  setTimeout(function() { self.runNext() }, 100);
};

As the final piece of glue, we need the Scenario.run() function, which takes our test script and executes it using a CommandQueue to do the work.

Scenario.run = function(testName, block) {
  var commandQueue = new CommandQueue();
  block.call(commandQueue);
};
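The same idea can be made independent of Faye. The sketch below is a hypothetical variation on CommandQueue and Scenario.run, not the code from the Faye test suite: the queueing methods are generated from a list of command names rather than written by hand, and because block.call() only returns once every command has been queued, the queue starts right after it instead of waiting on a timeout. runScript and the toy scenario are illustrative names.

```javascript
// Variation: generate the queueing methods and start once the block returns.
function runScript(scenario, commandNames, block, done) {
  var commands = [],
      script   = {};

  // Each generated method records its name and arguments instead of running.
  commandNames.forEach(function(name) {
    script[name] = function() {
      var args = Array.prototype.slice.call(arguments);
      commands.push([name].concat(args));
    };
  });

  var runNext = function() {
    var command = commands.shift();
    if (!command) {              // queue drained
      if (done) done();
      return;
    }
    var method = command.shift();
    command.push(runNext);       // the step's resume callback
    scenario[method].apply(scenario, command);
  };

  block.call(script);            // every command is queued by the time this returns
  runNext();
}

// Example with a toy scenario: each step calls resume when it is finished,
// either immediately or after some asynchronous work.
var log = [];
var toyScenario = {
  say:   function(word, resume) { log.push(word); resume(); },
  pause: function(ms, resume)   { log.push('pause ' + ms); setTimeout(resume, ms); }
};

runScript(toyScenario, ['say', 'pause'], function() {
  this.say('hello');
  this.pause(10);
  this.say('world');
}, function() { log.push('done'); });
```

Steps that resume synchronously recurse through runNext(), so very long scripts of purely synchronous steps would grow the stack; the counter-driven loop from the asyncEach() post is one way around that.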

If you have a lot of test scenarios, you can use another command queue to help sequence them into a single test suite without too much bother. For a more complete example of some of these patterns, you can read through the Faye test suite, which also includes a variation on the above done in Ruby with EventMachine and Test::Unit.
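A minimal sketch of that sequencing, assuming each scenario is wrapped as a function that takes a done callback (runSuite is a hypothetical name): it applies the same resume-callback convention one level up, so the next scenario only starts once the previous one reports that it has finished.

```javascript
// Run scenarios one after another; each receives a callback to call when done.
function runSuite(scenarios, done) {
  var index = 0;

  var next = function() {
    if (index === scenarios.length) {
      if (done) done();
      return;
    }
    var scenario = scenarios[index];
    index += 1;
    scenario(next);
  };
  next();
}

// Example: the second scenario waits for the first one's timeout.
var order = [];
runSuite([
  function(done) { order.push('first');  setTimeout(done, 10); },
  function(done) { order.push('second'); done(); }
], function() { order.push('all done'); });
```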