Mutexes and JavaScript

One of the great myths about JavaScript is that, because it doesn’t have threads, you don’t get all the concurrency problems associated with threads. While it’s certainly true that some thread-related problems don’t show up in JavaScript programs, there’s still plenty of potential for race conditions, and it’s worth looking to threaded languages for potential solutions to them.

Here’s an example. The following Ruby program assigns a variable counter the value 0. It then starts two concurrent threads, each of which increments the counter five hundred times. The program then waits for both threads to complete (it “joins” them), then prints the value of counter, which we expect to be 1000.

counter = 0

thread_1 = Thread.new do
  # and I would count five hundred times
  500.times do
    counter += 1
  end
end

thread_2 = Thread.new do
  # and I would count five hundred more
  500.times do
    counter += 1
  end
end

[thread_1, thread_2].each(&:join)

puts counter

If you run this program with JRuby, depending on your hardware you might see something other than 1000 in your terminal. If so, congratulations, you’ve found a race condition! Why does it happen?

The statement counter += 1 is syntactic sugar, what’s really going on is something more like this:

temp = counter
temp = temp + 1
counter = temp

That is, the value of counter is read out of memory into some temporary location (maybe a local variable, maybe a CPU register), it is incremented, and then the result is written back into the memory that counter points at. When two threads are executing this code at the same time, and the steps of the process can be interleaved, like so:

            THREAD 1              THREAD 2
            --------              --------

    1.      temp = counter
    2.                            temp = counter
    3.      temp = temp + 1
    4.      counter = temp
    5.                            temp = temp + 1
    6.                            counter = temp

Remember that temp is local to each thread, but counter is a global variable. Say counter is 3 before the above code runs. Line 1 reads counter and assigns its temp the value 3. Then line 2 runs and does exactly the same: both threads now have temp = 3. On lines 3 and 4, thread 1 increments its temp to 4 and writes it back to counter, and then on lines 5 and 6 thread 2 does the same, again assigning counter = 4.

So running counter += 1 twice has resulting in counter changing from 3 to 4, rather than to 5 as you would expect. This is a type of race condition called ‘lost updates’, and it happens whenever two threads or processes make a copy of some resource, make a change to their copy, and then write their copy back. One thread is unaware of changes made by the other, and fails to incorporate those changes into its own copy before writing, so its update just clobbers the first one.

In threaded languages, one solution to this problem is called a mutex, short for ‘mutual exclusion’. It’s a type of lock; before executing code that interacts with a shared resource, threads must acquire the lock. Once they have the lock, they can continue executing, and they should release the lock when they reach the end of the critical section. Mutexes make sure that only one thread can hold the lock at a time; if a you try to claim a mutex that’s locked by another thread, you will block until the lock is released.

In Ruby, we can apply mutexes to our code as follows:

counter = 0
mutex   = Mutex.new

thread_1 = Thread.new do
  500.times do
    mutex.synchronize { counter += 1 }
  end
end

thread_2 = Thread.new do
  500.times do
    mutex.synchronize { counter += 1 }
  end
end

[thread_1, thread_2].each(&:join)

puts counter # => 1000

By wrapping mutex.synchronize { ... } around the interactions with counter, we make sure only one thread is incrementing at a time. mutex.synchronize blocks until the current thread can claim the lock, then it executes the block passed to it, then it releases the lock.

Another way to look at mutexes is that they execute blocks of code atomically. They make sure each block passed to them runs to completion before another block is started. It means the interleaving situation described above never happens, and the work on one thread always completes before another thread takes over:

            THREAD 1              THREAD 2
            --------              --------

    1.      temp = counter
    2.      temp = temp + 1
    3.      counter = temp
    4.                            temp = counter
    5.                            temp = temp + 1
    6.                            counter = temp

Now, how does this relate to JavaScript? It doesn’t have threads, and if you have a function like:

var increment = function() {
  var temp = counter;
  temp = temp + 1;
  counter = temp;
};

That function will always run to completion, it won’t be interrupted (or pre-empted in the threading terminology) while some other function runs for a bit. However, we do have asynchronous functions that do suffer this problem. For example, take this function that lets you edit a JSON file by passing a function the transforms the data structure in some way:

var fs   = require('fs'),
    path = require('path');

var editConfig = function(edit) {
  var configPath = path.resolve(__dirname, 'config.json');

  fs.readFile('config.json', 'utf8', function(error, json) {
    var config = JSON.parse(json);
    edit(config);
    fs.writeFile(configPath, JSON.stringify(config), 'utf8');
  });
};

This function asynchronously reads the file from disk (making a copy of its contents in memory), JSON-parses it, transforms the parsed data using the passed-in edit() function, then asynchronously writes the result back to disk.

(Node.js does provide synchronous filesystem APIs, but that’s not the point here. There are many other situations where either the resource itself only has async APIs, or the work between the read and the write is async, and they have the same problem I’m about to demonstrate.)

If you try running the following code using this function, what do you expect to end up in the config file?

editConfig(function(config) { config.foo = 1 });
editConfig(function(config) { config.bar = 2 });

You might expect the file to contain {"foo":1,"bar":2}, but in fact it only contains {"bar":2}. Why is that? It’s just read/write interleaving again:

            THREAD 1              THREAD 2
            --------              --------

    1.      config = read()
    2.                            config = read()
    3.      edit(config)
    4.      write(config)
    5.                            edit(config)
    6.                            write(config)

Although editConfig() runs to completion like any other function, the logical task it represents is not over. When editConfig() returns, it has called fs.readFile() with a callback, but that callback will execute later, when Node’s I/O subsystem is done reading from the file.

So, both invocations of editConfig() read the file, and both get a copy of the file’s initial contents. Both calls make a change to their copy, and then write their copy back to disk. The last write clobbers the first, and only the second edit is committed to the file. We might not have threads, but we still have concurrent execution of logical tasks.

We have various ways we could fix this, for example, what if editConfig() took a callback or returned a promise, then we could chain the invocations (I’m ignoring error handling for the moment, for the sake of brevity):

var editConfig = function(edit) {
  var configPath = path.resolve(__dirname, 'config.json');

  return new Promise(function(resolve, reject) {
    fs.readFile('config.json', 'utf8', function(error, json) {
      var config = JSON.parse(json);
      edit(config);
      fs.writeFile(configPath, JSON.stringify(config), 'utf8', function(error) {
        resolve();
      });
    });
  });
};

editConfig(function(config) { config.foo = 1 })
.then(function() {
  return editConfig(function(config) { config.bar = 2 });
})
.then(function() {
  return editConfig(function(config) { config.qux = 3 });
});

This certainly works, and writes each property to the file sequentially. But what if the calls into editConfig() are not syntactically near each other, but come from distant unconnected parts of your program, or from multiple concurrent invocations of the same logic?

This is where mutexes come in. Recall that in Ruby, mutex.synchronize effectively makes sure that when blocks are passed into it, only one of those blocks is executed at a time. No other blocks are allowed to run until the current one is finished. Using promises, we can implement a similar idea in JavaScript.

Our Mutex will maintain a queue of functions, and those functions are expected to return promises. When a new function is passed in, it is added to the queue. If the mutex is not currently busy, the function is immediately shifted off the queue and executed. We use the promise returned by each function to determine when that logical task is over, and we then dequeue the next task and execute it, continuing until the queue is empty. Here’s the code:

var Mutex = function() {
  this._busy  = false;
  this._queue = [];
};

Mutex.prototype.synchronize = function(task) {
  this._queue.push(task);
  if (!this._busy) this._dequeue();
};

Mutex.prototype._dequeue = function() {
  this._busy = true;
  var next = this._queue.shift();

  if (next)
    this._execute(next);
  else
    this._busy = false;
};

Mutex.prototype._execute = function(task) {
  var self = this;

  task().then(function() {
    self._dequeue();
  }, function() {
    self._dequeue();
  });
};

(We need two callbacks to self._dequeue() because task() may produce either a fulfilled or rejected promise; this implementation continues to process queued functions even if one of them fails. Whether you want this semantics in your application is for you to decide.)

We can use a mutex to make sure all the edits to a file happen sequentially. Running the following program will result in the config file containing all three edits:

var mutex = new Mutex();

mutex.synchronize(function() {
  return editConfig(function(config) { config.foo = 1 });
});

mutex.synchronize(function() {
  return editConfig(function(config) { config.bar = 2 });
});

mutex.synchronize(function() {
  return editConfig(function(config) { config.qux = 3 });
});

This is still sub-optimal though: it requires every caller that interacts with editConfig() to know it must synchronize on the mutex, and have a reference to the mutex in order to do so. This isn’t really any better than requiring the different callers to co-ordinate via callback or promise chaining. And all the callers do need to go through the same mutex: it’s the relationship of one mutex to one resource that stops all the callers clobbering each other’s file updates.

Luckily, there’s a simple fix: move the synchronization into the editConfig() function:

var mutex = new Mutex();

var editConfig = function(edit) {
  var configPath = path.resolve(__dirname, 'config.json');

  return mutex.synchronize(function() {
    return new Promise(function(resolve, reject) {
      fs.readFile('config.json', 'utf8', function(error, json) {
        var config = JSON.parse(json);
        edit(config);
        fs.writeFile(configPath, JSON.stringify(config), 'utf8', function(error) {
          resolve();
        });
      });
    });
  });
};

Now, the callers can use editConfig() directly, without needing to know there’s a mutex involved, and all changes to the file, no matter when and where they are requested, will be done sequentially:

editConfig(function(config) { config.foo = 1 });
editConfig(function(config) { config.bar = 2 });
editConfig(function(config) { config.qux = 3 });

// -> config.json == {"foo":1,"bar":2,"qux":3}

This has taken care of the control flow part of the problem, but there’s a further enhancement we should make. The caller of an async operation often wants to know the result, that is, we would like to write code like this, that receives the result of the edit, especially if the edit failed for some reason (here I’m using the version of editConfig() without built-in mutex support):

var result = mutex.synchronize(function() {
  return editConfig(function(config) { config.foo = 1 });
});

result.catch(function(error) {
  // handle the error
});

Our current implementation doesn’t support this: mutex.synchronize() does not return anything. We want to know the result of the code in the function passed to mutex.synchronize(), whenever that gets executed; the above snippet suggests that mutex.synchronize(task) should return a promise, and that promise should resolve with the value of the promise returned by task(). This can be accomplished with a couple of small changes to synchronize() and _execute().

When calling synchronize(), we return a new promise, and stash the (resolve, reject) functions for it along with the task in the queue. When we execute the task, we bind the (resolve, reject) functions to its promise, which pipes the result of task() through to the promise returned from synchronize(). Note that we only need one callback to self._dequeue() now because neither resolve nor reject is expected to throw an error, so task().then(resolve, reject) should always produce a fulfilled promise. We chain dequeueing after resolving the promise so that the caller of synchronize() is not blocked on the queue being processed.

Mutex.prototype.synchronize = function(task) {
  var self = this;

  return new Promise(function(resolve, reject) {
    self._queue.push([task, resolve, reject]);
    if (!self._busy) self._dequeue();
  });
};

Mutex.prototype._execute = function(record) {
  var task    = record[0],
      resolve = record[1],
      reject  = record[2],
      self    = this;

  task().then(resolve, reject).then(function() {
    self._dequeue();
  });
};

You might wonder why this is necessary, for example, why can’t we just write the following if we want the result of the operation:

mutex.synchronize(function() {
  return editConfig(function(config) { config.foo = 1 })
  .catch(function(error) {
    // handle the error
  });
});

There are two problems with this. First, it means the mutex is blocked on whatever code is in your error handling function, since that’s now part of the promise returned to the mutex. You should confine mutexed code to only those instructions that interact with a shared resource, and then release the mutex as soon as possible so other code can use it. Hogging the mutex while you execute result/error handling reduces throughput. Second, it masks the result of the operation from the mutex: calling catch() with a function that doesn’t throw means the promise returned to the mutex will always be fulfilled, whereas the mutex might need to know that a task failed – it might want to halt all execution in this case.

For these reasons, the following form is preferred, since it releases the mutex as soon as possible and allows the mutex to see the result of the task:

var result = mutex.synchronize(function() {
  return editConfig(function(config) { config.foo = 1 });
});

Having mutex.synchronize() return a promise has one final benefit: it means it’s composable, and it allows you to take mutexes on multiple resources at once. Say you had an operation that involved editing multiple files, you can claim mutexes for each of them to make sure all changes to the collection of files are done sequentially:

var mutexA = new Mutex(),
    mutexB = new Mutex();

var result = mutexA.synchronize(function() {
  return mutexB.synchronize(function() {
    return Promise.all([
      editFile('a.txt', function(c) { c.foo = 1 }),
      editFile('b.txt', function(c) { c.bar = 2 })
    ]);
  });
});

Because mutexB.synchronize() returns a promise, the mutexA.synchronize() call will ‘block’ until mutexB is done executing the task, meaning all locks are held until the whole task completes. And, since we’ve made Mutex.synchronize() return a promise containing the result of the task, the result of editFile() bubbles back up to result at the top level. Since the two edits work on different files, we can perform those concurrently and group the results with Promise.all() – we just shouldn’t make concurrent edits to the same file.

To me, this Mutex abstraction and this last example in particular show off the real power of promises. By separating business logic from control flow, by treating pending results as values, they enable the creation of generic constructs for solving concurrency problems. They let you write code that concisely describes the concurrent structure of a problem, without having to explicitly specify when each operation should happen. You’re describing the problem constraints, and the abstractions make sure everything is correctly ordered.

Mutexes are just one of the patterns I find myself reinventing over and over as I work with concurrent JavaScript code. I’ve a lot more to say on this topic, so if you’re interested in learning more, please sign up for my upcoming book, Concurrent JavaScript.