In part 1 of this article, we saw how we could use traits to make a function work with various built-in types, how matching traits to types works, and what happens when we violate the coherence rules. In this part we’ll look at some more types, letting us work with concurrency and different techniques for routing input and output. We’ll also look at const generics, an upcoming feature that lets us make better use of types that are parameterised over sizes.
Let’s go back to our exponentiating program. Conceptually, what we’ve done is chain a set of machines together, where each takes two inputs and produces one output. Some of the inputs are hard-coded (the power 3) while others are the outputs from other machines.
             3                 3                 3
             |                 |                 |
             v                 v                 v
       +-----------+     +-----------+     +-----------+
2 ---->| pow(x, y) |---->| pow(x, y) |---->| pow(x, y) |----> result
       +-----------+     +-----------+     +-----------+
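In code, this left-to-right chain is just nested function calls. A minimal sketch, using the standard library's pow (the specific numbers are only illustrative):

```rust
fn main() {
    // Each box raises its input to the power 3; chaining the boxes is
    // simply nesting the calls.
    let pow = |x: i64, y: u32| x.pow(y);

    let result = pow(pow(pow(2, 3), 3), 3);
    assert_eq!(result, 134_217_728); // ((2^3)^3)^3 = 2^27
}
```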
There’s nothing surprising about this; it’s what we do all the time when we compose functions in our programs. In this system, data flows in one direction and there are no cycles in the design. Things get more interesting when the output of one machine can loop back around and feed the input of a previous machine in the sequence:
             3                 3                 3
             |                 |                 |
             v                 v                 v
       +-----------+     +-----------+     +-----------+
2 ---->| pow(x, y) |---->| pow(x, y) |---->| pow(x, y) |---+--> result
       +-----------+     +-----------+     +-----------+   |
         ^                                                 |
         |                                                 |
         +-------------------------------------------------+
This situation does not have a simple translation to function composition – how can the result of one function be fed back into a call that produced one of its inputs? This problem is more analogous to a set of concurrent communicating processes; all the machines run in parallel, but at any time one can emit output that’s consumed as an input message by another.
One such system occurs on day 7, and the program below is structurally similar to the program used in that puzzle. I have listed the program broken down into instructions, and various data regions, labelled with the start address of each line and a high-level description of what that line does.
let program = [
    3, 28,              //  0  #28 = input()
    1, 45, 28, 28,      //  2  |
    2, 49, 28, 28,      //  6  |  #28 = 50 + 6 * (#28 - 1)
    1, 70, 28, 28,      // 10  |
    1, 49, 72, 72,      // 14  #72 = 6
    6, 72, 69,          // 18  if #72 == 0 then goto 44
    1, 45, 72, 72,      // 21  #72 = #72 - 1
    3, 71,              // 25  #71 = input()
    1, 0, 71, 71,       // 27  #71 = #71 + #(#28)
    2, 46, 71, 71,      // 31  #71 = #71 * 3
    4, 71,              // 35  output(#71)
    1, 47, 28, 28,      // 37  #28 = #28 + 1
    6, 48, 68,          // 41  goto 18
    99,                 // 44  halt
    -1, 3, 1, 0, 6,     // 45  constants
    5, 2, 7, 4, 3, 6,   // 50  |
    2, 3, 6, 5, 4, 7,   // 56  |  addition parameters
    7, 4, 5, 6, 2, 3,   // 62  |
    18, 44, 50,         // 68  jump addresses
    0, 0,               // 71  variables
];
The main body of this program (beginning at 18) is a loop. It iterates six times; the counter is stored in #72 and initialised by the instruction at 14. On each turn of the loop, it fetches an input value (storing it in #71), adds some number from 2 to 7 to it, multiplies the result by 3, and then emits the final value as output. The values it adds on each turn are given by one of the sequences beginning at 50, 56 and 62. The input instruction at the start of the program fetches an input, which should be 1, 2 or 3, into #28, which stores the address of the number to be added to the loop input. The instructions at 2, 6 and 10 manipulate this first input so that #28 points to the start of one of the addition parameter blocks, and the instruction at 37 increments #28 so it points to the next value in the sequence.
So in all this program takes seven inputs – one configuration variable and six data values – and emits six outputs. In the puzzle, several machines running this program are connected in a loop, so that output from one flows as input into the next. Each machine is first fed its initialisation parameter (here 1, 2 or 3), and then the first machine is fed the value 0, which causes it to do some work and emit an output value. That output flows into the second machine, and so the cycle continues, until each machine has processed six inputs and stops.
                3                 1                 2
                |                 |                 |
                v                 v                 v
         +-----------+     +-----------+     +-----------+
0 ---+-->| machine A |---->| machine B |---->| machine C |---+--> result
     |   +-----------+     +-----------+     +-----------+   |
     |                                                       |
     +---------------------------<---------------------------+
The number of inputs and outputs is intended to be opaque from the outside; the real Advent program contains ten distinct sub-routines selected by the first input, where the first five have a single input and output and the latter five take and return ten values each. A later puzzle has the machines connected in a network where one can send a message to any other, so the data flow pattern is unknown in advance.
In this situation, we can’t simply run machine A to completion and then feed its outputs into machine B, because there’s a feedback loop: machine A’s inputs depend on its outputs. If we feed machine A the initialisation parameter 3 and then the value 0, it will complete one turn of the processing loop and then either get stuck or crash, because it wants more input than we’ve provided. Instead we have to run the machines concurrently: it should be possible for A to process its first input, and then pause when it requests a second value. Meanwhile, B processes A’s first output, and it too pauses to wait for a second input. C consumes the output of B and feeds a result to A, which can then resume operation.
One way to solve this is to modify Machine::run so that it can return if the machine needs more input, rather than only if the machine hits the halt instruction. We could then loop over the machines, running each until it gets stuck, allowing each one to make more progress each time until they explicitly halt. However, in a language with threads, we can run Machine::run in parallel without changing its logic at all, as long as we have an I/O implementation that works in this situation.
We can immediately rule out Vec as an appropriate option. We’re calling Vec::remove to fetch values from the input, and if the Vec is empty, this function panics, whereas we need a function that will block until there is more data to be fetched. Vec would also be inappropriate because of the borrowing rules; we can’t have a Vec that one machine writes to and another one reads from, as that would require two &mut references. We could make an input Vec for each machine and use closures to handle output, but this is a lot of annoying boilerplate and there’s a better solution built into std.
The mpsc module (short for multiple producer, single consumer) implements channels that can send values between threads. mpsc::channel returns a Sender and a Receiver; Sender can be shared between threads, letting them send messages into the channel, and a single thread can use the Receiver to retrieve the sent messages. Importantly, Receiver::recv will block if the channel is empty, and wait for more messages to arrive, so it’s perfect for making a thread wait until it has more data to consume.
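As a minimal standalone sketch of this blocking behaviour, independent of the Machine code: a worker thread can drain a channel with recv(), which returns Err only once every Sender has been dropped.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // The receiving thread blocks in recv() whenever the channel is
    // empty; recv() returns Err once every Sender has been dropped,
    // which ends the loop.
    let handle = thread::spawn(move || {
        let mut total = 0;
        while let Ok(n) = rx.recv() {
            total += n;
        }
        total
    });

    for &n in [1, 2, 3].iter() {
        tx.send(n).unwrap();
    }
    drop(tx); // close the channel so the receiver's loop can end

    assert_eq!(handle.join().unwrap(), 6);
}
```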
Let’s use this by creating a channel for each machine. Every Machine will run in its own thread, and use a Receiver to retrieve input. To queue up input for a machine, we use its corresponding Sender, so for example machine A will need the Sender for machine B so it can send output to it, and machine C will need the Sender for machine A.
Below, we create three senders and receivers – mpsc::channel returns (Sender<T>, Receiver<T>), and Iterator::unzip converts an iterator of (A, B) into the pair (Vec<A>, Vec<B>), so we get the senders and receivers as two Vecs. Then, we feed the initialisation parameters [3, 1, 2] into the three senders, and we feed the first data value 0 into the first sender, so now all the initial inputs are queued up. Finally, we rotate_left the senders by one place, so that machine A’s sender is now last in the list, with B’s first and C’s second – this will let us give each Machine the Sender for the machine next in the chain.
use std::sync::mpsc::{self, Receiver, Sender};
let (mut senders, receivers): (Vec<Sender<i64>>, Vec<Receiver<i64>>) =
    (0..3).map(|_| mpsc::channel()).unzip();

for (&n, tx) in [3, 1, 2].iter().zip(&senders) {
    tx.send(n).unwrap();
}
senders[0].send(0).unwrap();
senders.rotate_left(1);
Next, we can create the threads that run the machines. Iterating over the paired-up senders and receivers, we create a new Machine, and a variable we’ll use to record the latest output of the machine. Inside a new thread, we run the machine, using the Receiver to get input, and writing the output to the last_sent variable and to the Sender for the next machine. Each thread returns the last output that the machine produced, which in the puzzle is the value we need to give as the solution.
use std::thread;
let mut threads = Vec::new();
for (tx, rx) in senders.into_iter().zip(receivers) {
    let mut machine = Machine::new(&program);
    let mut last_sent = 0;

    threads.push(thread::spawn(move || {
        machine.run_with_io(
            || rx.recv().unwrap(),
            |x| {
                last_sent = x;
                tx.send(x).ok();
            },
        );
        last_sent
    }));
}
The machines now run in parallel, feeding input and output between each other. Each machine will run to completion and the threads will exit, and so we can join all of them and collect their outputs. Getting the last value in this list gives the final output of the last machine:
let result = threads.into_iter().flat_map(|t| t.join()).last().unwrap();
println!("{:?}", result);
// -> 3519891687
Once again, closures have let us get something working, but if we find ourselves using mpsc channels a lot in this system, it would be nice to make them first-class input/output sources. Looking at the program above, all the input closure does is fetch a value from a Receiver, so we can make Receiver an Input implementation and shorten that code a bit:
for (tx, rx) in senders.into_iter().zip(receivers) {
    let mut machine = Machine::new(&program);
    let mut last_sent = 0;

    threads.push(thread::spawn(move || {
        machine.run_with_io(rx, |x| {
            last_sent = x;
            tx.send(x).ok();
        });
        last_sent
    }));
}
Here’s the implementation that makes this work: we turn the closure we’re using into the get() function for the Input trait.
impl<T> Input<T> for Receiver<T> {
    fn get(&mut self) -> T {
        self.recv().unwrap()
    }
}
The output closure is more complicated: it sends the value to a Sender, and writes it to a &mut i64. The latter is already an Output implementation, and we could make Sender one too, but that would only let us use one of these as the output destination.
impl<T> Output<T> for Sender<T> {
    fn put(&mut self, value: T) {
        self.send(value).ok();
    }
}
We now have two output destinations of different types, both of which implement Output; those types are &mut i64 and mpsc::Sender<i64>. Rust has a type that lets us make ad-hoc combinations of other types and treat the combination as a type on its own: tuples. We can implement Output for a pair where both of its members are Output, and then a pair of such values can work as a single output handler.
impl<A, B, T> Output<T> for (A, B)
where
    A: Output<T>,
    B: Output<T>,
    T: Clone,
{
    fn put(&mut self, value: T) {
        self.0.put(value.clone());
        self.1.put(value.clone());
    }
}
T has to be Clone for this to work, because unless T is Copy, we can’t pass value into two different functions – the first call would move it. This impl means we can now shrink our concurrent machine code down to:
for (tx, rx) in senders.into_iter().zip(receivers) {
    let mut machine = Machine::new(&program);
    let mut last_sent = 0;

    threads.push(thread::spawn(move || {
        machine.run_with_io(rx, (tx, &mut last_sent));
        last_sent
    }));
}
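To try the tuple fan-out on its own, here's a self-contained sketch; the impl of Output for Vec<T> is an assumption added just for this demo, so we have something concrete to collect into.

```rust
trait Output<T> {
    fn put(&mut self, value: T);
}

// Assumed helper impl for this demo: a Vec collects everything it's given.
impl<T> Output<T> for Vec<T> {
    fn put(&mut self, value: T) {
        self.push(value);
    }
}

// The tuple impl from the text: every value fans out to both members.
impl<A, B, T> Output<T> for (A, B)
where
    A: Output<T>,
    B: Output<T>,
    T: Clone,
{
    fn put(&mut self, value: T) {
        self.0.put(value.clone());
        self.1.put(value);
    }
}

fn main() {
    let mut pair: (Vec<i64>, Vec<i64>) = (Vec::new(), Vec::new());
    for n in 1..=3 {
        pair.put(n);
    }
    assert_eq!(pair.0, vec![1, 2, 3]);
    assert_eq!(pair.1, vec![1, 2, 3]);
}
```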
We could write more implementations for larger tuples, but we could also just nest tuples if we want more output destinations. (A, (B, C)) is Output<T> if A, B and C are all Output<T>. It’s also tempting to use fixed-size arrays here, but tuples let us combine several outputs of different types whereas in an array all the outputs would need the same type. The existence of these built-in types, plus the ability to add functionality to them via traits, lets us write highly expressive code that would be challenging to pull off effectively in many untyped languages.
Let’s wrap up by looking at some patterns for extending this expressiveness to an even wider set of types. One very widely used trait is Iterator; it’s implemented by a lot of types, most of which you will never refer to by name as they’re just the internal structs that various iterator adaptors and consumers use. It’s awfully tempting to use Iterator as an input source; for example, we could set up the program above with an initial configuration value (3) followed by an infinite stream of the data values 5, 6 and 7, which would let it run to completion without being connected to another machine.
let mut machine = Machine::new(&program);
let input = [3].iter().chain([5, 6, 7].iter().cycle()).cloned();
machine.run_with_io(input, |x| println!("{}", x));
Maybe we could write a blanket implementation of Input<T> for Iterator<Item = T>?
impl<I, T> Input<T> for I
where
    I: Iterator<Item = T>,
{
    fn get(&mut self) -> T {
        self.next().unwrap()
    }
}
Nope, that’s a coherence violation: we can’t implement for both F: FnMut() -> T and I: Iterator<Item = T>.
error[E0119]: conflicting implementations of trait `Input<_>`:
|
| / impl<F, T> Input<T> for F
| | where
| | F: FnMut() -> T,
| | {
... |
| | }
| | }
| |_- first implementation here
...
| / impl<I, T> Input<T> for I
| | where
| | I: Iterator<Item = T>,
| | {
... |
| | }
| | }
| |_^ conflicting implementation
Although FnMut is fundamental and so won’t be implemented for any new types, Iterator is not fundamental; in fact its value comes from being implemented for a large and expanding set of types! We could easily imagine implementing it for all FnMut() -> T closures:
impl<F, T> Iterator for F
where
    F: FnMut() -> T,
{
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        Some(self())
    }
}
So we can’t have both these blanket implementations. But, just as it’s common to use a newtype wrapper to add new semantics to existing types, we can use a converter function to turn an Iterator into something else that implements Input, namely a closure:
fn iter<I, T>(mut iter: I) -> impl Input<T>
where
    I: Iterator<Item = T>,
{
    move || iter.next().unwrap()
}
Wrapping this iter() function around our input Iterator converts it into a closure that implements Input<i64>, so the program now compiles and runs.
let mut machine = Machine::new(&program);
let input = [3].iter().chain([5, 6, 7].iter().cycle()).cloned();
machine.run_with_io(iter(input), |x| println!("{}", x));
// -> prints: 36, 30, 36, 33, 24, 30
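Pulling the pieces into a standalone sketch – the Input trait and its FnMut blanket impl as used earlier in the series, plus the iter() converter – shows the pattern working without the Machine:

```rust
trait Input<T> {
    fn get(&mut self) -> T;
}

// Blanket implementation for closures.
impl<F, T> Input<T> for F
where
    F: FnMut() -> T,
{
    fn get(&mut self) -> T {
        self()
    }
}

// The converter: wrap an Iterator in a closure, which is an Input.
fn iter<I, T>(mut it: I) -> impl Input<T>
where
    I: Iterator<Item = T>,
{
    move || it.next().unwrap()
}

fn main() {
    // A config value followed by a repeating stream of data values.
    let mut input = iter([3].iter().chain([5, 6, 7].iter().cycle()).cloned());

    let first: Vec<i64> = (0..4).map(|_| input.get()).collect();
    assert_eq!(first, vec![3, 5, 6, 7]);
}
```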
A similar pattern can be used to make Stdin and Stdout first-class input/output values. Say we wanted to remove the println! boilerplate and just pass stdout itself as the output:
machine.run_with_io(iter(input), std::io::stdout());
It would be tempting to write a blanket implementation for Write types:
use std::fmt::Display;
use std::io::Write;

impl<T, W> Output<T> for W
where
    T: Display,
    W: Write,
{
    fn put(&mut self, value: T) {
        writeln!(self, "{}", value).unwrap();
    }
}
However, as Write is not fundamental, this conflicts with the implementation for F: FnMut(T). So, we either need to implement for Stdout specifically, or use the converter pattern above to turn any Write value into a FnMut(T) closure.
use std::fmt::Debug;
use std::io::{Stdout, Write};

impl<T> Output<T> for Stdout
where
    T: Debug,
{
    fn put(&mut self, value: T) {
        writeln!(self, "{:?}", value).unwrap();
    }
}
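The converter route can be sketched in the same spirit as iter(). The writer() helper below is hypothetical (not from the original code); it wraps any Write value in a closure, which can then act as an output handler via the blanket FnMut(T) implementation. Testing against a Vec<u8> (which implements Write) lets us check the formatted output:

```rust
use std::fmt::Display;
use std::io::Write;

// Hypothetical writer() converter: turns any Write value into a closure.
fn writer<W, T>(mut w: W) -> impl FnMut(T)
where
    W: Write,
    T: Display,
{
    move |value| writeln!(w, "{}", value).unwrap()
}

fn main() {
    let mut buf = Vec::new();
    {
        let mut out = writer(&mut buf);
        out(21);
        out(75);
    }
    assert_eq!(String::from_utf8(buf).unwrap(), "21\n75\n");
}
```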
A similar pattern can be used to support Stdin, or any Read type where the input type implements FromStr.
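For input, the equivalent converter might look like the hypothetical reader() helper below: it reads one line per call from any BufRead source and parses it with FromStr. A byte slice implements BufRead, so it can stand in for a locked Stdin in a test:

```rust
use std::io::BufRead;
use std::str::FromStr;

// Hypothetical reader() converter: one parsed value per call.
fn reader<R, T>(mut r: R) -> impl FnMut() -> T
where
    R: BufRead,
    T: FromStr,
    T::Err: std::fmt::Debug,
{
    move || {
        let mut line = String::new();
        r.read_line(&mut line).unwrap();
        line.trim().parse().unwrap()
    }
}

fn main() {
    // "3" is the configuration parameter, "0" the first data value.
    let mut input = reader::<_, i64>("3\n0\n".as_bytes());
    assert_eq!(input(), 3);
    assert_eq!(input(), 0);
}
```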
The last trick I want to share is one for “chunking” output using an upcoming Rust feature called const generics. Some of the Advent puzzles involve machines that produce multi-value outputs, for example some machines in a network that send pairs of co-ordinates to other machines by emitting the ID of the target machine, and then the two values making up the co-ordinate. To handle this output we need to group it into threes before processing it. Const generics give us a way of expressing this in the type system, so that we can write a closure that accepts [i64; 3] and automatically convert it into an Output<i64> type that does the chunking for us.
Let’s set up the program we’ve been using throughout this post, which takes a configuration parameter and then six data values, producing six outputs. We’ll use an mpsc channel to feed its output back in as input; the configuration parameter is 3 and 0 is the first data value. We’d like to see the output in chunks of size 3, and I’ve annotated the closure parameter with [_; 3] to express this.
let mut machine = Machine::new(&program);
let (tx, rx) = mpsc::channel();
tx.send(3).unwrap();
tx.send(0).unwrap();
let output = |values: [_; 3]| {
    println!("{:?}", values);
};
machine.run_with_io(rx, (tx, chunked(output)));
// -> [21, 75, 240]
// -> [738, 2220, 6669]
This works by using a helper function called chunked. It takes a type that implements Output<[T; N]> – for example our FnMut([_; 3]) closure – and returns a closure that implements Output<T>. It can do this conversion automatically because of the type parameter const N: usize; as long as N can be resolved to a constant value at compile time, we can use this as a type parameter and use its value in the code. We allocate an array of size N (note we need to require [T; N]: Default rather than just T: Default), and a cursor that contains the next offset at which we’ll store a value. The closure we return writes a value at the current cursor position, and then increments the cursor. If it’s wrapped around to zero, that means we’ve filled the buffer and we can call the original closure with a copy of the buffer.
fn chunked<O, T, const N: usize>(mut output: O) -> impl Output<T>
where
    O: Output<[T; N]>,
    [T; N]: Default,
    T: Clone,
{
    let mut buffer: [T; N] = Default::default();
    let mut cursor = 0;

    move |value| {
        buffer[cursor] = value;
        cursor += 1;
        if cursor == N {
            cursor = 0;
            output.put(buffer.clone());
        }
    }
}
Letting the length parameter of the buffer array be used dynamically in the code means that if we want the output in different size chunks, all we need to do is change the type of the closure parameter to, say, [_; 2]:
let output = |values: [_; 2]| {
    println!("{:?}", values);
};
machine.run_with_io(rx, (tx, chunked(output)));
// -> [21, 75]
// -> [240, 738]
// -> [2220, 6669]
If we use a value that means the buffer won’t be filled a second time, then we only see one chunk of output:
let output = |values: [_; 4]| {
    println!("{:?}", values);
};
machine.run_with_io(rx, (tx, chunked(output)));
// -> [21, 75, 240, 738]
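Since chunked() doesn’t depend on the Machine at all, a self-contained version can be exercised directly. This sketch assumes Output is also implemented for FnMut(T) closures, as earlier in the series, and needs a compiler with const generics (stable since Rust 1.51):

```rust
trait Output<T> {
    fn put(&mut self, value: T);
}

// Blanket implementation for closures.
impl<F, T> Output<T> for F
where
    F: FnMut(T),
{
    fn put(&mut self, value: T) {
        self(value)
    }
}

// chunked() as in the text: buffer N values, then emit them as one array.
fn chunked<O, T, const N: usize>(mut output: O) -> impl Output<T>
where
    O: Output<[T; N]>,
    [T; N]: Default,
    T: Clone,
{
    let mut buffer: [T; N] = Default::default();
    let mut cursor = 0;

    move |value| {
        buffer[cursor] = value;
        cursor += 1;
        if cursor == N {
            cursor = 0;
            output.put(buffer.clone());
        }
    }
}

fn main() {
    let mut chunks = Vec::new();
    let mut out = chunked(|values: [i64; 3]| chunks.push(values));
    for n in 1..=7 {
        out.put(n);
    }
    drop(out); // release the borrow of `chunks`

    // Only complete chunks are emitted; the trailing 7 stays buffered.
    assert_eq!(chunks, vec![[1, 2, 3], [4, 5, 6]]);
}
```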
This new feature gives us another way to manipulate types that can prove very powerful for automatically converting between different things. In general, we’ve seen that Rust’s trait system lets us write code that makes very dynamic use of types, even though everything is completely checked at compile time and we don’t pay any runtime cost for deciding which fn to call. In Ruby, it would actually be harder to be this expressive because there’s no distinction equivalent to Rust’s array, vector and tuple types. If we used a Ruby Array as output, it would be unclear if we meant to push output onto the end of it, or treat all its existing members as output destinations themselves. We’d need to invent a set of wrapper objects to express these differences, and pay a runtime cost for indirecting function calls through those objects.
The other comparison this brought to mind is how, when Ruby launched, much was made of the ability to “re-open” classes: you can add new methods to built-in classes, in a way that was illegal in the dominant statically typed object-oriented language at the time, Java. The Ruby community (and the JavaScript community) eventually decided that modifying built-in classes (or any class you don’t own) is dangerous, as you can’t prevent multiple modules adding conflicting implementations of the same method. Eventually, Ruby added refinements, which let you add methods to other classes, but with limited scope, so you’re not affecting the entire program by doing this. Uptake of this feature has not been great, as really there are better design approaches in most cases where one would be tempted to use it.
With traits, Rust lets us add new behaviour for built-in types in a completely safe way, because the coherence rules mean it’s impossible for two crates to write conflicting implementations for the same function name. This lets us write very expressive code that doesn’t affect the behaviour of built-in types in other crates, and we don’t pay any runtime cost for this dynamism. In some cases it makes Rust feel more dynamic than many dynamic languages, and it led me to solutions I’d never have thought of in my usual languages.