A little polymorphism, part 2: channels, combinators and const generics

In part 1 of this article, we saw how we could use traits to make a function work with various built-in types, how matching traits to types works, and what happens when we violate the coherence rules. In this part we’ll look at some more types, letting us work with concurrency and different techniques for routing input and output. We’ll also look at const generics, an upcoming feature that lets us make better use of types that are parameterised over sizes.

Let’s go back to our exponentiating program. Conceptually, what we’ve done is we’ve chained a set of machines together, where each takes two inputs and produces one output. Some of the inputs are hard-coded (the power 3) while others are the outputs from other machines.

              3                 3                 3
              |                 |                 |
              v                 v                 v
        +-----------+     +-----------+     +-----------+
 2 ---->| pow(x, y) |---->| pow(x, y) |---->| pow(x, y) |----> result
        +-----------+     +-----------+     +-----------+
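Written as code, this chain is just nested function calls. Here's a quick sketch, assuming an integer pow helper along the lines of the function from part 1 (the helper shown here is a stand-in, not the article's actual implementation):

```rust
// Hypothetical pow helper in the spirit of part 1's function.
fn pow(x: i64, y: u32) -> i64 {
    x.pow(y)
}

fn main() {
    // Chain three machines, each raising its input to the power 3.
    let result = pow(pow(pow(2, 3), 3), 3);
    println!("{}", result); // -> 134217728, i.e. 2^27
}
```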

There’s nothing surprising about this; it’s what we do all the time when we compose functions in our programs. In this system, data flows in one direction and there are no cycles in the design. Things get more interesting when the output of one machine can loop back around and feed the input of a previous machine in the sequence:

              3                 3                 3
              |                 |                 |
              v                 v                 v
        +-----------+     +-----------+     +-----------+
 2 ---->| pow(x, y) |---->| pow(x, y) |---->| pow(x, y) |---+--> result
        +-----------+     +-----------+     +-----------+   |
              ^                                             |
              |                                             |
              +---------------------------------------------+

This situation does not have a simple translation to function composition – how can the result of one function be fed back into a call that produced one of its inputs? This problem is more analogous to a set of concurrent communicating processes; all the machines run in parallel, but at any time one can emit output that’s consumed as an input message by another.

One such system occurs on day 7, and the program below is structurally similar to the program used in that puzzle. I have listed the program broken down into instructions, and various data regions, labelled with the start address of each line and a high-level description of what that line does.

let program = [
    3, 28,            //  0     #28 = input()
    1, 45, 28, 28,    //  2     |
    2, 49, 28, 28,    //  6     | #28 = 50 + 6 * (#28 - 1)
    1, 70, 28, 28,    // 10     |
    1, 49, 72, 72,    // 14     #72 = 6

    6, 72, 69,        // 18     if #72 == 0 then goto 44
    1, 45, 72, 72,    // 21     #72 = #72 - 1
    3, 71,            // 25     #71 = input()
    1, 0, 71, 71,     // 27     #71 = #71 + #(#28)
    2, 46, 71, 71,    // 31     #71 = #71 * 3
    4, 71,            // 35     output(#71)
    1, 47, 28, 28,    // 37     #28 = #28 + 1
    6, 48, 68,        // 41     goto 18

    99,               // 44     halt

    -1, 3, 1, 0, 6,   // 45     constants

    5, 2, 7, 4, 3, 6, // 50     |
    2, 3, 6, 5, 4, 7, // 56     | addition parameters
    7, 4, 5, 6, 2, 3, // 62     |

    18, 44, 50,       // 68     jump addresses
    0, 0,             // 71     variables
];

The main body of this program (beginning at 18) is a loop. It iterates six times; the counter is stored in #72 and initialised by the instruction at 14. On each turn of the loop, it fetches an input value (storing it in #71), adds some number from 2 to 7 to it, multiplies the result by 3, and then emits the final value as output. The values it adds on each turn are given by one of the sequences beginning at 50, 56 and 62. The input instruction at the start of the program fetches an input, which should be 1, 2 or 3, into #28, which stores the address of the number to be added to the loop input. The instructions at 2, 6 and 10 manipulate this first input so that #28 points to the start of one of the addition parameter blocks, and the instruction at 37 increments #28 so it points to the next value in the sequence.
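As a sanity check on that reading, the loop’s arithmetic can be written out directly: configuration input 3 makes #28 point at the parameter block at address 62 (#28 = 50 + 6 × (3 − 1) = 62), and if each output is fed straight back in as the next input, the whole program reduces to a few lines:

```rust
fn main() {
    // The addition parameters at address 62, selected when the
    // configuration input is 3.
    let params = [7, 4, 5, 6, 2, 3];
    let mut value = 0; // the first data input

    for p in params {
        // Add the parameter, then multiply by 3, as the loop body does.
        value = (value + p) * 3;
        println!("{}", value);
    }
    // -> 21, 75, 240, 738, 2220, 6669
}
```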

So in all this program takes seven inputs – one configuration variable and six data values – and emits six outputs. In the puzzle, several machines running this program are connected in a loop, so that output from one flows as input into the next. Each machine is first fed its initialisation parameter (here 1, 2 or 3), and then the first machine is fed the value 0, which causes it to do some work and emit an output value. That output flows into the second machine, and so the cycle continues, until each machine has processed six inputs and stops.

                3                 1                 2
                |                 |                 |
                v                 v                 v
          +-----------+     +-----------+     +-----------+
 0 ---+-->| machine A |---->| machine B |---->| machine C |---+--> result
      |   +-----------+     +-----------+     +-----------+   |
      |                                                       |
      +---------------------------<---------------------------+

The number of inputs and outputs is intended to be opaque from the outside; the real Advent program contains ten distinct sub-routines selected by the first input, where the first five have a single input and output and the latter five take and return ten values each. A later puzzle has the machines connected in a network where one can send a message to any other, so the data flow pattern is unknown in advance.

In this situation, we can’t simply run machine A to completion and then feed its outputs into machine B, because there’s a feedback loop: machine A’s inputs depend on its outputs. If we feed machine A the initialisation parameter 3 and then the value 0, it will complete one turn of the processing loop and then either get stuck or crash, because it wants more input than we’ve provided. Instead we have to run the machines concurrently: it should be possible for A to process its first input, and then pause when it requests a second value. Meanwhile, B processes A’s first output, and it too pauses to wait for a second input. C consumes the output of B and feeds a result to A, which can then resume operation.

One way to solve this is to modify Machine::run so that it can return if the machine needs more input, rather than only if the machine hits the halt instruction. We could then loop over the machines, running each until it gets stuck, allowing each one to make more progress each time until they explicitly halt. However, in a language with threads, we can run Machine::run in parallel without changing its logic at all, as long as we have an I/O implementation that works in this situation.

We can immediately rule out Vec as an appropriate option. We’re calling Vec::remove to fetch values from the input, and if the Vec is empty, this function panics, whereas we need a function that will block until there is more data to be fetched. Vec would also be inappropriate because of the borrowing rules; we can’t have a Vec that one machine writes to and another one reads from, as that would require two &mut references. We could make an input Vec for each machine and use closures to handle output, but this is a lot of annoying boilerplate and there’s a better solution built into std.
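A minimal sketch of the first problem:

```rust
fn main() {
    let mut input: Vec<i64> = vec![5];
    println!("{}", input.remove(0)); // -> 5

    // Once the Vec is empty, remove panics instead of blocking to
    // wait for more data to arrive.
    let result = std::panic::catch_unwind(move || input.remove(0));
    println!("{}", result.is_err()); // -> true
}
```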

The mpsc module (short for multiple producer, single consumer) implements channels that can send values between threads. mpsc::channel returns a Sender and a Receiver; Sender can be shared between threads, letting them send messages into the channel, and a single thread can use the Receiver to retrieve the sent messages. Importantly, Receiver::recv will block if the channel is empty, and wait for more messages to arrive, so it’s perfect for making a thread wait until it has more data to consume.
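In miniature, the blocking behaviour looks like this:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // The Sender can move into another thread and feed the channel.
    thread::spawn(move || {
        tx.send(42).unwrap();
    });

    // recv blocks until a message is available, then returns it.
    println!("{}", rx.recv().unwrap()); // -> 42
}
```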

Let’s use this by creating a channel for each machine. Every Machine will run in its own thread, and use a Receiver to retrieve input. To queue up input for a machine, we use its corresponding Sender, so for example machine A will need the Sender for machine B so it can send output to it, and machine C will need the Sender for machine A.

Below, we create three senders and receivers – mpsc::channel returns (Sender<T>, Receiver<T>), and Iterator::unzip converts an iterator of (A, B) into the pair (Vec<A>, Vec<B>), so we get the senders and receivers as two Vecs. Then, we feed the initialisation parameters [3, 1, 2] into the three senders, and we feed the first data value 0 into the first sender, so now all the initial inputs are queued up. Finally, we rotate_left the senders by one place, so that machine A’s sender is now last in the list, with B’s first and C’s second – this will let us give each Machine the Sender for the machine next in the chain.

use std::sync::mpsc::{self, Receiver, Sender};

let (mut senders, receivers): (Vec<Sender<i64>>, Vec<Receiver<i64>>) =
    (0..3).map(|_| mpsc::channel()).unzip();

for (&n, tx) in [3, 1, 2].iter().zip(&senders) {
    tx.send(n).unwrap();
}
senders[0].send(0).unwrap();
senders.rotate_left(1);

Next, we can create the threads that run the machines. Iterating over the paired-up senders and receivers, we create a new Machine, and a variable we’ll use to record the latest output of the machine. Inside a new thread, we run the machine, using the Receiver to get input, and writing the output to the last_sent variable and to the Sender for the next machine. Each thread returns the last output that the machine produced, which in the puzzle is the value we need to give as the solution.

use std::thread;

let mut threads = Vec::new();

for (tx, rx) in senders.into_iter().zip(receivers) {
    let mut machine = Machine::new(&program);
    let mut last_sent = 0;

    threads.push(thread::spawn(move || {
        machine.run_with_io(
            || rx.recv().unwrap(),
            |x| {
                last_sent = x;
                tx.send(x).ok();
            },
        );
        last_sent
    }));
}

The machines now run in parallel, feeding input and output between each other. Each machine will run to completion and the threads will exit, and so we can join all of them and collect their outputs. Getting the last value in this list gives the final output of the last machine:

let result = threads.into_iter().flat_map(|t| t.join()).last().unwrap();
println!("{:?}", result);
// -> 3519891687

Once again, closures have let us get something working, but if we find ourselves using mpsc channels a lot in this system, it would be nice to make them first-class input/output sources. Looking at the program above, all the input closure does is fetch a value from a Receiver, so we can make Receiver an Input implementation and shorten that code a bit:

for (tx, rx) in senders.into_iter().zip(receivers) {
    let mut machine = Machine::new(&program);
    let mut last_sent = 0;

    threads.push(thread::spawn(move || {
        machine.run_with_io(rx, |x| {
            last_sent = x;
            tx.send(x).ok();
        });
        last_sent
    }));
}

Here’s the implementation that makes this work: we turn the closure we’re using into the get() function for the Input trait.

impl<T> Input<T> for Receiver<T> {
    fn get(&mut self) -> T {
        self.recv().unwrap()
    }
}

The output closure is more complicated: it sends the value to a Sender, and writes it to a &mut i64. The latter is already an Output implementation, and we could make Sender one too, but that would only let us use one of these as the output destination.

impl<T> Output<T> for Sender<T> {
    fn put(&mut self, value: T) {
        self.send(value).ok();
    }
}

We now have two output destinations of different types, both of which implement Output; those types are &mut i64 and mpsc::Sender<i64>. Rust has a type that lets us make ad-hoc combinations of other types and treat the combination as a type on its own: tuples. We can implement Output for a pair where both of its members are Output, and then a pair of such values can work as a single output handler.

impl<A, B, T> Output<T> for (A, B)
where
    A: Output<T>,
    B: Output<T>,
    T: Clone,
{
    fn put(&mut self, value: T) {
        self.0.put(value.clone());
        self.1.put(value.clone());
    }
}

T has to be Clone for this to work, because unless T is Copy, we can’t pass value into two different functions – the first call would move it. This impl means we can now shrink our concurrent machine code down to:

for (tx, rx) in senders.into_iter().zip(receivers) {
    let mut machine = Machine::new(&program);
    let mut last_sent = 0;

    threads.push(thread::spawn(move || {
        machine.run_with_io(rx, (tx, &mut last_sent));
        last_sent
    }));
}

We could write more implementations for larger tuples, but we could also just nest tuples if we want more output destinations. (A, (B, C)) is Output<T> if A, B and C are all Output<T>. It’s also tempting to use fixed-size arrays here, but tuples let us combine several outputs of different types whereas in an array all the outputs would need the same type. The existence of these built-in types, plus the ability to add functionality to them via traits, lets us write highly expressive code that would be challenging to pull off effectively in many untyped languages.
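Here’s a self-contained sketch of that fan-out behaviour; the Output trait is re-declared here as a stand-in for the version from part 1, and the &mut Vec implementation is purely illustrative:

```rust
// Stand-in for the Output trait from part 1 (an assumption about its shape).
trait Output<T> {
    fn put(&mut self, value: T);
}

// An illustrative destination: push each value onto a Vec.
impl<T> Output<T> for &mut Vec<T> {
    fn put(&mut self, value: T) {
        self.push(value);
    }
}

// The pair combinator from above: fan each value out to both members.
impl<A, B, T> Output<T> for (A, B)
where
    A: Output<T>,
    B: Output<T>,
    T: Clone,
{
    fn put(&mut self, value: T) {
        self.0.put(value.clone());
        self.1.put(value.clone());
    }
}

fn main() {
    let (mut a, mut b, mut c) = (Vec::new(), Vec::new(), Vec::new());
    {
        // Nested pairs combine three destinations into one Output value.
        let mut out = (&mut a, (&mut b, &mut c));
        out.put(7);
        out.put(8);
    }
    println!("{:?} {:?} {:?}", a, b, c); // -> [7, 8] [7, 8] [7, 8]
}
```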

Let’s wrap up by looking at some patterns for extending this expressiveness to an even wider set of types. One very widely used trait is Iterator; it’s implemented by a lot of types, most of which you will never refer to by name as they’re just the internal structs that various iterator adaptors and consumers use. It’s awfully tempting to use Iterator as an input source; for example, we could set up the program above with an initial configuration value (3) followed by an infinite stream of the data values 5, 6 and 7, which would let it run to completion without being connected to another machine.

let mut machine = Machine::new(&program);
let input = [3].iter().chain([5, 6, 7].iter().cycle()).cloned();

machine.run_with_io(input, |x| println!("{}", x));

Maybe we could write a blanket implementation of Input<T> for Iterator<Item = T>?

impl<I, T> Input<T> for I
where
    I: Iterator<Item = T>,
{
    fn get(&mut self) -> T {
        self.next().unwrap()
    }
}

Nope: that’s a coherence violation; we can’t implement Input for both F: FnMut() -> T and I: Iterator<Item = T>.

error[E0119]: conflicting implementations of trait `Input<_>`:
    |
    | / impl<F, T> Input<T> for F
    | | where
    | |     F: FnMut() -> T,
    | | {
...   |
    | |     }
    | | }
    | |_- first implementation here
...
    | / impl<I, T> Input<T> for I
    | | where
    | |     I: Iterator<Item = T>,
    | | {
...   |
    | |     }
    | | }
    | |_^ conflicting implementation

Although FnMut is fundamental and so won’t be implemented for any new types, Iterator is not fundamental, in fact its value comes from being implemented for a large and expanding set of types! We could easily imagine implementing it for all FnMut() -> T closures:

impl<F, T> Iterator for F
where
    F: FnMut() -> T,
{
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        Some(self())
    }
}

So we can’t have both these blanket implementations. But, just as it’s common to use a newtype wrapper to add new semantics to existing types, we can use a converter function to turn an Iterator into something else that implements Input, namely a closure:

fn iter<I, T>(mut iter: I) -> impl Input<T>
where
    I: Iterator<Item = T>,
{
    move || iter.next().unwrap()
}

Wrapping this iter() function around our input Iterator converts it into a closure that implements Input<i64>, so the program now compiles and runs.

let mut machine = Machine::new(&program);
let input = [3].iter().chain([5, 6, 7].iter().cycle()).cloned();

machine.run_with_io(iter(input), |x| println!("{}", x));
// -> prints: 36, 30, 36, 33, 24, 30

A similar pattern can be used to make Stdin and Stdout first-class input/output values. Say we wanted to remove the println! boilerplate and just pass stdout itself as the output:

machine.run_with_io(iter(input), std::io::stdout());

It would be tempting to write a blanket implementation for Write types:

use std::fmt::Display;
use std::io::Write;

impl<T, W> Output<T> for W
where
    T: Display,
    W: Write,
{
    fn put(&mut self, value: T) {
        writeln!(self, "{}", value).unwrap();
    }
}

However, as Write is not fundamental, this conflicts with the implementation for F: FnMut(T). So, we either need to implement for Stdout specifically, or use the converter pattern above to turn any Write value into a FnMut(T) closure.

use std::fmt::Debug;
use std::io::{Stdout, Write};

impl<T> Output<T> for Stdout
where
    T: Debug,
{
    fn put(&mut self, value: T) {
        writeln!(self, "{:?}", value).unwrap();
    }
}

A similar pattern can be used to support Stdin, or any Read type where the input type implements FromStr.
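As a sketch of that idea, a converter can wrap any BufRead value in a closure that reads and parses one line per call. The Input trait and its FnMut blanket implementation are re-declared here so the example is self-contained, and the lines name and the use of unwrap are illustrative choices, not part of the article’s codebase:

```rust
use std::io::BufRead;
use std::str::FromStr;

// Stand-in for the Input trait from part 1 (an assumption about its shape).
trait Input<T> {
    fn get(&mut self) -> T;
}

// Blanket implementation for closures, as in part 1.
impl<F, T> Input<T> for F
where
    F: FnMut() -> T,
{
    fn get(&mut self) -> T {
        self()
    }
}

// Converter: turn any BufRead into an Input that parses one line per call.
fn lines<R, T>(mut reader: R) -> impl Input<T>
where
    R: BufRead,
    T: FromStr,
    T::Err: std::fmt::Debug,
{
    move || {
        let mut line = String::new();
        reader.read_line(&mut line).unwrap();
        line.trim().parse().unwrap()
    }
}

fn main() {
    // An in-memory reader keeps the sketch self-contained; stdin
    // would be wrapped the same way via stdin().lock().
    let mut input = lines::<_, i64>(std::io::Cursor::new("10\n20\n"));
    let a = input.get();
    let b = input.get();
    println!("{}", a + b); // -> 30
}
```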

The last trick I want to share is one for “chunking” output using an upcoming Rust feature called const generics. Some of the Advent puzzles involve machines that produce multi-value outputs, for example some machines in a network that send pairs of co-ordinates to other machines by emitting the ID of the target machine, and then the two values making up the co-ordinate. To process this output we need to group it into threes. Const generics give us a way of expressing this in the type system, so that we can write a closure that accepts [i64; 3] and automatically convert it into an Output<i64> type that does the chunking for us.
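Before using it for chunking, here is const generics at its simplest: the array length N is part of the type, inferred from the argument, and also available as an ordinary value inside the function. (A minimal sketch; the sum name is just for illustration.)

```rust
// N is a compile-time parameter: part of the array's type, but also
// usable as a plain usize value in the body.
fn sum<const N: usize>(values: [i64; N]) -> i64 {
    let mut total = 0;
    for i in 0..N {
        total += values[i];
    }
    total
}

fn main() {
    println!("{}", sum([1, 2, 3]));        // N = 3, -> 6
    println!("{}", sum([10, 20, 30, 40])); // N = 4, -> 100
}
```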

Let’s set up the program we’ve been using throughout this post, that takes a configuration parameter and then six data values, producing six outputs. We’ll use an mpsc channel to feed its output back in as input; the configuration parameter is 3 and 0 is the first data value. We’d like to see the output in chunks of size 3, and I’ve annotated the closure parameter with [_; 3] to express this.

let mut machine = Machine::new(&program);
let (tx, rx) = mpsc::channel();

tx.send(3).unwrap();
tx.send(0).unwrap();

let output = |values: [_; 3]| {
    println!("{:?}", values);
};
machine.run_with_io(rx, (tx, chunked(output)));

// -> [21, 75, 240]
// -> [738, 2220, 6669]

This works by using a helper function called chunked. It takes a type that implements Output<[T; N]> – for example our FnMut([_; 3]) closure – and returns a closure that implements Output<T>. It can do this conversion automatically because of the type parameter const N: usize; as long as N can be resolved to a constant value at compile time, we can use this as a type parameter and use its value in the code. We allocate an array of size N (note we need to require [T; N]: Default rather than just T: Default), and a cursor that contains the next offset at which we’ll store a value. The closure we return writes a value at the current cursor position and then increments the cursor; when the cursor reaches N, the buffer is full, so we reset the cursor to zero and pass a copy of the buffer to the original output.

fn chunked<O, T, const N: usize>(mut output: O) -> impl Output<T>
where
    O: Output<[T; N]>,
    [T; N]: Default,
    T: Clone,
{
    let mut buffer: [T; N] = Default::default();
    let mut cursor = 0;

    move |value| {
        buffer[cursor] = value;
        cursor += 1;

        if cursor == N {
            cursor = 0;
            output.put(buffer.clone());
        }
    }
}

Letting the length parameter of the buffer array be used dynamically in the code means that if we want the output in different size chunks, all we need to do is change the type of the closure parameter to, say, [_; 2]:

let output = |values: [_; 2]| {
    println!("{:?}", values);
};
machine.run_with_io(rx, (tx, chunked(output)));

// -> [21, 75]
// -> [240, 738]
// -> [2220, 6669]

If we use a chunk size that means the buffer won’t be filled a second time, then we only see one chunk of output:

let output = |values: [_; 4]| {
    println!("{:?}", values);
};
machine.run_with_io(rx, (tx, chunked(output)));

// -> [21, 75, 240, 738]

This new feature gives us another way to manipulate types that can prove very powerful for automatically converting between different things. In general, we’ve seen that Rust’s trait system lets us write code that makes very dynamic use of types, even though everything is completely checked at compile time and we don’t pay any runtime cost for deciding which fn to call. In Ruby, it would actually be harder to be this expressive because there’s no distinction equivalent to Rust’s array, vector and tuple types. If we used a Ruby Array as output, it would be unclear if we meant to push output onto the end of it, or treat all its existing members as output destinations themselves. We’d need to invent a set of wrapper objects to express these differences, and pay a runtime cost for indirecting function calls through those objects.

The other comparison this brought to mind is how, when Ruby launched, much was made of the ability to “re-open” classes: you can add new methods to built-in classes, in a way that was illegal in the dominant statically typed object-oriented language at the time, Java. The Ruby community (and the JavaScript community) eventually decided that modifying built-in classes (or any class you don’t own) is dangerous, as you can’t prevent multiple modules adding conflicting implementations of the same method. Eventually, Ruby added refinements, which let you add methods to other classes, but with limited scope, so you’re not affecting the entire program by doing this. Uptake of this feature has not been great, as really there are better design approaches in most cases where one would be tempted to use it.

With traits, Rust lets us add new behaviour for built-in types in a completely safe way, because the coherence rules mean it’s impossible for two crates to write conflicting implementations for the same function name. This lets us write very expressive code that doesn’t affect the behaviour of built-in types in other crates, and we don’t pay any runtime cost for this dynamism. In some cases it makes Rust feel more dynamic than many dynamic languages, and it led me to solutions I’d never have thought of in my usual languages.