One of the great myths about JavaScript is that, because it doesn’t have threads, you don’t get all the concurrency problems associated with threads. While it’s certainly true that some thread-related problems don’t show up in JavaScript programs, there’s still plenty of potential for race conditions, and it’s worth looking to threaded languages for potential solutions to them.
Here’s an example. The following Ruby program assigns a variable counter the value 0. It then starts two concurrent threads, each of which increments the counter five hundred times. The program then waits for both threads to complete (it “joins” them), then prints the value of counter, which we expect to be 1000.
counter = 0

thread_1 = Thread.new do
  # and I would count five hundred times
  500.times do
    counter += 1
  end
end

thread_2 = Thread.new do
  # and I would count five hundred more
  500.times do
    counter += 1
  end
end

[thread_1, thread_2].each(&:join)
puts counter
If you run this program with JRuby, depending on your hardware you might see something other than 1000 in your terminal. (JRuby runs threads truly in parallel; MRI’s global interpreter lock tends to mask this particular bug.) If so, congratulations, you’ve found a race condition! Why does it happen?
The statement counter += 1 is syntactic sugar; what’s really going on is something more like this:
temp = counter
temp = temp + 1
counter = temp
That is, the value of counter is read out of memory into some temporary location (maybe a local variable, maybe a CPU register), it is incremented, and then the result is written back into the memory that counter points at. When two threads are executing this code at the same time, the steps of the process can be interleaved, like so:
THREAD 1                THREAD 2
--------                --------
1. temp = counter
                        2. temp = counter
3. temp = temp + 1
4. counter = temp
                        5. temp = temp + 1
                        6. counter = temp
Remember that temp is local to each thread, but counter is a global variable. Say counter is 3 before the above code runs. Line 1 reads counter and assigns thread 1’s temp the value 3. Then line 2 runs and does exactly the same: both threads now have temp = 3. On lines 3 and 4, thread 1 increments its temp to 4 and writes it back to counter, and then on lines 5 and 6 thread 2 does the same, again assigning counter = 4.
So running counter += 1 twice has resulted in counter changing from 3 to 4, rather than to 5 as you would expect. This is a type of race condition called ‘lost updates’, and it happens whenever two threads or processes make a copy of some resource, make a change to their copy, and then write their copy back. One thread is unaware of changes made by the other, and fails to incorporate those changes into its own copy before writing, so its update just clobbers the first one.
In threaded languages, one solution to this problem is called a mutex, short for ‘mutual exclusion’. It’s a type of lock: before executing code that interacts with a shared resource, threads must acquire the lock. Once they have the lock, they can continue executing, and they should release the lock when they reach the end of the critical section. Mutexes make sure that only one thread can hold the lock at a time; if you try to claim a mutex that’s locked by another thread, you will block until the lock is released.
In Ruby, we can apply mutexes to our code as follows:
counter = 0
mutex = Mutex.new

thread_1 = Thread.new do
  500.times do
    mutex.synchronize { counter += 1 }
  end
end

thread_2 = Thread.new do
  500.times do
    mutex.synchronize { counter += 1 }
  end
end

[thread_1, thread_2].each(&:join)
puts counter # => 1000
By wrapping mutex.synchronize { ... } around the interactions with counter, we make sure only one thread is incrementing at a time. mutex.synchronize blocks until the current thread can claim the lock, then it executes the block passed to it, then it releases the lock.
Another way to look at mutexes is that they execute blocks of code atomically: they make sure each block passed to them runs to completion before another block is started. This means the interleaving situation described above never happens, and the work on one thread always completes before another thread takes over:
THREAD 1                THREAD 2
--------                --------
1. temp = counter
2. temp = temp + 1
3. counter = temp
                        4. temp = counter
                        5. temp = temp + 1
                        6. counter = temp
Now, how does this relate to JavaScript? It doesn’t have threads, and if you have a function like:
var increment = function() {
  var temp = counter;
  temp = temp + 1;
  counter = temp;
};
That function will always run to completion; it won’t be interrupted (or pre-empted, in threading terminology) while some other function runs for a bit. However, we do have asynchronous functions that do suffer this problem. For example, take this function that lets you edit a JSON file by passing a function that transforms the data structure in some way:
var fs = require('fs'),
    path = require('path');

var editConfig = function(edit) {
  var configPath = path.resolve(__dirname, 'config.json');

  fs.readFile(configPath, 'utf8', function(error, json) {
    var config = JSON.parse(json);
    edit(config);
    fs.writeFile(configPath, JSON.stringify(config), 'utf8');
  });
};
This function asynchronously reads the file from disk (making a copy of its
contents in memory), JSON-parses it, transforms the parsed data using the
passed-in edit()
function, then asynchronously writes the result back to disk.
(Node.js does provide synchronous filesystem APIs, but that’s not the point here. There are many other situations where either the resource itself only has async APIs, or the work between the read and the write is async, and they have the same problem I’m about to demonstrate.)
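For reference, a synchronous version would avoid the problem entirely, because it runs to completion. Here’s a minimal sketch of what that might look like; the name editConfigSync() is hypothetical, just for illustration:

var fs = require('fs'),
    path = require('path');

// A synchronous sketch for contrast: the read, edit and write all happen
// before the function returns, so two calls can never interleave.
var editConfigSync = function(edit) {
  var configPath = path.resolve(__dirname, 'config.json');
  var config = JSON.parse(fs.readFileSync(configPath, 'utf8'));
  edit(config);
  fs.writeFileSync(configPath, JSON.stringify(config), 'utf8');
};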
If you try running the following code using the asynchronous editConfig(), what do you expect to end up in the config file?
editConfig(function(config) { config.foo = 1 });
editConfig(function(config) { config.bar = 2 });
You might expect the file to contain {"foo":1,"bar":2}, but in fact it only contains {"bar":2}. Why is that? It’s just read/write interleaving again:
TASK 1                  TASK 2
------                  ------
1. config = read()
                        2. config = read()
3. edit(config)
4. write(config)
                        5. edit(config)
                        6. write(config)
Although editConfig() runs to completion like any other function, the logical task it represents is not over. When editConfig() returns, it has called fs.readFile() with a callback, but that callback will execute later, when Node’s I/O subsystem is done reading from the file.

So, both invocations of editConfig() read the file, and both get a copy of the file’s initial contents. Both calls make a change to their copy, and then write their copy back to disk. The last write clobbers the first, and only the second edit is committed to the file. We might not have threads, but we still have concurrent execution of logical tasks.
We have various ways we could fix this. For example, what if editConfig() took a callback or returned a promise? Then we could chain the invocations (I’m ignoring error handling for the moment, for the sake of brevity):
var editConfig = function(edit) {
  var configPath = path.resolve(__dirname, 'config.json');

  return new Promise(function(resolve, reject) {
    fs.readFile(configPath, 'utf8', function(error, json) {
      var config = JSON.parse(json);
      edit(config);
      fs.writeFile(configPath, JSON.stringify(config), 'utf8', function(error) {
        resolve();
      });
    });
  });
};

editConfig(function(config) { config.foo = 1 })
  .then(function() {
    return editConfig(function(config) { config.bar = 2 });
  })
  .then(function() {
    return editConfig(function(config) { config.qux = 3 });
  });
This certainly works, and writes each property to the file sequentially. But what if the calls into editConfig() are not syntactically near each other, but come from distant unconnected parts of your program, or from multiple concurrent invocations of the same logic?
This is where mutexes come in. Recall that in Ruby, mutex.synchronize effectively makes sure that when blocks are passed into it, only one of those blocks is executed at a time. No other blocks are allowed to run until the current one is finished. Using promises, we can implement a similar idea in JavaScript.
Our Mutex will maintain a queue of functions, and those functions are expected to return promises. When a new function is passed in, it is added to the queue. If the mutex is not currently busy, the function is immediately shifted off the queue and executed. We use the promise returned by each function to determine when that logical task is over, and we then dequeue the next task and execute it, continuing until the queue is empty. Here’s the code:
var Mutex = function() {
  this._busy  = false;
  this._queue = [];
};

Mutex.prototype.synchronize = function(task) {
  this._queue.push(task);
  if (!this._busy) this._dequeue();
};

Mutex.prototype._dequeue = function() {
  this._busy = true;
  var next = this._queue.shift();

  if (next)
    this._execute(next);
  else
    this._busy = false;
};

Mutex.prototype._execute = function(task) {
  var self = this;

  task().then(function() {
    self._dequeue();
  }, function() {
    self._dequeue();
  });
};
(We need two callbacks to self._dequeue() because task() may produce either a fulfilled or rejected promise; this implementation continues to process queued functions even if one of them fails. Whether you want these semantics in your application is for you to decide.)
We can use a mutex to make sure all the edits to a file happen sequentially. Running the following program will result in the config file containing all three edits:
var mutex = new Mutex();

mutex.synchronize(function() {
  return editConfig(function(config) { config.foo = 1 });
});

mutex.synchronize(function() {
  return editConfig(function(config) { config.bar = 2 });
});

mutex.synchronize(function() {
  return editConfig(function(config) { config.qux = 3 });
});
This is still sub-optimal though: it requires every caller that interacts with editConfig() to know it must synchronize on the mutex, and to have a reference to the mutex in order to do so. This isn’t really any better than requiring the different callers to co-ordinate via callback or promise chaining. And all the callers do need to go through the same mutex: it’s the relationship of one mutex to one resource that stops all the callers clobbering each other’s file updates.
Luckily, there’s a simple fix: move the synchronization into the editConfig() function:
var mutex = new Mutex();

var editConfig = function(edit) {
  var configPath = path.resolve(__dirname, 'config.json');

  return mutex.synchronize(function() {
    return new Promise(function(resolve, reject) {
      fs.readFile(configPath, 'utf8', function(error, json) {
        var config = JSON.parse(json);
        edit(config);
        fs.writeFile(configPath, JSON.stringify(config), 'utf8', function(error) {
          resolve();
        });
      });
    });
  });
};
Now, the callers can use editConfig() directly, without needing to know there’s a mutex involved, and all changes to the file, no matter when and where they are requested, will be done sequentially:
editConfig(function(config) { config.foo = 1 });
editConfig(function(config) { config.bar = 2 });
editConfig(function(config) { config.qux = 3 });
// -> config.json == {"foo":1,"bar":2,"qux":3}
This has taken care of the control flow part of the problem, but there’s a further enhancement we should make. The caller of an async operation often wants to know the result; that is, we would like to write code like the following, which receives the result of the edit, especially if the edit failed for some reason (here I’m using the version of editConfig() without built-in mutex support):
var result = mutex.synchronize(function() {
  return editConfig(function(config) { config.foo = 1 });
});

result.catch(function(error) {
  // handle the error
});
Our current implementation doesn’t support this: mutex.synchronize() does not return anything. We want to know the result of the code in the function passed to mutex.synchronize(), whenever that gets executed; the above snippet suggests that mutex.synchronize(task) should return a promise, and that promise should resolve with the value of the promise returned by task(). This can be accomplished with a couple of small changes to synchronize() and _execute().
When calling synchronize(), we return a new promise, and stash the (resolve, reject) functions for it along with the task in the queue. When we execute the task, we bind the (resolve, reject) functions to its promise, which pipes the result of task() through to the promise returned from synchronize(). Note that we only need one callback to self._dequeue() now because neither resolve nor reject is expected to throw an error, so task().then(resolve, reject) should always produce a fulfilled promise. We chain dequeueing after resolving the promise so that the caller of synchronize() is not blocked on the queue being processed.
Mutex.prototype.synchronize = function(task) {
  var self = this;

  return new Promise(function(resolve, reject) {
    self._queue.push([task, resolve, reject]);
    if (!self._busy) self._dequeue();
  });
};

Mutex.prototype._execute = function(record) {
  var task    = record[0],
      resolve = record[1],
      reject  = record[2],
      self    = this;

  task().then(resolve, reject).then(function() {
    self._dequeue();
  });
};
You might wonder why this is necessary. For example, why can’t we just write the following if we want the result of the operation?
mutex.synchronize(function() {
  return editConfig(function(config) { config.foo = 1 })
    .catch(function(error) {
      // handle the error
    });
});
There are two problems with this. First, it means the mutex is blocked on whatever code is in your error handling function, since that’s now part of the promise returned to the mutex. You should confine mutexed code to only those instructions that interact with a shared resource, and then release the mutex as soon as possible so other code can use it. Hogging the mutex while you execute result/error handling reduces throughput. Second, it masks the result of the operation from the mutex: calling catch() with a function that doesn’t throw means the promise returned to the mutex will always be fulfilled, whereas the mutex might need to know that a task failed; it might want to halt all execution in this case.
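If you did want that failure-halting behaviour, here’s a minimal sketch of a variant _execute(); this design is hypothetical, and note the caveat in the comments:

// Hypothetical variant: if a task fails, reject its promise and stop
// dequeueing. The mutex stays busy forever, so queued and future tasks
// never run and their promises never settle; real code would probably
// want to reject those promises too.
Mutex.prototype._execute = function(record) {
  var task    = record[0],
      resolve = record[1],
      reject  = record[2],
      self    = this;

  task().then(function(value) {
    resolve(value);
    self._dequeue();
  }, reject);
};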
For these reasons, the following form is preferred, since it releases the mutex as soon as possible and allows the mutex to see the result of the task:
var result = mutex.synchronize(function() {
  return editConfig(function(config) { config.foo = 1 });
});
Having mutex.synchronize() return a promise has one final benefit: it means it’s composable, and it allows you to take mutexes on multiple resources at once. Say you had an operation that involved editing multiple files; you can claim mutexes for each of them to make sure all changes to the collection of files are done sequentially:
var mutexA = new Mutex(),
    mutexB = new Mutex();

var result = mutexA.synchronize(function() {
  return mutexB.synchronize(function() {
    return Promise.all([
      editFile('a.txt', function(c) { c.foo = 1 }),
      editFile('b.txt', function(c) { c.bar = 2 })
    ]);
  });
});
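Here, editFile() stands for a hypothetical generalisation of editConfig() to an arbitrary JSON file. A minimal sketch, with no internal mutex of its own since the callers above hold the locks:

var fs = require('fs'),
    path = require('path');

// Hypothetical helper: like editConfig(), but for any JSON file. Callers
// are expected to hold whatever mutexes are appropriate.
var editFile = function(name, edit) {
  var filePath = path.resolve(__dirname, name);

  return new Promise(function(resolve, reject) {
    fs.readFile(filePath, 'utf8', function(error, json) {
      if (error) return reject(error);
      var data = JSON.parse(json);
      edit(data);
      fs.writeFile(filePath, JSON.stringify(data), 'utf8', function(error) {
        if (error) reject(error);
        else resolve(data);
      });
    });
  });
};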
Because mutexB.synchronize() returns a promise, the mutexA.synchronize() call will ‘block’ until mutexB is done executing the task, meaning all locks are held until the whole task completes. And, since we’ve made Mutex.synchronize() return a promise containing the result of the task, the result of editFile() bubbles back up to result at the top level. Since the two edits work on different files, we can perform them concurrently and group the results with Promise.all(); we just shouldn’t make concurrent edits to the same file.
To me, this Mutex abstraction, and this last example in particular, show off the real power of promises. By separating business logic from control flow, by treating pending results as values, they enable the creation of generic constructs for solving concurrency problems. They let you write code that concisely describes the concurrent structure of a problem, without having to explicitly specify when each operation should happen. You’re describing the problem constraints, and the abstractions make sure everything is correctly ordered.
Mutexes are just one of the patterns I find myself reinventing over and over as I work with concurrent JavaScript code. I’ve a lot more to say on this topic, so if you’re interested in learning more, please sign up for my upcoming book, Concurrent JavaScript.