Crafting ICMP-bearing IPv4 packets with the help of bitvec

👋 This page was last updated ~5 years ago. Just so you know.

So. Serializing IPv4 packets. Easy? Well, not exactly.

IPv4 was annoying to parse, because we had 3-bit integers, and 13-bit integers, and who knows what else. Serializing it is going to be exactly the same.

Right now, we don't have a way to serialize that.

Let's take the version and ihl fields, both of which are supposed to take 4 bits, together making a byte. We could serialize them like this:

// in `src/ipv4.rs`

impl Packet {
    pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
        use cf::bytes::be_u8;
        // TODO: serialize dscp, ecn, etc.
        be_u8((u8::from(self.version) << 4) + u8::from(self.ihl))
    }
}

That would work, I think - let's try it out:

impl Packet {
    pub fn parse(i: parse::Input) -> parse::Result {
        let original_i = i;

        // omitted: actual parser
        // omitted: let res = Self { ... }

        use crate::blob::Blob;
        let serialized = cf::gen_simple(res.serialize(), Vec::new()).unwrap();
        println!("  original = {:?}", Blob::new(original_i));
        println!("serialized = {:?}", Blob::new(&serialized));

        Ok((i, res))
    }
}
$ cargo run --quiet
  original = [45 00 00 29 17 cf 40 00 80 06 00 00 c0 a8 01 10 2e 33 b3 5a + 21 bytes]
serialized = [45]
  original = [45 00 00 34 9f 03 40 00 27 06 11 7b 2e 33 b3 5a c0 a8 01 10 + 32 bytes]
serialized = [45]
  original = [45 00 00 47 5e e7 00 00 80 11 00 00 c0 a8 01 10 08 08 08 08 + 51 bytes]
serialized = [45]

Yeah, okay, that works.

It's not fun, though. We avoid doing bit twiddling by hand when parsing, and I'll be darned if we do it when serializing.

If only we had a way to deal with... bits.. as individual units. Like, what if we could have a slice of bits - no, a vector of bits. That we could extend to, subslice, etc.

Luckily, horn section revs up there is crate for that sax riff.

Introducing the bitvec crate. Which lets us do exactly that.

Let's take it for a spin.

$ cargo add bitvec
      Adding bitvec v0.16.0 to dependencies
// in `src/main.rs`

fn main() {
    use bitvec::prelude::*;
    let mut vec = BitVec::<BigEndian, u8>::new();
    let val = 4u8;
    vec.extend_from_slice(val.bits::<BigEndian>());
    println!("{:?}", vec);
}
$ cargo run --quiet
BitVec<BigEndian, u64> [00000100]

Okay, so we got bits. We only want the last 4 bits though.

Can we do that?

use bitvec::prelude::*;
let mut vec = BitVec::<BigEndian, u8>::new();
let val = 4u8;
vec.extend_from_slice(&val.bits::<BigEndian>()[4..]);
println!("{:?}", vec);
$ cargo run --quiet
BitVec<BigEndian, u64> [0100]

Hey, yeah!

Can.. can we do that twice?

use bitvec::prelude::*;
let mut vec = BitVec::<BigEndian, u8>::new();
let val = 4u8;
vec.extend_from_slice(&val.bits::<BigEndian>()[4..]);
let val = 5u8;
vec.extend_from_slice(&val.bits::<BigEndian>()[4..]);
println!("{:?}", vec);
$ cargo run --quiet
BitVec<BigEndian, u64> [01000101]

Yes, uh-huh, yep we can.

Can we get that back as an u8 slice though?

use bitvec::prelude::*;
let mut vec = BitVec::<BigEndian, u8>::new();
let val = 4u8;
vec.extend_from_slice(&val.bits::<BigEndian>()[4..]);
let val = 5u8;
vec.extend_from_slice(&val.bits::<BigEndian>()[4..]);
println!("{:?}", vec);
println!("{}", as_hex::AsHex(vec.as_slice()));
$ cargo run --quiet
BitVec<BigEndian, u8> [01000101]
45

Well then.

Let's make our bed and serialize in it, shall we?

How about a nice helper in these trying times:

// in `src/main.rs`

mod serialize;
// in `src/serialize.rs`

use bitvec::prelude::*;
use cookie_factory as cf;
use std::io;

pub type BitOutput = BitVec<BigEndian, u8>;

pub fn bits<W, F>(f: F) -> impl cf::SerializeFn<W>
where
    W: io::Write,
    F: Fn(&mut BitOutput),
{
    move |mut out: cf::WriteContext<W>| {
        let mut bo = BitOutput::new();
        f(&mut bo);

        io::Write::write(&mut out, bo.as_slice())?;
        Ok(out)
    }
}

Wonderful.

Next up: we'll want serialize methods on all those u3, u4, etc. types.

Let's start by adding a convenience method on BitOutput. Since BitOutput is just a type alias, not a newtype, we have to make our own trait, because of orphan rules.

Cool bear

Cool bear's hot tip

See Part 11 for further discussion on orphan rules.

pub trait WriteLastNBits {
    fn write_last_n_bits<B: Bits>(&mut self, b: B, num_bits: usize);
}

impl WriteLastNBits for BitOutput {
    fn write_last_n_bits<B: Bits>(&mut self, b: B, num_bits: usize) {
        let bitslice = b.bits::<BigEndian>();
        let start = bitslice.len() - num_bits;
        self.extend_from_slice(&bitslice[start..])
    }
}

Now let's take on u4, as an example:

pub trait BitSerialize {
    fn write(&self, b: &mut BitOutput);
}

use ux::*;

impl BitSerialize for u4 {
    fn write(&self, b: &mut BitOutput) {
        // more on the choice of `u16` later
        b.write_last_n_bits(u16::from(*self), 4);
    }
}

Cool, that was easy.

Can we use it in our serializer?

impl Packet {
    pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
        use crate::serialize::{bits, BitSerialize};

        bits(move |bo| {
            self.version.write(bo);
            self.ihl.write(bo);
        })
    }
}

Looks like it!

Does it work?

$ cargo run --quiet
  original = [45 00 00 34 6d 79 00 00 35 06 20 b8 5d b8 d8 22 c0 a8 01 10 + 32 bytes]
serialized = [45]
  original = [45 00 00 28 9a 40 40 00 80 06 00 00 c0 a8 01 10 5d b8 d8 22 + 20 bytes]
serialized = [45]

That it does!

Let's industrialize BitSerialize using macros, the feature we all hate to love:

pub trait BitSerialize {
    fn write(&self, b: &mut BitOutput);
}

use ux::*;

macro_rules! impl_bit_serialize_for_ux {
    ($($width: expr),*) => {
        $(
            paste::item! {
                impl BitSerialize for [<u $width>] {
                    fn write(&self, b: &mut BitOutput) {
                        b.write_last_n_bits(u16::from(*self), $width);
                    }
                }
            }
        )*
    };
}

impl_bit_serialize_for_ux!(2, 3, 4, 6, 13);

Boom.

Serializing IPv4 is suddenly very easy.

Let's take care of Payload and Protocol first:

impl Payload {
    pub fn protocol(&self) -> Protocol {
        match self {
            Self::ICMP(_) => Protocol::ICMP,
            _ => unimplemented!(),
        }
    }

    pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
        move |out| match self {
            Self::ICMP(ref icmp) => icmp.serialize()(out),
            _ => unimplemented!(),
        }
    }
}

impl Protocol {
    pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
        use cf::bytes::be_u8;
        // note: `Protocol` needs to derive Clone and Copy for this
        be_u8(*self as u8)
    }
}

And now the meat of it:

impl Packet {
    pub fn serialize_no_checksum<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
        use crate::serialize::{bits, BitSerialize};
        use cf::{
            bytes::{be_u16, be_u8},
            sequence::tuple,
        };

        tuple((
            bits(move |bo| {
                // hard-code these, so we don't have to fill them out when
                // building new IPv4 packets
                let version = u4::new(4);
                let ihl = u4::new(5);
                let dscp = u6::new(0);
                let ecn = u2::new(0);

                version.write(bo);
                ihl.write(bo);
                dscp.write(bo);
                ecn.write(bo);
            }),
            be_u16(0), // length, to fill later
            be_u16(self.identification),
            bits(move |bo| {
                // again, hard-code these, we don't care about fragmentation
                // right now
                let flags = u3::new(0);
                let fragment_offset = u13::new(0);

                flags.write(bo);
                fragment_offset.write(bo);
            }),
            be_u8(self.ttl),
            // we need to do this to avoid capturing a temporary
            move |out| self.payload.protocol().serialize()(out),
            be_u16(0), // checksum, to fill later
            self.src.serialize(),
            self.dst.serialize(),
            self.payload.serialize(),
        ))
    }
}

Looking real good.

Let's compare notes:

// in `src/ipv4.rs`

impl Packet {
    pub fn parse(i: parse::Input) -> parse::Result<Self> {
        let original_i = i;

        // cut: meat of the parser

        if let Payload::ICMP(_) = res.payload {
            use crate::as_hex::AsHex;
            let serialized = cf::gen_simple(res.serialize(), Vec::new()).unwrap();
            println!("  original = {}", AsHex(original_i));
            println!("serialized = {}", AsHex(&serialized));
        }

        Ok((i, res))
    }
}

Running this, we get lines like these:

$ cargo run --quiet
(cut)
  original = 45 00 00 3c 00 00 00 00 36 01 b2 f9 08 08 08 08 c0 a8 01 10 00 00 45 2d 00 01 10 2e 61 62 63 64 65 66 67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76 77 61 62 63 64 65 66 67 68 69
serialized = 45 00 00 00 00 00 00 00 36 01 00 00 08 08 08 08 c0 a8 01 10 00 00 45 2d 00 01 10 2e 61 62 63 64 65 66 67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76 77 61 62 63 64 65 66 67 68 69

We can see that the original has 3c, whereas serialized has 00 - that's the length field. The original has a checksum of b2 f9, ours is empty.

Let's fill those out. The plan is the same as icmp::Packet::serialize - we make another function that fills in the missing fields.

// in `src/ipv4.rs`


impl Packet {
    // omitted: `fn serialize_no_checksum()`

    pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
        use cf::{bytes::le_u16, combinator::slice};

        move |out| {
            let mut buf = cf::gen_simple(self.serialize_no_checksum(), Vec::new())?;

            // fill in length
            let length = buf.len() as u16;
            cf::gen_simple(le_u16(length), &mut buf[2..])?;

            // fill in checksum
            // it's important to do this last, because the length is
            // part of the checksum
            let checksum = crate::ipv4::checksum(&buf);
            cf::gen_simple(le_u16(checksum), &mut buf[10..])?;

            slice(buf)(out)
        }
    }
}

Let's try this again:

$ cargo run --quiet
  original = 45 00 00 3c 00 00 00 00 36 01 b2 f9
serialized = 45 00 3c 00 00 00 00 00 36 01 77 35
                   ^^ ^^                   ^^ ^^

Uh oh, we got both of these wrong somehow.

Cool bear

Cool bear's hot tip

If you want, take a minute and try to figure out what we messed up.

I'll wait.

To pad out the page (and avoid spoilers), let me tell you a few evil things we could do with a low-level ICMP implementation like ours:

First, we could try to ping every IP address in a subnet to find out which hosts are live. Why? Well, once we have a list of potential attack targets, we can start finding out if they have any UDP or TCP ports open, on which services are served, that could have potential vulnerabilities.

Second, we could just a lot of ICMP echo requests very quickly. Ideally, we'd do that from many different hosts. The idea is that the target would eventually no longer be able to handle sending responses - it would saturate its CPU, its network link, or both. That's a pretty simple Denial-of-service attack (aka DoS) right there.

Third, you know how we're able to specify an arbitrary payload (of reasonable length) in the ICMP echo request? And the host is supposed to reply with that same payload? Well it's our implementation - we can do anything we want. So we might have a host we control that stuffs replies in that payload. That way, we could implement IP over ICMP. This could be used to bypass WiFi paywalls - although it would be significantly slower than regular IP over Ethernet. But don't take it from me, give it a shot.

Fourth, we could (try to) use ICMP redirect messages to pull off a Man-in-the-middle attack (aka MITM). We'd pretend to be a router that says "hey, there's a better route to that host, you should totally send it to X.Y.Z.W instead", and that address is a host we control. This would probably be of limited use, as most network traffic nowadays is secured via protocols like TLS, but still fun.

Consequently, ICMP is often at least partially disabled for security reasons. To learn more, read Drew Branch's article, from which this list is paraphrased.

Okay, did you figure out what we got wrong?

First off, we used le_u16 to write the "total length" field. Because we copied the code used to write the checksum - also a 16-bit integer. But for the internet checksum, we need to write it as little-endian since it was calculated as little-endian. The length, however, needs to be written as big-endian, like all the IPv4 fields.

That explains why we got 3c 00 instead of 00 3c

Secondly, we computed a checksum over the whole packet. But the field is called "header checksum" for a reason.

Let's fix both of these:

// in `src/ipv4.rs`

impl Packet {
    // omitted: `fn serialize_no_checksum()`

    pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
        use cf::{
            bytes::{be_u16, le_u16},
            combinator::slice,
        };

        move |out| {
            let mut buf = cf::gen_simple(self.serialize_no_checksum(), Vec::new())?;

            // fill in length - as big-endian
            let length = buf.len() as u16;
            cf::gen_simple(be_u16(length), &mut buf[2..])?;

            // fill in header checksum
            // note: this will break if we ever allow IP options.
            // luckily this is out of scope now.
            let header_slice = &buf[..5 * 4];
            let checksum = crate::ipv4::checksum(header_slice);
            cf::gen_simple(le_u16(checksum), &mut buf[10..])?;

            slice(buf)(out)
        }
    }
}

Let's run it once again - I've split the output over multiple lines so it's easier to compare:

orig | 45 00 00 3c 00 00 00 00 36 01 b2 f9 08 08 08 08
serd | 45 00 00 3c 00 00 00 00 36 01 b2 f9 08 08 08 08
       ^^    ^^^^^ ^^^^^ ^^^^^ ^^ ^^ ^^^^^ ^^^^^^^^^^^
       ipv4  length  |   flags TTL | sum   8.8.8.8
       ihl=5         ident         proto=icmp

orig | c0 a8 01 10 00 00 45 17 00 01 10 44 61 62 63 64
serd | c0 a8 01 10 00 00 45 17 00 01 10 44 61 62 63 64
       ^^^^^^^^^^^ ^^^^^ ^^^^^ ^^^^^ ^^^^^ ^^^^^^^^^^^
      192.168.1.16 |     sum   |     seqnb payload "abcd"
                   echo reply  ident

orig | 65 66 67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74
serd | 65 66 67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                     "efghijklmnopqrst"

orig | 75 76 77 61 62 63 64 65 66 67 68 69
serd | 75 76 77 61 62 63 64 65 66 67 68 69
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                 "uvwabcdefghi"

Y'all, that looks really good.

It's time. The checks are summed and the bits are twiddled. We took our sweet, sweet time to arrive to this point. Our beast of a codebase is growling, impatient to be unleashed onto the greater internet.

Kill ping -t 8.8.8.8 - we won't be needing it anymore.

First, some ICMP helpers:

// in `src/icmp.rs`

impl Echo {
    pub fn as_echo_request<P: AsRef<[u8]>>(self, payload: P) -> Packet {
        Packet {
            typ: Type::EchoRequest,
            checksum: 0,
            header: Header::EchoRequest(self),
            payload: Blob::new(payload.as_ref()),
        }
    }
}

impl Packet {
    pub fn as_ipv4_payload(self) -> crate::ipv4::Payload {
        crate::ipv4::Payload::ICMP(self)
    }
}

Next, some IPv4 helpers:

$ cargo add rand
      Adding rand v0.7.2 to dependencies
// in `src/ipv4.rs`

impl Addr {
    pub fn zero() -> Self {
        Self([0, 0, 0, 0])
    }
}

impl Default for Packet {
    fn default() -> Self {
        Self {
            length: 0,
            identification: rand::random(),
            version: u4::new(4),
            ihl: u4::new(5),
            dscp: u6::new(0),
            ecn: u2::new(0),
            flags: u3::new(0),
            fragment_offset: u13::new(0),
            ttl: 128,
            protocol: None,
            checksum: 0,
            src: Addr::zero(),
            dst: Addr::zero(),
            payload: Payload::Unknown,
        }
    }
}

impl Payload {
    pub fn as_packet(self, src: Addr, dst: Addr) -> Packet {
        Packet {
            protocol: Some(self.protocol()),
            payload: self,
            src,
            dst,
            ..Default::default()
        }
    }
}

impl Packet {
    pub fn as_ethernet_payload(self) -> crate::ethernet::Payload {
        crate::ethernet::Payload::IPv4(self)
    }
}

Finally, let's send an echo request:

// in `src/main.rs`

fn make_queries(
    iface: &dyn rawsock::traits::DynamicInterface,
    nic: &netinfo::NIC,
    pending: &Mutex<PendingQueries>,
) {
    let gateway_ip = nic.gateway;

    let (tx, rx) = mpsc::channel();
    pending.lock().unwrap().arp.insert(gateway_ip, tx);

    arp::Packet::request(nic, gateway_ip)
        .as_ethernet_payload()
        .as_broadcast_frame(nic)
        .send(iface);

    let gateway_mac = rx.recv().unwrap();
    println!("gateway MAC: {:?}", gateway_mac);

    // send an echo request to 8.8.8.8
    icmp::Echo {
        identifier: 0xC0DE,
        sequence_number: 0xFACE,
    }
    .as_echo_request("I'm a little teapot".as_bytes())
    .as_ipv4_payload()
    .as_packet(nic.address, ipv4::Addr([8, 8, 8, 8]))
    .as_ethernet_payload()
    .as_frame(nic, gateway_mac)
    .send(iface);
}

And now our big moment:

$ cargo run --quiet
Using NIC {
    guid: "{0E89380B-814A-48FC-86C4-5C51B8040CB2}",
    gateway: 192.168.1.254,
    address: 192.168.1.16,
    phy_address: F4-D1-08-0B-7E-BC,
}
gateway MAC: 14-0C-76-6A-71-BD
The application panicked (crashed).
Message:  not yet implemented
Location: src\ethernet.rs:120

Backtrace omitted. Run with RUST_BACKTRACE=1 environment variable to display it.
Run with RUST_BACKTRACE=full to include source snippets.

Oh, ah, uh, just one second:

// in `src/ethernet.rs`

impl Payload {
    pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
        use cf::sequence::tuple;
        move |out| match self {
            Self::ARP(ref packet) => tuple((EtherType::ARP.serialize(), packet.serialize()))(out),
            // that's the line:
            //                  |
            //                  v
            Self::IPv4(_) => unimplemented!(),
            Self::Unknown => unimplemented!(),
        }
    }
}

Let's fix that real quick:

// in `src/ethernet.rs`

impl Payload {
    pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
        use cf::sequence::tuple;
        move |out| match self {
            Self::ARP(ref packet) => tuple((EtherType::ARP.serialize(), packet.serialize()))(out),
            Self::IPv4(ref packet) => tuple((EtherType::IPv4.serialize(), packet.serialize()))(out),
            Self::Unknown => unimplemented!(),
        }
    }
}

Okay, now - for real - the moment we've all been waiting for:

$ cargo run --quiet
Using NIC {
    guid: "{0E89380B-814A-48FC-86C4-5C51B8040CB2}",
    gateway: 192.168.1.254,
    address: 192.168.1.16,
    phy_address: F4-D1-08-0B-7E-BC,
}
gateway MAC: 14-0C-76-6A-71-BD
The application panicked (crashed).
Message:  checksum() input size should be a multiple of 2 bytes
Location: src\ipv4.rs:311

Backtrace omitted. Run with RUST_BACKTRACE=1 environment variable to display it.
Run with RUST_BACKTRACE=full to include source snippets.

Okay look, networks are complicated okay, we're fourteen articles in for crying out loud. You have to expect a few false starts.

So uh, what did we get wrong?

Well we always assumed that the input to the internet checksum would be a multiple of 2 bytes (and we made sure this assertion held by writing some code). And we were happy about it too, because the IPv4 header is always a multiple of 4 bytes, since its length is specified in "32-bit words".

But ICMP packets can have odd lengths, as we just found out.

What does RFC 792 say about that case??

Checksum

The checksum is the 16-bit ones's complement of the one's complement sum of the ICMP message starting with the ICMP Type.

For computing the checksum, the checksum field should be zero.

If the total length is odd, the received data is padded with one octet of zeros for computing the checksum.

This checksum may be replaced in the future.

Ah. Let's see. Our internet checksum routine is currently this:

// in `src/ipv4.rs`

pub fn checksum(slice: &[u8]) -> u16 {
    let (head, slice, tail) = unsafe { slice.align_to::<u16>() };
    if !head.is_empty() {
        panic!("checksum() input should be 16-bit aligned");
    }
    if !tail.is_empty() {
        panic!("checksum() input size should be a multiple of 2 bytes");
    }

    fn add(a: u16, b: u16) -> u16 {
        let s: u32 = (a as u32) + (b as u32);
        if s & 0x1_00_00 > 0 {
            (s + 1) as u16
        } else {
            s as u16
        }
    }

    !slice.iter().fold(0, |x, y| add(x, *y))
}

Now, we don't really want to do something like this:

// in `src/ipv4.rs`

pub fn checksum(slice: &[u8]) -> u16 {
    if slice.len() % 2 != 0 {
        let mut v = Vec::from(slice);
        v.push(0);
        return checksum(&v[..]);
    }

    // omitted: rest of checksum
}

Although that is the spirit of the RFC, and I did say on multiple occasions that we don't really care about performance, it seems silly to allocate a whole buffer just for that.

Instead, we can think about this: if our last byte is 0xFF, padding it with a null byte would look like 0xFF 0x00. Our processor (we're doing all this on x86) is little-endian, and FF as a 16-bit little-endian integer is... 0xFF 0x00 as well.

TL;DR we can just do this:

// in `src/ipv4.rs`

pub fn checksum(slice: &[u8]) -> u16 {
    let (head, slice, tail) = unsafe { slice.align_to::<u16>() };
    if !head.is_empty() {
        panic!("checksum() input should be 16-bit aligned");
    }
    // don't check tail, we'll use it

    fn add(a: u16, b: u16) -> u16 {
        let s: u32 = (a as u32) + (b as u32);
        if s & 0x1_00_00 > 0 {
            (s + 1) as u16
        } else {
            s as u16
        }
    }

    !add(
        slice.iter().fold(0, |x, y| add(x, *y)),
        // add only item of tail (as an u16), or 0 (which is a no-op)
        tail.iter().next().map(|&x| x as u16).unwrap_or_default(),
    )
}

Okay, let's move on to the next error:

$ cargo run --quiet
Using NIC {
    guid: "{0E89380B-814A-48FC-86C4-5C51B8040CB2}",
    gateway: 192.168.1.254,
    address: 192.168.1.16,
    phy_address: F4-D1-08-0B-7E-BC,
}
gateway MAC: 14-0C-76-6A-71-BD
(192.168.1.16) => (8.8.8.8) | Packet {
    typ: EchoRequest,
    header: EchoRequest(Echo { identifier: c0de, sequence_number: face }),
    payload: [49 27 6d 20 61 20 6c 69 74 74 6c 65 20 74 65 61 70 6f 74],
}
(8.8.8.8) => (192.168.1.16) | Packet {
    typ: EchoReply,
    header: EchoReply(Echo { identifier: c0de, sequence_number: face }),
    payload: [49 27 6d 20 61 20 6c 69 74 74 6c 65 20 74 65 61 70 6f 74],
}

Ok so the thing we got wrong here is..

..nothing? It worked?

IT WORKED.

Migrating sup to ersatz

The sup crate was our take on ping, using the Win32 ICMP APIs. Now that we have our own network stack, we can just use that!

We'll have to think about how to package ersatz as a proper library though.

I'm thinking that:

  • The ARP part should be completely abstracted away
  • We should have an easy way to send IP packets to a destination
  • We should have a way to wait for a packet that satisfies a specific filter.

In short, I want an API like that:

// fictional code
use ersatz::icmp::Echo;
use std::{time::Duration, process};

fn main() {
    let iface = ersatz::Interface::open_default();
    let addr = "8.8.8.8".parse().unwrap();

    for sequence_number in 0..3 {
        let echo_request = Echo {
            identifier: 0xC0DE,
            sequence_number,
        }
        .as_echo_request("I'm a little teapot".as_bytes())
        .as_ipv4_payload();

        // rx is an std::sync::mpsc::Receiver<ersatz::ipv4::Packet>
        let rx = iface.expect_ipv4(|packet| {
            // look for an ICMP echo reply packet from 'addr',
            // with identifier 0xC0DE and the right sequence number
            if is_the_one_we_want(packet) {
                return Some(packet)
            }

            None
        });
        iface.send_ipv4(echo_request, addr).unwrap();

        match res.recv_timeout(Duration::from_secs(3)) {
            Ok(packet) => {
                // print "Reply from {:?}: bytes={}, etc.
            }
            Err(_) => {
                println!("Timed out!")
                process::exit(1);
            },
        }
    }
}

Can we pull that off?

First, let's add ersatz as a dependency of sup:

# in Cargo.toml
# omitted: [package]

[dependencies]
memoffset = "0.5.1"
pretty-hex = "0.1.1"
once_cell = "1.2.0"

# new!
ersatz = { path = "../ersatz" }

We can then start with a very basic main:

// in `sup/src/main.rs`

fn main() -> Result<(), Box<dyn Error>> {
    Ok(())
}
$ cargo run
   Compiling sup v0.1.0 (C:\msys64\home\amos\ftl\sup)
error[E0433]: failed to resolve: use of undeclared type or module `ersatz`
 --> src\main.rs:8:17
  |
8 |     let iface = ersatz::default_iface();
  |                 ^^^^^^ use of undeclared type or module `ersatz`

error: aborting due to previous error

Ah. Well, although we did add ersatz to our Cargo.toml, it's a binary right now - not an executable. Let's change that by:

  • Renaming ersatz/src/main.rs to ersatz/src/lib.rs
  • Making all modules public
// in `ersatz/src/lib.rs`

pub mod arp;
pub mod as_hex;
pub mod blob;
pub mod error;
pub mod ethernet;
pub mod icmp;
pub mod ipv4;
pub mod loadlibrary;
pub mod netinfo;
pub mod parse;
pub mod serialize;
pub mod vls;

Let's try this again:

$ cargo run
   Compiling ersatz v0.1.0 (C:\msys64\home\amos\ftl\ersatz)
   Compiling sup v0.1.0 (C:\msys64\home\amos\ftl\sup)
error[E0433]: failed to resolve: could not find `Interface` in `ersatz`
 --> src\main.rs:8:25
  |
8 |     let iface = ersatz::Interface::open_default().unwrap();
  |                         ^^^^^^^^^ could not find `Interface` in `ersatz`

error: aborting due to previous error

Okay, better. Let's prototype what our Interface type should look like:

use std::sync::{mpsc, Arc, Mutex};

pub struct Interface {
    // some fields go here, probably `nic`, `gateway_mac`,
    // our `pending` queue and whatever else we need
}

impl Interface {
    pub fn open_default() -> Result<Self, error::Error> {
        unimplemented!()
    }

    pub fn send_ipv4(
        &self,
        payload: ipv4::Payload,
        addr: &ipv4::Addr,
    ) -> Result<(), error::Error> {
        unimplemented!()
    }

    pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
    where
        F: Fn(&ipv4::Packet) -> Option<T>,
    {
        unimplemented!()
    }
}

That looks reasonable.

Let's start with the easiest bit: We want to be able to send IPv4 packets from our own IP address - those need to be coated in an Ethernet frame, sent from our MAC to the Gateway's MAC.

All our as_xxx() helpers take ownership of self, so we'll take an ipv4::Payload, not a reference:

// in `ersatz/src/lib.rs`

impl Interface {
    pub fn send_ipv4(&self, payload: ipv4::Payload, addr: &ipv4::Addr) -> Result<(), error::Error> {
        payload
            .as_packet(self.nic.address, addr.clone())
            .as_ethernet_payload()
            .as_frame(&self.nic, self.gateway_mac)
            .send(self.iface.as_ref())?;
        self.iface.as_ref().flush();
        Ok(())
    }
}

Looks good. For us to be able to do addr.clone(), we need to derive Clone on ipv4::Addr. While we're in that file, let's also copy over the FromStr implementation from sup into ersatz, except this time we'll use the thiserror crate for ParseAddrError:

// in `ersatz/src/ipv4.rs`

// new: Clone!
#[derive(PartialEq, Eq, Clone, Copy, Hash)]
pub struct Addr(pub [u8; 4]);

// omitted: custom Display, Debug implementations

use std::num::ParseIntError;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ParseAddrError {
    #[error("not enough parts")]
    NotEnoughParts,
    #[error("too many parts")]
    TooManyParts,
    #[error("one of the components is not an int {0:?}")]
    ParseIntError(#[from] ParseIntError),
}

impl std::str::FromStr for Addr {
    type Err = ParseAddrError;

    fn from_str(s: &str) -> Result<Self, ParseAddrError> {
        let mut tokens = s.split(".");

        let mut res = Self([0, 0, 0, 0]);
        for part in res.0.iter_mut() {
            *part = tokens
                .next()
                .ok_or(ParseAddrError::NotEnoughParts)?
                .parse()?
        }

        if let Some(_) = tokens.next() {
            return Err(ParseAddrError::TooManyParts);
        }

        Ok(res)
    }
}

Note that we'll have to derive(Clone) for a bunch of other types along the way - I'm not going to mention all of these by name. As far as I'm concerned, almost everything in ersatz should be cloneable.

Of course, at this point ersatz doesn't compile, because we're missing a few fields we knew we were going to need:

$ cargo run
error[E0609]: no field `nic` on type `&Interface`
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:109:29
    |
109 |             .as_packet(self.nic.address, addr.clone())
    |                             ^^^

error[E0609]: no field `nic` on type `&Interface`
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:111:29
    |
111 |             .as_frame(&self.nic, self.gateway_mac)
    |                             ^^^

error[E0609]: no field `gateway_mac` on type `&Interface`
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:111:39
    |
111 |             .as_frame(&self.nic, self.gateway_mac)
    |                                       ^^^^^^^^^^^

error[E0609]: no field `iface` on type `&Interface`
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:112:24
    |
112 |             .send(self.iface.as_ref())?;
    |                        ^^^^^

error[E0609]: no field `iface` on type `&Interface`
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:113:14
    |
113 |         self.iface.as_ref().flush();
    |              ^^^^^

nic and gateway_mac are easy. Interface should definitely own them:

// in `src/ersatz/lib.rs`

pub struct Interface {
    nic: netinfo::NIC,
    gateway_mac: ethernet::Addr,
}

But for iface, we need to think a little...

We know we want to let users of ersatz look at incoming ipv4 packets, until a condition of their choice is satisfied. In other words, we want to have a list of listeners that can unsubscribe by returning something.

// in `src/ersatz/lib.rs`

struct PendingQueries {
    arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
    ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool>>,
}

I think it makes sense for PendingQueries to own those functions. They're also probably all going to have a different concrete type, hence the Box.

Just like before, when ersatz was a binary, we're going to have to start a thread that receives packets and processes them, so open_default() should look a little something like:

// in `src/ersatz/lib.rs`

impl Interface {
    pub fn open_default() -> Result<Self, error::Error> {
        let nic = netinfo::default_nic()?;
        let lib = open_best_library()?;
        let iface_name = format!(r#"\Device\NPF_{}"#, nic.guid);
        let iface = lib.open_interface(&iface_name)?;

        let pending = PendingQueries {
            arp: HashMap::new(),
            ipv4: Vec::new(),
        };

        std::thread::spawn(|| {
            iface.loop_infinite_dyn(&mut |packet| {
                // TODO: parse raw packet, filter only IPv4
                let packet: ipv4::Packet = unimplemented!();

                for f in pending.ipv4 {
                    if f(&packet) {
                        // TODO: remove from pending.ipv4
                    }
                }
            })
        });

        unimplemented!()
    }
}

So far so good, yes?

I mean at this point I'm a bit worried whether or not the thread is ever going to end but that seems like a problem for later so let's just continue and add an iface field to our Interface struct.

Let's see... the declaration of open_interface in rawsock looks like this:

// rawsock internals

pub trait Library {
    fn open_interface<'a>(
        &'a self,
        name: &str
    ) -> Result<Box<dyn DynamicInterface<'a> + 'a>, Error>;
}

Mhhh okay, so our interface is going to have a lifetime, that's fair - but it's in a Box right? So we can just do this:

// in `src/ersatzs/lib.rs`

pub struct Interface<'a> {
    nic: netinfo::NIC,
    gateway_mac: ethernet::Addr,
    iface: Box<dyn rawsock::traits::DynamicInterface<'a>>,
}

And then (forgetting about the thread for now):

// in `src/ersatzs/lib.rs`

impl<'a> Interface<'a> {
    pub fn open_default() -> Result<Self, error::Error> {
        let nic = netinfo::default_nic()?;
        let lib = open_best_library()?;
        let iface_name = format!(r#"\Device\NPF_{}"#, nic.guid);
        let iface = lib.open_interface(&iface_name)?;

        let res = Self {
            nic,
            gateway_mac: ethernet::Addr::zero(),
            iface,
        };
        Ok(res)
    }
}
$ cargo run
   Compiling ersatz v0.1.0 (C:\msys64\home\amos\ftl\ersatz)

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:41:21
   |
41 |         let iface = lib.open_interface(&iface_name)?;
   |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the lifetime 'a as defined on the impl at 36:6...
  --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:36:6
   |
36 | impl<'a> Interface<'a> {
   |      ^^
   = note: ...so that the types are compatible:
           expected dyn rawsock::traits::DynamicInterface<'a>
              found dyn rawsock::traits::DynamicInterface<'_>
   = note: but, the lifetime must be valid for the static lifetime...
   = note: ...so that the expression is assignable:
           expected std::boxed::Box<(dyn rawsock::traits::DynamicInterface<'a> + 'static)>
              found std::boxed::Box<dyn rawsock::traits::DynamicInterface<'a>>

Huh. No, that doesn't work at all.

But if I look at it sideways I can sort of understand what rustc is trying to tell us: we've seen from the signature of Library::open_interface that the returned interface's lifetime was bound to the library's lifetime. And we're dropping the library when returning from open_default().

Well - says the fool - that's easy! We'll just store lib and iface together in the same struct, so their lifetimes will definitely be tied:

// in `src/ersatzs/lib.rs`

pub struct Interface<'a> {
    nic: netinfo::NIC,
    gateway_mac: ethernet::Addr,
    // we're using "+ 'a" here to signify that not only should the trait
    // object implement "Library", it should also have a lifetime of "'a"
    lib: Box<dyn rawsock::traits::Library + 'a>,
    iface: Box<dyn rawsock::traits::DynamicInterface<'a> + 'a>,
}

impl<'a> Interface<'a> {
    pub fn open_default() -> Result<Self, error::Error> {
        let nic = netinfo::default_nic()?;
        let lib = open_best_library()?;
        let iface_name = format!(r#"\Device\NPF_{}"#, nic.guid);
        let iface = lib.open_interface(&iface_name)?;

        let res = Self {
            nic,
            gateway_mac: ethernet::Addr::zero(),
            lib,
            iface,
        };
        Ok(res)
    }
}

There, that's gotta work, I'm pretty sure:

$ cargo run
error[E0597]: `*lib` does not live long enough
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:42:21
    |
37  | impl<'a> Interface<'a> {
    |      -- lifetime `'a` defined here
...
42  |         let iface = lib.open_interface(&iface_name)?;
    |                     ^^^----------------------------
    |                     |
    |                     borrowed value does not live long enough
    |                     argument requires that `*lib` is borrowed for `'a`
...
134 |     }
    |     - `*lib` dropped here while still borrowed

error[E0505]: cannot move out of `lib` because it is borrowed
  --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:13
   |
37 | impl<'a> Interface<'a> {
   |      -- lifetime `'a` defined here
...
42 |         let iface = lib.open_interface(&iface_name)?;
   |                     -------------------------------
   |                     |
   |                     borrow of `*lib` occurs here
   |                     argument requires that `*lib` is borrowed for `'a`
...
52 |             lib,
   |             ^^^ move out of `lib` occurs here

error: aborting due to 2 previous errors

Oh. That's unfortunate.

Why doesn't that work?

It's actually rather simple.

The reason rawsock::traits::DynamicInterface's lifetime is tied to rawsock::traits::Library is because the former holds a reference to the latter. And the thing that lifetimes do is not track how long a value is valid at all - it tracks how long a value is valid at its current memory location.

lib starts out as a local, so it begins its existence somewhere on the stack:

Then we create iface, which has a pointer to lib - also on the stack:

And then we return a struct that contains both lib and iface... and the memory to which iface's internal pointer refers to no longer contains lib!

For this last diagram I've chosen to move iface first, conceptually - but we get a similar result if we move lib first:

So it's not a limitation of the borrow checker - it's the system working as intended, preventing us from causing a crash whenever the implementation of rawsock::traits::DynamicInterface will want to use its pointer to the implementation of rawsock::traits::Library.

One way to address that would be to have a design similar to rawsock's: first force the users of the ersatz crate to open a library, then hold on to it and use it to obtain an Interface.

But I liked our interface from earlier and I don't want to depart from it. So what we can do instead is have our Library be valid for 'static - the duration of the program. How do we do that? We can use once_cell!

// in `ersatz/src/lib.rs`

use once_cell::sync::Lazy;

static RAWSOCK_LIB: Lazy<Box<dyn rawsock::traits::Library>> =
    Lazy::new(|| open_best_library().unwrap());

Note that this will panic if we can't open the best library. If we wanted to return an error instead, we'd have to adjust our design slightly.

But for now, this works great - drop the lib from our Interface struct, and it compiles:

pub struct Interface<'a> {
    nic: netinfo::NIC,
    gateway_mac: ethernet::Addr,
    iface: Box<dyn rawsock::traits::DynamicInterface<'a> + 'a>,
}

impl<'a> Interface<'a> {
    pub fn open_default() -> Result<Self, error::Error> {
        let nic = netinfo::default_nic()?;
        let iface_name = format!(r#"\Device\NPF_{}"#, nic.guid);
        let iface = RAWSOCK_LIB.open_interface(&iface_name)?;

        let res = Self {
            nic,
            gateway_mac: ethernet::Addr::zero(),
            iface,
        };
        Ok(res)
    }
}

Now, we've sorta lost track of pending in all that jazz, and we definitely need it in expect_ipv4, so let's add it back:

pub struct Interface<'a> {
    nic: netinfo::NIC,
    gateway_mac: ethernet::Addr,
    iface: Box<dyn rawsock::traits::DynamicInterface<'a> + 'a>,
    pending: PendingQueries,
}

struct PendingQueries {
    arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
    ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool>>,
}

impl<'a> Interface<'a> {
    pub fn open_default() -> Result<Self, error::Error> {
        let nic = netinfo::default_nic()?;
        let iface_name = format!(r#"\Device\NPF_{}"#, nic.guid);
        let iface = RAWSOCK_LIB.open_interface(&iface_name)?;

        let pending = PendingQueries {
            arp: HashMap::new(),
            ipv4: Vec::new(),
        };

        let res = Self {
            pending,
            nic,
            gateway_mac: ethernet::Addr::zero(),
            iface,
        };
        Ok(res)
    }
}

Good, good, this still compiles. Let's try implementing expect_ipv4.

Remember we settled on the following scheme:

  • ersatz users can pass a listener Fn that returns an Option<T>
  • expect_ipv4 returns a Receiver<T>, that receives val as soon as the listener returns Some(val)
  • pending contains a list of functions that return true when they want to unsubscribe.

It follows that we can't just stuff the listener Fn into pending's Vec, we'll have to make another closure inside expect_ipv4, like so:

impl Interface {
    pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
    where
        F: Fn(&ipv4::Packet) -> Option<T>,
    {
        let (tx, rx) = mpsc::channel();

        self.pending.ipv4.push(|packet| {
            match f(&packet) {
                Some(val) => {
                    tx.send(val).unwrap_or(()); // ignore send errors
                    true
                }
                None => false,
            }
        });
        rx
    }
}

I like the look of that!

$ cargo run --quiet
error[E0308]: mismatched types
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:32
    |
155 |           self.pending.ipv4.push(|packet| {
    |  ________________________________^
156 | |             match f(&packet) {
157 | |                 Some(val) => {
158 | |                     tx.send(val).unwrap_or(()); // ignore send errors
...   |
162 | |             }
163 | |         });
    | |_________^ expected struct `std::boxed::Box`, found closure
    |
    = note: expected type `std::boxed::Box<(dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool + 'static)>`
               found type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:32: 163:10 f:_, tx:_]`
    = note: for more on the distinction between the stack and the heap, read https://doc.rust-lang.org/book/ch15-01-box.html, https://doc.rust-lang.org/rust-by-example/std/box.html, and https://doc.rust-lang.org/std/boxed/index.html
help: store this in the heap by calling `Box::new`
    |
155 |         self.pending.ipv4.push(Box::new(|packet| {
156 |             match f(&packet) {
157 |                 Some(val) => {
158 |                     tx.send(val).unwrap_or(()); // ignore send errors
159 |                     true
160 |                 }
  ...

error: aborting due to previous error

Oh, right, of course, it's a Vec<Box<dyn Fn>>, not a Vec<dyn Fn> - we need to box that closure ourselves:

    pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
    where
        F: Fn(&ipv4::Packet) -> Option<T>,
    {
        let (tx, rx) = mpsc::channel();

        // new! Box::new(...)
        self.pending.ipv4.push(Box::new(|packet| {
            match f(&packet) {
                Some(val) => {
                    tx.send(val).unwrap_or(()); // ignore send errors
                    true
                }
                None => false,
            }
        }));
        rx
    }

It always feels nice to do one little mistake, have the compiler catch you, fix it, and then everything works. If it works the first time, it's kinda freaky - but one little mistake is good. Reassuring.

So now that we've fixed it, of course, everything works:

$ cargo run --quiet
error[E0310]: the parameter type `F` may not live long enough
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:32
    |
149 |       pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
    |                          - help: consider adding an explicit lifetime bound `F: 'static`...
...
155 |           self.pending.ipv4.push(Box::new(|packet| {
    |  ________________________________^
156 | |             match f(&packet) {
157 | |                 Some(val) => {
158 | |                     tx.send(val).unwrap_or(()); // ignore send errors
...   |
162 | |             }
163 | |         }));
    | |__________^
    |
note: ...so that the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:41: 163:10 f:&F, tx:&std::sync::mpsc::Sender<T>]` will meet its required lifetime bounds
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:32
    |
155 |           self.pending.ipv4.push(Box::new(|packet| {
    |  ________________________________^
156 | |             match f(&packet) {
157 | |                 Some(val) => {
158 | |                     tx.send(val).unwrap_or(()); // ignore send errors
...   |
162 | |             }
163 | |         }));
    | |__________^

error[E0310]: the parameter type `T` may not live long enough
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:32
    |
149 |       pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
    |                             - help: consider adding an explicit lifetime bound `T: 'static`...
...
155 |           self.pending.ipv4.push(Box::new(|packet| {
    |  ________________________________^
156 | |             match f(&packet) {
157 | |                 Some(val) => {
158 | |                     tx.send(val).unwrap_or(()); // ignore send errors
...   |
162 | |             }
163 | |         }));
    | |__________^
    |
note: ...so that the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:41: 163:10 f:&F, tx:&std::sync::mpsc::Sender<T>]` will meet its required lifetime bounds
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:32
    |
155 |           self.pending.ipv4.push(Box::new(|packet| {
    |  ________________________________^
156 | |             match f(&packet) {
157 | |                 Some(val) => {
158 | |                     tx.send(val).unwrap_or(()); // ignore send errors
...   |
162 | |             }
163 | |         }));
    | |__________^

error: aborting due to 2 previous errors

Ah, uh, not quite.

Why does it want the 'static lifetime here though? We sure didn't write it down anywhere.

Let's look at the definition of PendingQueries again:

struct PendingQueries {
    arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
    ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool>>,
}

Ah, there it is - Box<T> without an explicit lifetime is equivalent to Box<T + 'static>.

We could try adding lifetimes everywhere?

struct PendingQueries<'a> {
    arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
    ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool + 'a>>,
}

pub struct Interface<'a> {
    // omitted: other fields
    // new: <'a>
    pending: PendingQueries<'a>,
}

impl<'a> Interface<'a> {
    pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
    where
        F: Fn(&ipv4::Packet) -> Option<T> + 'a, // new: explicit lifetime here
        T: 'a, // and also here
    {
        let (tx, rx) = mpsc::channel();

        self.pending.ipv4.push(Box::new(|packet| {
            match f(&packet) {
                Some(val) => {
                    tx.send(val).unwrap_or(()); // ignore send errors
                    true
                }
                None => false,
            }
        }));
        rx
    }
}

And now, everything works:

$ cargo run --quiet
error[E0596]: cannot borrow `self.pending.ipv4` as mutable, as it is behind a `&` reference
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:156:9
    |
149 |     pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
    |                              ----- help: consider changing this to be a mutable reference: `&mut self`
...
156 |         self.pending.ipv4.push(Box::new(|packet| {
    |         ^^^^^^^^^^^^^^^^^ `self` is a `&` reference, so the data it refers to cannot be borrowed as mutable

Mh, fair enough, push mutates self.pending.ipv4, let's take &mut self:

    // was: &self
    pub fn expect_ipv4<F, T>(&mut self, f: F) -> mpsc::Receiver<T>
    where
        F: Fn(&ipv4::Packet) -> Option<T> + 'a,
        T: 'a,
    {
        // omitted: body
    }

And now, everything w..:

$ cargo run --quiet
error[E0373]: closure may outlive the current function, but it borrows `tx`, which is owned by the current function
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:156:41
    |
41  | impl<'a> Interface<'a> {
    |      -- lifetime `'a` defined here
...
156 |         self.pending.ipv4.push(Box::new(|packet| {
    |                                         ^^^^^^^^ may outlive borrowed value `tx`
...
159 |                     tx.send(val).unwrap_or(()); // ignore send errors
    |                     -- `tx` is borrowed here
    |

Errr okay, okay, you keep making good points rustc. We don't actually want our closure to borrow tx and f, we want it to take ownership of them. In other words, we want tx and f to move into that closure.

So we can make it a move closure:

    pub fn expect_ipv4<F, T>(&mut self, f: F) -> mpsc::Receiver<T>
    where
        F: Fn(&ipv4::Packet) -> Option<T> + 'a,
        T: 'a,
    {
        let (tx, rx) = mpsc::channel();

        // was: `|packet|`, now `move |packet|`
        self.pending.ipv4.push(Box::new(move |packet| {
            match f(&packet) {
                Some(val) => {
                    tx.send(val).unwrap_or(()); // ignore send errors
                    true
                }
                None => false,
            }
        }));
        rx
    }

Alright, FINALLY, this compiles, so the last piece - we should be easy - is to re-add that std::thread in Interface::open_default.

Surely that won't cause any problems:

impl<'a> Interface<'a> {
    pub fn open_default() -> Result<Self, error::Error> {
        let nic = netinfo::default_nic()?;
        let iface_name = format!(r#"\Device\NPF_{}"#, nic.guid);
        let iface = RAWSOCK_LIB.open_interface(&iface_name)?;

        let pending = PendingQueries {
            arp: HashMap::new(),
            ipv4: Vec::new(),
        };

        std::thread::spawn(move || {
            iface
                .loop_infinite_dyn(&mut |packet| {
                    let frame = match ethernet::Frame::parse(packet) {
                        Ok((_, frame)) => frame,
                        _ => return,
                    };

                    if let ethernet::Payload::IPv4(ref packet) = frame.payload {
                        let mut idx = None;
                        // try all listeners in order
                        for (i, f) in pending.ipv4.iter().enumerate() {
                            if f(&packet) {
                                // break if one wanted to unsubscribe
                                idx = Some(i);
                                break;
                            }
                        }
                        if let Some(idx) = idx {
                            // unsubscribe
                            pending.ipv4.remove(idx);
                        }
                    }
                })
                .unwrap();
        });

        let res = Self {
            pending,
            nic,
            gateway_mac: ethernet::Addr::zero(),
            iface,
        };
        Ok(res)
    }
}

Good! Well I'm not super happy about the unsubscribe logic but at least we finally have a proof of concept that compi...

$ cargo run
   Compiling ersatz v0.1.0 (C:\msys64\home\amos\ftl\ersatz)
error[E0277]: `dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool` cannot be sent between threads safely
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:9
    |
52  |         std::thread::spawn(move || {
    |         ^^^^^^^^^^^^^^^^^^ `dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool` cannot be sent between threads safely
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool`
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool>`
    = note: required because it appears within the type `std::boxed::Box<dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<std::boxed::Box<dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool>>`
    = note: required because it appears within the type `alloc::raw_vec::RawVec<std::boxed::Box<dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool>>`
    = note: required because it appears within the type `std::vec::Vec<std::boxed::Box<dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool>>`
    = note: required because it appears within the type `PendingQueries<'_>`
    = note: required because it appears within the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:28: 77:10 iface:std::boxed::Box<dyn rawsock::traits::DynamicInterface<'_>>, pending:PendingQueries<'_>]`

error: aborting due to previous error

Oh. Ohhh no. It doesn't compile at all.

It's telling us dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool cannot be sent between threads safely, and then something about std::marker::Send.

From the nomicon:

Send and Sync

Not everything obeys inherited mutability, though. Some types allow you to have multiple aliases of a location in memory while mutating it. Unless these types use synchronization to manage this access, they are absolutely not thread-safe. Rust captures this through the Send and Sync traits.

  • A type is Send if it is safe to send it to another thread.
  • A type is Sync if it is safe to share between threads (&T is Send).

Right. It doesn't matter that the actual Fn we're storing in PendingQueries right now can be safely sent to another thread - the problem is that, with our current types, the possibility of an Fn that cannot be safely sent to another thread may be stored in there.

And we are sending it to another thread, because we're using those function from a closure we pass to std::thread::spawn.

So, we can simply add a constraint that those Fns must be Send:

struct PendingQueries<'a> {
    arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
    // new: "+ Send"
    ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool + Send + 'a>>,
}

Moving on, we now get this error:

$ cargo run --quiet
error[E0277]: `F` cannot be sent between threads safely
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:183:32
    |
183 |           self.pending.ipv4.push(Box::new(move |packet| {
    |  ________________________________^
184 | |             match f(&packet) {
185 | |                 Some(val) => {
186 | |                     tx.send(val).unwrap_or(()); // ignore send errors
...   |
190 | |             }
191 | |         }));
    | |__________^ `F` cannot be sent between threads safely
    |
    = help: within `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:183:41: 191:10 f:F, tx:std::sync::mpsc::Sender<T>]`, the trait `std::marker:
:Send` is not implemented for `F`
    = help: consider adding a `where F: std::marker::Send` bound
    = note: required because it appears within the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:183:41: 191:10 f:F, tx:std::sync::mpsc:
:Sender<T>]`
    = note: required for the cast to the object type `dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool + std::marker::Send`

Alright, same thing. This is in expect_ipv4 - we're making a closure that adapts our Fn(&ipv4::Packet) -> Option<T> into a Fn(&ipv4::Packet) -> bool. It stands to reason that anything our Fn(&ipv4::Packet) -> bool captures must also be Send - since we're sending a closure's context along with its code.

We need a few more constraints:

    pub fn expect_ipv4<F, T>(&mut self, f: F) -> mpsc::Receiver<T>
    where
        F: Fn(&ipv4::Packet) -> Option<T> + Send + 'a, // new: "+ Send"
        T: Send + 'a, // also here
    {
        let (tx, rx) = mpsc::channel();

        self.pending.ipv4.push(Box::new(move |packet| {
            match f(&packet) {
                Some(val) => {
                    tx.send(val).unwrap_or(()); // ignore send errors
                    true
                }
                None => false,
            }
        }));
        rx
    }

And now, everythi..

$ cargo run
   Compiling ersatz v0.1.0 (C:\msys64\home\amos\ftl\ersatz)
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:45:21
   |
45 |         let iface = RAWSOCK_LIB.open_interface(&iface_name)?;
   |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the lifetime 'a as defined on the impl at 41:6...
  --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:41:6
   |
41 | impl<'a> Interface<'a> {
   |      ^^
   = note: ...so that the types are compatible:
           expected dyn rawsock::traits::DynamicInterface<'a>
              found dyn rawsock::traits::DynamicInterface<'_>
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:28: 77:10 iface:std::boxed::Box<dyn rawsock::traits::DynamicInterface<'_>>, pending:PendingQueries<'_>]` will meet its required lifetime bounds
  --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:9
   |
52 |         std::thread::spawn(move || {
   |         ^^^^^^^^^^^^^^^^^^

error: aborting due to previous error

Aw, no! I thought we solved that one!

Wait, no we didn't - not really anyway. We only made sure that our Library had the 'static lifetime, but we kind of, uh, gave the lifetime of rawsock::traits::DynamicInterface a name, 'a, and promptly forgot about it.

But as-is, there's no guarantee the thread will ever end! It might run until the whole application exits - way after ersatz::Interface has been dropped. In other words, it can outlive 'a - and that's what rustc is warning us about.

Which, to be honest, is a bit of a pickle. Because we now have two things, one with lifetime 'a (the Interface struct) and one with lifetime 'static (the packet processor thread), both of which hold a reference to the same thing:

Assuming we could "turn off" the borrow checker, we'd be fine if the thread finishes first. Then whenever Library gets dropped, the interface would be closed and freed. But there's nothing that stops the thread if Library gets dropped first. And that's (among other things) what rustc is telling us.

How can we solve that?

Well, we can pull the same trick we did earlier - we can make the interface a static, and initialize it lazily using once_cell. But I don't like that idea. I can more or less stomach the fact that a library like npcap is something we open once and then can use from wherever, but I don't like the idea of one iface.break_loop() call affecting someone else's iface.loop_infinite_dyn() call.

In other words, rawsock::traits::DynamicInterface is stateful, and it should not be shared (even though all its methods have &self as receiver - which normally means it's all thread-safe).

Luckily, there's another way.

When you don't know how many references there's going to be to some value, but you definitely want it to be freed only when there are no references left to it, you can use reference counting.

A quick search in the Rust standard library documentation turns up std::rc::Rc:

A single-threaded reference-counting pointer. 'Rc' stands for 'Reference Counted'.

Mh. We're definitely using rawsock::traits::DynamicInterface from several threads. Isn't there something thread-safe? Let's look at std::sync::Arc:

A thread-safe reference-counting pointer. 'Arc' stands for 'Atomically Reference Counted'.

The type Arc<T> provides shared ownership of a value of type T, allocated in the heap. Invoking clone on Arc produces a new Arc instance, which points to the same value on the heap as the source Arc, while increasing a reference count. When the last Arc pointer to a given value is destroyed, the pointed-to value is also destroyed.

Shared references in Rust disallow mutation by default, and Arc is no exception: you cannot generally obtain a mutable reference to something inside an Arc. If you need to mutate through an Arc, use Mutex, RwLock, or one of the Atomic types.

Heyyy, that sounds exactly like what the doctor ordered!

You can use an Arc<T> like you would use a Box<T> (they're both smart pointers), so let's just perform a quick swaperoo:

pub struct Interface<'a> {
    nic: netinfo::NIC,
    gateway_mac: ethernet::Addr,
    iface: Arc<dyn rawsock::traits::DynamicInterface<'a> + 'a>,
    pending: PendingQueries<'a>,
}
$ cargo run
   Compiling ersatz v0.1.0 (C:\msys64\home\amos\ftl\ersatz)
error[E0308]: mismatched types
  --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:83:13
   |
83 |             iface,
   |             ^^^^^ expected struct `std::sync::Arc`, found struct `std::boxed::Box`
   |
   = note: expected type `std::sync::Arc<(dyn rawsock::traits::DynamicInterface<'a> + 'a)>`
              found type `std::boxed::Box<dyn rawsock::traits::DynamicInterface<'_>>`

error: aborting due to previous error

Ah. Of course. This bit of code is now problematic:

    pub fn open_default() -> Result<Self, error::Error> {
        // (cut)
        let iface = RAWSOCK_LIB.open_interface(&iface_name)?;

        // (cut)
        let res = Self {
            pending,
            nic,
            gateway_mac: ethernet::Addr::zero(),
            iface, // <- wrong type here
        };
        Ok(res)
    }

In other words, we need rawsock itself to support returning an Arc, rather than a Box.

Well, does it?

// rawsock internals

pub trait Library {
    fn open_interface_arc<'a>(
        &'a self,
        name: &str
    ) -> Result<Arc<dyn DynamicInterface<'a> + 'a>, Error>;
}

It does! It's almost like.. the rawsock crate was carefully designed? To support exactly this use-case?

Very well, let's switch to it:

    pub fn open_default() -> Result<Self, error::Error> {
        // (cut)
        // was: open_interface, now: open_interface_arc
        let iface = RAWSOCK_LIB.open_interface_arc(&iface_name)?;
        // (cut)
    }
$ cargo run --quiet
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:45:21
   |
45 |         let iface = RAWSOCK_LIB.open_interface_arc(&iface_name)?;
   |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the lifetime 'a as defined on the impl at 41:6...
  --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:41:6
   |
41 | impl<'a> Interface<'a> {
   |      ^^
   = note: ...so that the types are compatible:
           expected dyn rawsock::traits::DynamicInterface<'a>
              found dyn rawsock::traits::DynamicInterface<'_>
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:28: 77:10 iface:std::sync::Arc<dyn rawsock::traits::DynamicInterface<'_>>, pending:PendingQueries<'_>]` will meet its required lifetime bounds
  --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:9
   |
52 |         std::thread::spawn(move || {
   |         ^^^^^^^^^^^^^^^^^^

error: aborting due to previous error

Huh. I uh... I could've sworn we just solved that one. Wasn't Arc<T> supposed to solve that one? I uh checks notes oh alright we'll read the error message.

So what it says is basically that, our thread still runs for 'static, so our DynamicInterface should still live for 'static and, uh, wait a minute, we gave it a less-than-static lifetime in the definition of Interface. Alright, that's easy to fix.

pub struct Interface<'a> {
    // (cut: other fields)

    // was:
    iface: Arc<dyn rawsock::traits::DynamicInterface<'a> + 'a>,
    // now:
    iface: Arc<dyn rawsock::traits::DynamicInterface<'static>>,

    pending: PendingQueries<'a>,
}

But WAIT A MINUTE. We don't need rustc to pick up all our mistakes. We also use pending in that thread. And it says right here, that pending's lifetime is 'a. We're for sure going to get an error here as well.

Why does PendingQueries need an explicit lifetime again?

struct PendingQueries<'a> {
    arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
    ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool + Send + 'a>>,
}

Oh right. Our closures.

Well, is it unreasonable to ask that any Fn passed to expect_ipv4 be valid for 'static?

I don't think so. The expect use case is for those functions to do mostly pattern matching like: is it an IPv4 packet? From that host? Containing an ICMP packet? Which is an echo reply? etc.

I'm thinking those functions can probably take ownership of anything they need - like copies of ipv4::Addr, maybe some u16, etc. It seems reasonable to ask that.

So we can rid PendingQueries of its lifetime type parameter:

struct PendingQueries {
    arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
    ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool + Send + 'static>>,
}

And, in turn, Interface:

pub struct Interface {
    nic: netinfo::NIC,
    gateway_mac: ethernet::Addr,
    iface: Arc<dyn rawsock::traits::DynamicInterface<'static>>,
    pending: PendingQueries,
}

And WAIT A MINUTE, I'm fairly sure we're going to run into some more problems.

We access pending from both expect_ipv4 (to add listeners) and from our "packet processor thread" (to call listeners and maybe remove some if they want to unsubscribe). And this happens from different threads.

So not only does pending need to be valid for 'static (because we access it from a thread that has lifetime 'static), we also need to protect it from concurrent use.

As far as lifetimes are concerned, we can use an Arc!

Let's see how that would work.

Note that Interface no longer has a lifetime parameter ('a), so we need to remove it from the impl block as well.

pub struct Interface {
    nic: netinfo::NIC,
    gateway_mac: ethernet::Addr,
    iface: Arc<dyn rawsock::traits::DynamicInterface<'static>>,
    pending: Arc<PendingQueries>,
}

impl Interface {
    pub fn open_default() -> Result<Self, error::Error> {
        // cut: get `nic`, open `iface`

        let pending = Arc::new(PendingQueries {
            arp: HashMap::new(),
            ipv4: Vec::new(),
        });

        let res = Self {
            // create another reference to `pending`,
            // which increase the reference count by one
            pending: pending.clone(),
            nic,
            gateway_mac: ethernet::Addr::zero(),
            iface,
        };

        std::thread::spawn(move || {
            iface
                .loop_infinite_dyn(&mut |packet| {
                    let frame = match ethernet::Frame::parse(packet) {
                        Ok((_, frame)) => frame,
                        _ => return,
                    };

                    if let ethernet::Payload::IPv4(ref packet) = frame.payload {
                        let mut idx = None;
                        // try all listeners in order
                        for (i, f) in pending.ipv4.iter().enumerate() {
                            if f(&packet) {
                                // break if one wanted to unsubscribe
                                idx = Some(i);
                                break;
                            }
                        }
                        if let Some(idx) = idx {
                            // unsubscribe
                            pending.ipv4.remove(idx);
                        }
                    }
                })
                .unwrap();
        });

        Ok(res)
    }
}

The theory is sound, have we forgotten about anything?

$ cargo run --quiet
error[E0277]: `std::sync::mpsc::Sender<ethernet::Addr>` cannot be shared between threads safely
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:58:9
    |
58  |         std::thread::spawn(move || {
    |         ^^^^^^^^^^^^^^^^^^ `std::sync::mpsc::Sender<ethernet::Addr>` cannot be shared between threads safely
    |
    = help: within `(ipv4::Addr, std::sync::mpsc::Sender<ethernet::Addr>)`, the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender<ethernet::Addr>`
    = note: required because it appears within the type `(ipv4::Addr, std::sync::mpsc::Sender<ethernet::Addr>)`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `hashbrown::raw::RawTable<(ipv4::Addr, std::sync::mpsc::Sender<ethernet::Addr>)>`
    = note: required because it appears within the type `hashbrown::map::HashMap<ipv4::Addr, std::sync::mpsc::Sender<ethernet::Addr>, std::collections::hash_map::RandomState>`
    = note: required because it appears within the type `std::collections::HashMap<ipv4::Addr, std::sync::mpsc::Sender<ethernet::Addr>>`
    = note: required because it appears within the type `PendingQueries`
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<PendingQueries>`
    = note: required because it appears within the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:58:28: 83:10 iface:std::sync::Arc<dyn rawsock::traits::DynamicInterface<'_>>, pending:std::sync::Arc<PendingQueries>]`

error[E0277]: `(dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool + std::marker::Send + 'static)` cannot be shared between threads safely
   --> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:58:9
    |
58  |         std::thread::spawn(move || {
    |         ^^^^^^^^^^^^^^^^^^ `(dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `(dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool + std::marker::Send + 'static)`
    (cut: same requirements)

error: aborting due to 2 previous errors

Hey, that's Sync! Send's little sister! The nomicon told us earlier:

  • A type is Sync if it is safe to share between threads (&T is Send).

Can mpsc::Sender<T> ever be Send? Can it be Sync ?

On its documentation page, we find:

// https://doc.rust-lang.org/std/sync/mpsc/struct.Sender.html

impl<T: Send> Send for Sender<T> {}
impl<T> !Sync for Sender<T> {}

So: it automatically becomes Send if T itself is Send, but it can never be Sync.

But, wait, why are we getting an error on std::thread::spawn about a Sender<T>? We know is that the thread captures an Arc<PendingQueries>, and PendingQueries currently looks like:

struct PendingQueries {
    arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
    ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool + Send + 'static>>,
}

Right. There is a Sender<ethernet::Addr>. It just so happens that ethernet::Addr is both Send and Sync (it's just data), but that doesn't matter - an mpsc::Sender<T> will never be Sync - presumably because, for performance reasons, it's single-threaded by default.

There's no lock inside of Sender that would prevent data corruption from happening if two threads called .send() at the same time. You know what is Sync, and does a great job at doing that though?

std::sync::Mutex!

I used it earlier in this article and didn't really explain why. Mutex stands for "mutually exclusive" - it's a lock that only one thread can hold at a time. It needs to release it before another one can acquire it.

We don't need a Mutex for iface, because we only ever need an immutable reference to it - those are already Sync, but we do need one for pending, which we modify from two different threads: the one we start in open_default(), that processes packets in the background, and the main thread, from the code that uses ersatz.

Next question: which order do they go in? The documentation for Arc just says "use a Mutex". It doesn't tell us if we need to go Mutex<Arc<T>> or Arc<Mutex<T>>.

Let's review: in order to use something in a Mutex, you need to lock() it.

// example code

let t: T;
let m = Mutex::new(t);

{
    // this blocks if another thread holds the lock,
    // it resumes when we successfully acquire it.
    let m_guard = m.lock().unwrap();

    // m_guard implements `Deref<Target=T>` and `DerefMut<Target=T>`
    // so we can call methods on it:
    m_guard.do_something()

    // m_guard is dropped here, releasing the lock
}

So if we had a Mutex<Arc<T>> and we wanted different threads to hold on to it, we'd need to... lock it first? And then clone() the Arc<T> inside? And then... I guess create another Mutex to give to the other thread? But then we'd have two locks, one per thread - which defeats the purpose, because then we wouldn't be synchronizing anything.

// example code (bad)

let t: T;
let m = Mutex::new(Arc::new(t));

// uh-oh, second Mutex? that can't be right
let m_for_thread = Mutex::new(m.lock().unwrap().clone());
std::thread::spawn(move || {
    // ...note that rustc is going to complain about us sharing
    // an `Arc<T>` across threads anyway (if `T` isn't `Sync + Send`).
    // which is the reason we need a `Mutex` in the first place!
})

From this, it's pretty clear we need an Arc<Mutex<T>>, because we want: multiple threads, to hold (counted) references to the same Mutex, protecting a single resource.

Let's give it a shot:

impl Interface {
    pub fn open_default() -> Result<Self, error::Error> {
        // cut: get nic, iface

        let pending = Arc::new(Mutex::new(PendingQueries {
            arp: HashMap::new(),
            ipv4: Vec::new(),
        }));

        let res = Self {
            // make a new a reference to the (same) mutex, all good
            pending: pending.clone(),
            // make a new reference to the iface, still good
            iface: iface.clone(),

            // omitted: other fields
        };

        std::thread::spawn(move || {
            iface
                .loop_infinite_dyn(&mut |packet| {
                    let frame = /* omitted: parsing */;

                    if let ethernet::Payload::IPv4(ref packet) = frame.payload {
                        let mut idx = None;
                        // only if the packet is IPv4 do we need to lock `pending`
                        let mut pending = pending.lock().unwrap();

                        // now we can read/write to/from pending.ipv4
                        for (i, f) in pending.ipv4.iter().enumerate() {
                            // etc.
                        }

                        // cut: unsubscribe listeners
                    }
                })
                .unwrap();
        });

        Ok(res)
    }
}

We also need to change expect_ipv4 to make it acquire the lock:

    // new: we don't need to take `&mut self` anymore, just `&self`!
    pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
    where
        F: Fn(&ipv4::Packet) -> Option<T> + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();

        // new: acquire lock
        let mut pending = self.pending.lock().unwrap();
        pending.ipv4.push(Box::new(move |packet| {
            // etc.
        }));
        rx
    }

...and just like that, it compiles.

No, no, I'm serious! I know we've joked a lot, but this time, for real, it does compile.

I've taken the liberty of fleshing out sup's main() function to be feature-complete:

// in `sup/src/main.rs`

fn main() -> Result<(), Box<dyn Error>> {
    let arg = env::args().nth(1).unwrap_or_else(|| {
        println!("Usage: sup DEST");
        process::exit(1);
    });

    let dest = arg.parse()?;

    let iface = Interface::open_default()?;

    // this should really be random, but oh well.
    let identifier = 0xC0DE;

    // always send the same data, a-la Windows ping
    let data = "O Romeo.";
    println!("Pinging {:?} with {} bytes of data:", dest, data.len());

    // ping 4 times - this should really be configurable.
    // also, have the sequence_number be increasing.
    // fun fact: I tried without, and 8.8.8.8 straight up refused
    // to respond to the second request with the same sequence number!
    for sequence_number in 0..4 {
        let echo_request = ersatz::icmp::Echo {
            identifier,
            sequence_number,
        }
        .as_echo_request(data)
        .as_ipv4_payload();

        // measure ping time - more on that later.
        let before = Instant::now();

        // install our listener *before* we send our echo request,
        // so we're sure not to miss it!
        let rx = iface.expect_ipv4(move |packet| {
            // we can un-nest that by using a `match` with a catch-all
            // arm that returns instead. I have no strong feelings one way
            // or the other.
            if let ipv4::Payload::ICMP(ref icmp_packet) = packet.payload {
                if let icmp::Header::EchoReply(ref reply) = icmp_packet.header {
                    if reply.identifier == identifier && reply.sequence_number == sequence_number {
                        return Some((before.elapsed(), packet.clone()));
                    }
                }
            }

            None
        });

        // send our echo request
        iface.send_ipv4(echo_request, &dest)?;

        // wait up to 3 seconds for a reply
        match rx.recv_timeout(Duration::from_secs(3)) {
            Ok((elapsed, packet)) => {
                // Even though we know it's an Echo Reply ICMP packet, we still
                // need to do pattern matching here.
                // We could go another route and have a custom type with just the
                // info we need (or even print from our `expect_ipv4` closure) to
                // avoid that.
                // It's just nice to know listeners can return packets unadulterated!
                if let ipv4::Payload::ICMP(ref icmp_packet) = packet.payload {
                    if let icmp::Header::EchoReply(_) = icmp_packet.header {
                        println!(
                            "Reply from {:?}: bytes={} time={:?} TTL={}",
                            packet.src,
                            icmp_packet.payload.0.len(),
                            elapsed,
                            packet.ttl,
                        );
                    }
                }
            }
            Err(_) => {
                // give up and go home
                println!("Timed out!");
                process::exit(1);
            }
        }

        // wait a bit before pinging again. We don't want
        // to DoS Google, now do we? *laugh track*
        std::thread::sleep(Duration::from_secs(1));
    }

    Ok(())
}

Pretty neat, right?

Now all of that compiles (pinky swear).

It even runs:

$ cargo run --quiet -- 8.8.8.8
Pinging 8.8.8.8 with 8 bytes of data:
Timed out!

Of course, it doesn't work. But it does compile and run.

Which, as far as I'm concerned.. I mean, my job here is done, right? You figure it out.

Cool bear

Cool bear's hot tip

Hey uhh I found an gateway_mac: ethernet::Addr::zero(), is it yours?

Oooohhhhhh. Right. Kinda forgot all about the ARP part, in all that excitement.

Turns out we've been sending our Ethernet frames to "00-00-00-00-00-00". Well, it happens, haha. I'm fairly sure our router completely dropped our packets though. Which may explain the timeout.

So we need to find the MAC address of our router before returning from Interface::open_default().

I'm pretty sure a scoped thread is the right tool for the job. Let's remove arp from PendingRequests too - we'll deal with all that locally.

struct PendingQueries {
    // gone: arp
    ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool + Send + 'static>>,
}

impl Interface {
    pub fn open_default() -> Result<Self, error::Error> {
        // omitted: getting nic, iface

        // gone: arp
        let pending = Arc::new(Mutex::new(PendingQueries { ipv4: Vec::new() }));

        let gateway_mac = crossbeam_utils::thread::scope(|s| {
            let (tx, rx) = mpsc::channel();
            let gateway_ip = nic.gateway;

            let poll_iface = iface.clone();
            s.spawn(move |_| {
                poll_iface
                    .loop_infinite_dyn(&mut |packet| {
                        // if we can parse it...
                        let frame = match ethernet::Frame::parse(packet) {
                            Ok((_remaining, frame)) => frame,
                            _ => return,
                        };
                        // ...and it's ARP
                        let arp = match frame.payload {
                            ethernet::Payload::ARP(x) => x,
                            _ => return,
                        };

                        // ...and it's a reply
                        if let arp::Operation::Reply = arp.operation {
                            // ...from the host we want
                            if arp.sender_ip_addr == gateway_ip {
                                // ... then send the result!
                                tx.send(arp.sender_hw_addr).unwrap();
                            }
                        }
                    })
                    .unwrap();
            });

            arp::Packet::request(&nic, gateway_ip)
                .as_ethernet_payload()
                .as_broadcast_frame(&nic)
                .send(iface.as_ref())
                .unwrap();

            let ret = rx
                .recv_timeout(Duration::from_secs(3))
                .map_err(|_| "ARP timeout")
                .unwrap();
            iface.break_loop();
            ret
        })
        .unwrap();

        let res = Self {
            pending: pending.clone(),
            nic,
            // new: use `gateway_mac` value we just obtained
            gateway_mac,
            iface: iface.clone(),
        };

        std::thread::spawn(move || {
            // omitted: body of "packet processor thread"
        });

        Ok(res)
    }
}

That should do the trick.

I want to jump on the occasion to do one more code cleanup.

Our packet processing thread looks like that currently:

iface
    .loop_infinite_dyn(&mut |packet| {
        let frame = match ethernet::Frame::parse(packet) {
            Ok((_, frame)) => frame,
            _ => return,
        };

        if let ethernet::Payload::IPv4(ref packet) = frame.payload {
            let mut idx = None;
            // only if the packet is IPv4 do we need to lock `pending`
            let mut pending = pending.lock().unwrap();

            // try all listeners in order
            for (i, f) in pending.ipv4.iter().enumerate() {
                if f(&packet) {
                    // break if one wanted to unsubscribe
                    idx = Some(i);
                    break;
                }
            }
            if let Some(idx) = idx {
                // unsubscribe
                pending.ipv4.remove(idx);
            }
        }
    })
    .unwrap();

I'm not super fond of that. Isn't there a method on Iterator that returns the position of an item that satisfies a predicate?

// the rust standard library

/// Searches for an element in an iterator, returning its index.
///
/// position() takes a closure that returns true or false. It applies this
/// closure to each element of the iterator, and if one of them returns true,
/// then position() returns Some(index). If all of them return false, it returns
/// None.
///
/// position() is short-circuiting; in other words, it will stop processing as
/// soon as it finds a true.
fn position<P>(&mut self, predicate: P) -> Option<usize>
where
    P: FnMut(Self::Item) -> bool

Heyyy that sounds great.

Using that, we can change:

let mut idx = None;
// only if the packet is IPv4 do we need to lock `pending`
let mut pending = pending.lock().unwrap();

// try all listeners in order
for (i, f) in pending.ipv4.iter().enumerate() {
    if f(&packet) {
        // break if one wanted to unsubscribe
        idx = Some(i);
        break;
    }
}
if let Some(idx) = idx {
    // unsubscribe
    pending.ipv4.remove(idx);
}

To the following:

let mut pending = pending.lock().unwrap();
pending
    .ipv4
    .iter()
    .position(|f| f(packet))
    .map(|i| pending.ipv4.remove(i));

Much better!

Does it work now?

$ cargo run --quiet -- 8.8.8.8
Pinging 8.8.8.8 with 8 bytes of data:
Reply from 8.8.8.8: bytes=8 time=1.000651s TTL=54
Reply from 8.8.8.8: bytes=8 time=301.8831ms TTL=54
Reply from 8.8.8.8: bytes=8 time=251.4266ms TTL=54
Reply from 8.8.8.8: bytes=8 time=210.9397ms TTL=54

It does! Our efforts to parse and serialize Ethernet, ARP, IPv4, and ICMP have all finally come together.

There's.. there's something about these timings though. They don't really sound accurate. The first one in particular (1.0000651s) is downright fishy.

Compared to the output of Windows's built-in ping:

$ ping 8.8.8.8

Pinging 8.8.8.8 with 32 bytes of data:
Reply from 8.8.8.8: bytes=32 time=8ms TTL=54
Reply from 8.8.8.8: bytes=32 time=7ms TTL=54
Reply from 8.8.8.8: bytes=32 time=8ms TTL=54
Reply from 8.8.8.8: bytes=32 time=7ms TTL=54

Yeah our timings are whack.

But, again, in a suspicious way. There's no way the roundtrip took almost exactly one second. It's almost like there's a one-second delay somewhere in the stack.

We now it's not in ersatz or sup. Although we do wait one second between each ping, we send and process packets as soon as we can. Also, we've taken some liberties performance-wise, but nowhere near any that could justify such a delay.

It's almost like... our packet capture/injection library is buffering packets at some point...

If you'll remember, we know that it has a "send queue", and we took care to flush it:

// in `ersatz/src/lib.rs`

impl Interface {
    pub fn send_ipv4(&self, payload: ipv4::Payload, addr: &ipv4::Addr) -> Result<(), error::Error> {
        payload
            .as_packet(self.nic.address, addr.clone())
            .as_ethernet_payload()
            .as_frame(&self.nic, self.gateway_mac)
            .send(self.iface.as_ref())?;
        // right here
        self.iface.as_ref().flush();
        Ok(())
    }
}

Is there a "receive queue" maybe? The actual library rawsock uses is npcap - in particular, it uses its winpcap-compatible interface, maybe if we look at the rawsock code for this, we might find a clue?

// rawsock internals

impl<'a> Interface<'a> {
    pub fn new(name: &str, dll: &'a WPCapDll) ->Result<Self, Error> {
        let name = CString::new(name)?;
        let mut errbuf =  PCapErrBuf::new();
        let handle = unsafe { dll.pcap_open_live(
            name.as_ptr(),
            65536,                  /* max packet size */
            8,                      /* promiscuous mode */
            1000,                   /* read timeout in milliseconds */
            errbuf.buffer()
        )};

        // omitted: rest of function
    }
}

Oh hey. Would you look at that.

Let's try something. We'll clone rawsock locally, in a vendor/ directory:

# (in ersatz/)

$ mkdir vendor

$ git clone https://github.com/szymonwieloch/rust-rawsock vendor/rawsock
Cloning into 'vendor/rawsock'...
remote: Enumerating objects: 82, done.
remote: Counting objects: 100% (82/82), done.
remote: Compressing objects: 100% (50/50), done.
remote: Total 931 (delta 48), reused 54 (delta 32), pack-reused 849
Receiving objects: 100% (931/931), 152.95 KiB | 855.00 KiB/s, done.
Resolving deltas: 100% (597/597), done.

Set the timeout to 1 millisecond:

// in `vendor/rawsock/src/wpcap/interface.rs`

impl<'a> Interface<'a> {
    pub fn new(name: &str, dll: &'a WPCapDll) -> Result<Self, Error> {
        let name = CString::new(name)?;
        let mut errbuf = PCapErrBuf::new();
        let handle = unsafe {
            dll.pcap_open_live(
                name.as_ptr(),
                65536, /* max packet size */
                8,     /* promiscuous mode */
                // new!
                1,     /* read timeout in milliseconds */
                errbuf.buffer(),
            )
        };

        // cut: rest
    }
}

Adjust our Cargo.toml:

# in `rawsock/Cargo.toml`

[dependencies]
rawsock = { version = "0.3.0", path = "./vendor/rawsock" }
# omitted: everything else

And give it a shot!

$ cargo run --quiet -- 8.8.8.8
Pinging 8.8.8.8 with 8 bytes of data:
Reply from 8.8.8.8: bytes=8 time=9.8372ms TTL=54
Reply from 8.8.8.8: bytes=8 time=9.0064ms TTL=54
Reply from 8.8.8.8: bytes=8 time=9.9738ms TTL=54
Reply from 8.8.8.8: bytes=8 time=8.9758ms TTL=54

And there you have it, folks.

We implemented parts of Ethernet, ARP, IPv4, and ICMP and have a fully-functioning ping utility that clocks in at roughly 1600 lines of relatively-clean Rust.

We've made our own ping.

Comment on /r/fasterthanlime

(JavaScript is required to see this. Or maybe my stuff broke)

Here's another article just for you:

Pin and suffering

I'd like to think that my understanding of "async Rust" has increased over the past year or so. I'm 100% onboard with the basic principle: I would like to handle thousands of concurrent tasks using a handful of threads. That sounds great!

And to become proficient with async Rust, I've accepted a lot of things. There are blue functions and red functions, and red (async) functions are contagious.