Crafting ICMP-bearing IPv4 packets with the help of bitvec
👋 This page was last updated ~5 years ago. Just so you know.
So. Serializing IPv4 packets. Easy? Well, not exactly.
IPv4 was annoying to parse, because we had 3-bit integers, and 13-bit integers, and who knows what else. Serializing it is going to be exactly the same.
Right now, we don't have a way to serialize that.
Let's take the version and ihl fields, both of which are supposed to take 4 bits, together making a byte. We could serialize them like this:
// in `src/ipv4.rs`
impl Packet {
pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
use cf::bytes::be_u8;
// TODO: serialize dscp, ecn, etc.
be_u8((u8::from(self.version) << 4) + u8::from(self.ihl))
}
}
That would work, I think - let's try it out:
impl Packet {
pub fn parse(i: parse::Input) -> parse::Result<Self> {
let original_i = i;
// omitted: actual parser
// omitted: let res = Self { ... }
use crate::blob::Blob;
let serialized = cf::gen_simple(res.serialize(), Vec::new()).unwrap();
println!(" original = {:?}", Blob::new(original_i));
println!("serialized = {:?}", Blob::new(&serialized));
Ok((i, res))
}
}
$ cargo run --quiet
original = [45 00 00 29 17 cf 40 00 80 06 00 00 c0 a8 01 10 2e 33 b3 5a + 21 bytes]
serialized = [45]
original = [45 00 00 34 9f 03 40 00 27 06 11 7b 2e 33 b3 5a c0 a8 01 10 + 32 bytes]
serialized = [45]
original = [45 00 00 47 5e e7 00 00 80 11 00 00 c0 a8 01 10 08 08 08 08 + 51 bytes]
serialized = [45]
Yeah, okay, that works.
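To spell out the arithmetic behind that lone 45, here's a minimal std-only sketch of the same packing done by hand. The function names (`pack_version_ihl`, `pack_flags_offset`) are made up for illustration, and they take plain `u8`/`u16` values instead of the `ux` types we use in the real code:

```rust
/// Packs two 4-bit values into one byte: `version` in the high nibble,
/// `ihl` in the low nibble. Anything above the low 4 bits is masked off.
fn pack_version_ihl(version: u8, ihl: u8) -> u8 {
    ((version & 0x0F) << 4) | (ihl & 0x0F)
}

/// Packs a 3-bit `flags` value and a 13-bit `fragment_offset` into 16 bits,
/// returned as the two big-endian bytes that would go on the wire.
fn pack_flags_offset(flags: u8, fragment_offset: u16) -> [u8; 2] {
    let word = (((flags & 0b111) as u16) << 13) | (fragment_offset & 0x1FFF);
    word.to_be_bytes()
}
```

With version 4 and IHL 5 this gives 0x45. Flags 0b010 (don't fragment) with an offset of 0 gives 40 00, which is what bytes 6 and 7 of the first two captures above contain.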
It's not fun, though. We avoid doing bit twiddling by hand when parsing, and I'll be darned if we do it when serializing.
If only we had a way to deal with... bits.. as individual units. Like, what if we could have a slice of bits - no, a vector of bits. That we could extend to, subslice, etc.
Luckily (horn section revs up) there is a crate for that (sax riff).
Introducing the bitvec crate. Which lets us do exactly that.
Let's take it for a spin.
$ cargo add bitvec
Adding bitvec v0.16.0 to dependencies
// in `src/main.rs`
fn main() {
use bitvec::prelude::*;
let mut vec = BitVec::<BigEndian, u8>::new();
let val = 4u8;
vec.extend_from_slice(val.bits::<BigEndian>());
println!("{:?}", vec);
}
$ cargo run --quiet
BitVec<BigEndian, u8> [00000100]
Okay, so we got bits. We only want the last 4 bits though.
Can we do that?
use bitvec::prelude::*;
let mut vec = BitVec::<BigEndian, u8>::new();
let val = 4u8;
vec.extend_from_slice(&val.bits::<BigEndian>()[4..]);
println!("{:?}", vec);
$ cargo run --quiet
BitVec<BigEndian, u8> [0100]
Hey, yeah!
Can.. can we do that twice?
use bitvec::prelude::*;
let mut vec = BitVec::<BigEndian, u8>::new();
let val = 4u8;
vec.extend_from_slice(&val.bits::<BigEndian>()[4..]);
let val = 5u8;
vec.extend_from_slice(&val.bits::<BigEndian>()[4..]);
println!("{:?}", vec);
$ cargo run --quiet
BitVec<BigEndian, u8> [01000101]
Yes, uh-huh, yep we can.
Can we get that back as an u8 slice though?
use bitvec::prelude::*;
let mut vec = BitVec::<BigEndian, u8>::new();
let val = 4u8;
vec.extend_from_slice(&val.bits::<BigEndian>()[4..]);
let val = 5u8;
vec.extend_from_slice(&val.bits::<BigEndian>()[4..]);
println!("{:?}", vec);
println!("{}", as_hex::AsHex(vec.as_slice()));
$ cargo run --quiet
BitVec<BigEndian, u8> [01000101]
45
Well then.
Let's make our bed and serialize in it, shall we?
How about a nice helper in these trying times:
// in `src/main.rs`
mod serialize;
// in `src/serialize.rs`
use bitvec::prelude::*;
use cookie_factory as cf;
use std::io;
pub type BitOutput = BitVec<BigEndian, u8>;
pub fn bits<W, F>(f: F) -> impl cf::SerializeFn<W>
where
W: io::Write,
F: Fn(&mut BitOutput),
{
move |mut out: cf::WriteContext<W>| {
let mut bo = BitOutput::new();
f(&mut bo);
io::Write::write(&mut out, bo.as_slice())?;
Ok(out)
}
}
Wonderful.
Next up: we'll want serialize methods on all those u3, u4, etc. types.
Let's start by adding a convenience method on BitOutput. Since BitOutput is just a type alias, not a newtype, we have to make our own trait, because of orphan rules.
Cool bear's hot tip
See Part 11 for further discussion on orphan rules.
pub trait WriteLastNBits {
fn write_last_n_bits<B: Bits>(&mut self, b: B, num_bits: usize);
}
impl WriteLastNBits for BitOutput {
fn write_last_n_bits<B: Bits>(&mut self, b: B, num_bits: usize) {
let bitslice = b.bits::<BigEndian>();
let start = bitslice.len() - num_bits;
self.extend_from_slice(&bitslice[start..])
}
}
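If you'd like to see what "write the last N bits" means without pulling in bitvec, here's a rough std-only sketch. `BitBuf` is a hypothetical stand-in for `BitOutput`, not bitvec's actual implementation. It appends bits most-significant-first and packs them into bytes in that same order:

```rust
/// Hypothetical stand-in for `BitOutput`: bits are appended most significant
/// bit first, and packed into bytes in that same order.
#[derive(Default)]
struct BitBuf {
    bytes: Vec<u8>,
    len_bits: usize,
}

impl BitBuf {
    fn push_bit(&mut self, bit: bool) {
        let offset = self.len_bits % 8;
        if offset == 0 {
            // Start a fresh byte whenever the previous one is full.
            self.bytes.push(0);
        }
        if bit {
            // unwrap is fine: there is always a byte to write into here.
            *self.bytes.last_mut().unwrap() |= 0x80u8 >> offset;
        }
        self.len_bits += 1;
    }

    /// Appends the `num_bits` least significant bits of `value`,
    /// the most significant of those first.
    fn write_last_n_bits(&mut self, value: u16, num_bits: usize) {
        assert!(num_bits <= 16, "a u16 only has 16 bits");
        for i in (0..num_bits).rev() {
            self.push_bit((value >> i) & 1 == 1);
        }
    }
}
```

Writing the last 4 bits of 4 and then the last 4 bits of 5 leaves a single byte, 0x45, in `bytes`, same as the bitvec experiment above.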
Now let's take on u4, as an example:
pub trait BitSerialize {
fn write(&self, b: &mut BitOutput);
}
use ux::*;
impl BitSerialize for u4 {
fn write(&self, b: &mut BitOutput) {
// more on the choice of `u16` later
b.write_last_n_bits(u16::from(*self), 4);
}
}
Cool, that was easy.
Can we use it in our serializer?
impl Packet {
pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
use crate::serialize::{bits, BitSerialize};
bits(move |bo| {
self.version.write(bo);
self.ihl.write(bo);
})
}
}
Looks like it!
Does it work?
$ cargo run --quiet
original = [45 00 00 34 6d 79 00 00 35 06 20 b8 5d b8 d8 22 c0 a8 01 10 + 32 bytes]
serialized = [45]
original = [45 00 00 28 9a 40 40 00 80 06 00 00 c0 a8 01 10 5d b8 d8 22 + 20 bytes]
serialized = [45]
That it does!
Let's industrialize BitSerialize using macros, the feature we all hate to love:
pub trait BitSerialize {
fn write(&self, b: &mut BitOutput);
}
use ux::*;
macro_rules! impl_bit_serialize_for_ux {
($($width: expr),*) => {
$(
paste::item! {
impl BitSerialize for [<u $width>] {
fn write(&self, b: &mut BitOutput) {
b.write_last_n_bits(u16::from(*self), $width);
}
}
}
)*
};
}
impl_bit_serialize_for_ux!(2, 3, 4, 6, 13);
Boom.
Serializing IPv4 is suddenly very easy.
Let's take care of Payload and Protocol first:
impl Payload {
pub fn protocol(&self) -> Protocol {
match self {
Self::ICMP(_) => Protocol::ICMP,
_ => unimplemented!(),
}
}
pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
move |out| match self {
Self::ICMP(ref icmp) => icmp.serialize()(out),
_ => unimplemented!(),
}
}
}
impl Protocol {
pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
use cf::bytes::be_u8;
// note: `Protocol` needs to derive Clone and Copy for this
be_u8(*self as u8)
}
}
And now the meat of it:
impl Packet {
pub fn serialize_no_checksum<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
use crate::serialize::{bits, BitSerialize};
use cf::{
bytes::{be_u16, be_u8},
sequence::tuple,
};
tuple((
bits(move |bo| {
// hard-code these, so we don't have to fill them out when
// building new IPv4 packets
let version = u4::new(4);
let ihl = u4::new(5);
let dscp = u6::new(0);
let ecn = u2::new(0);
version.write(bo);
ihl.write(bo);
dscp.write(bo);
ecn.write(bo);
}),
be_u16(0), // length, to fill later
be_u16(self.identification),
bits(move |bo| {
// again, hard-code these, we don't care about fragmentation
// right now
let flags = u3::new(0);
let fragment_offset = u13::new(0);
flags.write(bo);
fragment_offset.write(bo);
}),
be_u8(self.ttl),
// we need to do this to avoid capturing a temporary
move |out| self.payload.protocol().serialize()(out),
be_u16(0), // checksum, to fill later
self.src.serialize(),
self.dst.serialize(),
self.payload.serialize(),
))
}
}
Looking real good.
Let's compare notes:
// in `src/ipv4.rs`
impl Packet {
pub fn parse(i: parse::Input) -> parse::Result<Self> {
let original_i = i;
// cut: meat of the parser
if let Payload::ICMP(_) = res.payload {
use crate::as_hex::AsHex;
let serialized = cf::gen_simple(res.serialize(), Vec::new()).unwrap();
println!(" original = {}", AsHex(original_i));
println!("serialized = {}", AsHex(&serialized));
}
Ok((i, res))
}
}
Running this, we get lines like these:
$ cargo run --quiet
(cut)
original = 45 00 00 3c 00 00 00 00 36 01 b2 f9 08 08 08 08 c0 a8 01 10 00 00 45 2d 00 01 10 2e 61 62 63 64 65 66 67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76 77 61 62 63 64 65 66 67 68 69
serialized = 45 00 00 00 00 00 00 00 36 01 00 00 08 08 08 08 c0 a8 01 10 00 00 45 2d 00 01 10 2e 61 62 63 64 65 66 67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76 77 61 62 63 64 65 66 67 68 69
We can see that the original has 3c, whereas serialized has 00 - that's the length field. The original has a checksum of b2 f9, ours is empty.
Let's fill those out. The plan is the same as icmp::Packet::serialize - we make another function that fills in the missing fields.
// in `src/ipv4.rs`
impl Packet {
// omitted: `fn serialize_no_checksum()`
pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
use cf::{bytes::le_u16, combinator::slice};
move |out| {
let mut buf = cf::gen_simple(self.serialize_no_checksum(), Vec::new())?;
// fill in length
let length = buf.len() as u16;
cf::gen_simple(le_u16(length), &mut buf[2..])?;
// fill in checksum
// it's important to do this last, because the length is
// part of the checksum
let checksum = crate::ipv4::checksum(&buf);
cf::gen_simple(le_u16(checksum), &mut buf[10..])?;
slice(buf)(out)
}
}
}
Let's try this again:
$ cargo run --quiet
original = 45 00 00 3c 00 00 00 00 36 01 b2 f9
serialized = 45 00 3c 00 00 00 00 00 36 01 77 35
^^ ^^ ^^ ^^
Uh oh, we got both of these wrong somehow.
Cool bear's hot tip
If you want, take a minute and try to figure out what we messed up.
I'll wait.
To pad out the page (and avoid spoilers), let me tell you a few evil things we could do with a low-level ICMP implementation like ours:
First, we could try to ping every IP address in a subnet to find out which hosts are live. Why? Well, once we have a list of potential attack targets, we can start finding out if they have any UDP or TCP ports open, on which services are served, that could have potential vulnerabilities.
Second, we could just send a lot of ICMP echo requests very quickly. Ideally, we'd do that from many different hosts. The idea is that the target would eventually no longer be able to handle sending responses - it would saturate its CPU, its network link, or both. That's a pretty simple Denial-of-service attack (aka DoS) right there.
Third, you know how we're able to specify an arbitrary payload (of reasonable length) in the ICMP echo request? And the host is supposed to reply with that same payload? Well it's our implementation - we can do anything we want. So we might have a host we control that stuffs replies in that payload. That way, we could implement IP over ICMP. This could be used to bypass WiFi paywalls - although it would be significantly slower than regular IP over Ethernet. But don't take it from me, give it a shot.
Fourth, we could (try to) use ICMP redirect messages to pull off a Man-in-the-middle attack (aka MITM). We'd pretend to be a router that says "hey, there's a better route to that host, you should totally send it to X.Y.Z.W instead", and that address is a host we control. This would probably be of limited use, as most network traffic nowadays is secured via protocols like TLS, but still fun.
Consequently, ICMP is often at least partially disabled for security reasons. To learn more, read Drew Branch's article, from which this list is paraphrased.
Okay, did you figure out what we got wrong?
First off, we used le_u16 to write the "total length" field. Because we copied the code used to write the checksum - also a 16-bit integer. But for the internet checksum, we need to write it as little-endian since it was calculated as little-endian. The length, however, needs to be written as big-endian, like all the IPv4 fields.
That explains why we got 3c 00 instead of 00 3c.
Secondly, we computed a checksum over the whole packet. But the field is called "header checksum" for a reason.
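Both mistakes are easy to see in isolation. Here's a small std-only sketch, with made-up helper names (`patch_total_length`, `header_bytes`) and a plain byte buffer instead of cookie_factory, assuming a packet with no IP options:

```rust
/// Writes the total length into bytes 2..4 of an IPv4 packet, big-endian.
/// Assumes `buf` holds at least a 20-byte header and its length fits in a u16.
fn patch_total_length(buf: &mut [u8]) {
    let length = buf.len() as u16;
    buf[2..4].copy_from_slice(&length.to_be_bytes());
}

/// Returns the part of the packet the header checksum covers,
/// assuming no IP options (IHL = 5, so 5 * 4 = 20 bytes).
fn header_bytes(buf: &[u8]) -> &[u8] {
    &buf[..5 * 4]
}
```

For a 60-byte packet, `60u16.to_be_bytes()` is 00 3c while `60u16.to_le_bytes()` is 3c 00, which is exactly the swap we saw in the output.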
Let's fix both of these:
// in `src/ipv4.rs`
impl Packet {
// omitted: `fn serialize_no_checksum()`
pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
use cf::{
bytes::{be_u16, le_u16},
combinator::slice,
};
move |out| {
let mut buf = cf::gen_simple(self.serialize_no_checksum(), Vec::new())?;
// fill in length - as big-endian
let length = buf.len() as u16;
cf::gen_simple(be_u16(length), &mut buf[2..])?;
// fill in header checksum
// note: this will break if we ever allow IP options.
// luckily this is out of scope now.
let header_slice = &buf[..5 * 4];
let checksum = crate::ipv4::checksum(header_slice);
cf::gen_simple(le_u16(checksum), &mut buf[10..])?;
slice(buf)(out)
}
}
}
Let's run it once again - I've split the output over multiple lines so it's easier to compare:
orig | 45 00 00 3c 00 00 00 00 36 01 b2 f9 08 08 08 08
serd | 45 00 00 3c 00 00 00 00 36 01 b2 f9 08 08 08 08
^^ ^^^^^ ^^^^^ ^^^^^ ^^ ^^ ^^^^^ ^^^^^^^^^^^
ipv4 length | flags TTL | sum 8.8.8.8
ihl=5 ident proto=icmp
orig | c0 a8 01 10 00 00 45 17 00 01 10 44 61 62 63 64
serd | c0 a8 01 10 00 00 45 17 00 01 10 44 61 62 63 64
^^^^^^^^^^^ ^^^^^ ^^^^^ ^^^^^ ^^^^^ ^^^^^^^^^^^
192.168.1.16 | sum | seqnb payload "abcd"
echo reply ident
orig | 65 66 67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74
serd | 65 66 67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
"efghijklmnopqrst"
orig | 75 76 77 61 62 63 64 65 66 67 68 69
serd | 75 76 77 61 62 63 64 65 66 67 68 69
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
"uvwabcdefghi"
Y'all, that looks really good.
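If you want to double-check the annotations in that dump, here's a tiny std-only sketch that pulls a few fields back out of the first 20 bytes. `HeaderSummary` and `summarize` are names I made up for this, and it uses `std::net::Ipv4Addr` rather than our own `Addr` type:

```rust
use std::net::Ipv4Addr;

/// A few fields pulled out of a 20-byte IPv4 header (no options).
#[derive(Debug, PartialEq)]
struct HeaderSummary {
    version: u8,
    ihl: u8,
    total_length: u16,
    ttl: u8,
    protocol: u8,
    src: Ipv4Addr,
    dst: Ipv4Addr,
}

fn summarize(h: &[u8; 20]) -> HeaderSummary {
    HeaderSummary {
        version: h[0] >> 4,                              // high nibble
        ihl: h[0] & 0x0F,                                // low nibble
        total_length: u16::from_be_bytes([h[2], h[3]]),  // big-endian
        ttl: h[8],
        protocol: h[9],
        src: Ipv4Addr::new(h[12], h[13], h[14], h[15]),
        dst: Ipv4Addr::new(h[16], h[17], h[18], h[19]),
    }
}
```

Fed the header bytes from the dump, it reports version 4, IHL 5, a total length of 60 (0x3c), TTL 54 (0x36), protocol 1 (ICMP), source 8.8.8.8 and destination 192.168.1.16.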
It's time. The checks are summed and the bits are twiddled. We took our sweet, sweet time to arrive to this point. Our beast of a codebase is growling, impatient to be unleashed onto the greater internet.
Kill ping -t 8.8.8.8 - we won't be needing it anymore.
First, some ICMP helpers:
// in `src/icmp.rs`
impl Echo {
pub fn as_echo_request<P: AsRef<[u8]>>(self, payload: P) -> Packet {
Packet {
typ: Type::EchoRequest,
checksum: 0,
header: Header::EchoRequest(self),
payload: Blob::new(payload.as_ref()),
}
}
}
impl Packet {
pub fn as_ipv4_payload(self) -> crate::ipv4::Payload {
crate::ipv4::Payload::ICMP(self)
}
}
Next, some IPv4 helpers:
$ cargo add rand
Adding rand v0.7.2 to dependencies
// in `src/ipv4.rs`
impl Addr {
pub fn zero() -> Self {
Self([0, 0, 0, 0])
}
}
impl Default for Packet {
fn default() -> Self {
Self {
length: 0,
identification: rand::random(),
version: u4::new(4),
ihl: u4::new(5),
dscp: u6::new(0),
ecn: u2::new(0),
flags: u3::new(0),
fragment_offset: u13::new(0),
ttl: 128,
protocol: None,
checksum: 0,
src: Addr::zero(),
dst: Addr::zero(),
payload: Payload::Unknown,
}
}
}
impl Payload {
pub fn as_packet(self, src: Addr, dst: Addr) -> Packet {
Packet {
protocol: Some(self.protocol()),
payload: self,
src,
dst,
..Default::default()
}
}
}
impl Packet {
pub fn as_ethernet_payload(self) -> crate::ethernet::Payload {
crate::ethernet::Payload::IPv4(self)
}
}
Finally, let's send an echo request:
// in `src/main.rs`
fn make_queries(
iface: &dyn rawsock::traits::DynamicInterface,
nic: &netinfo::NIC,
pending: &Mutex<PendingQueries>,
) {
let gateway_ip = nic.gateway;
let (tx, rx) = mpsc::channel();
pending.lock().unwrap().arp.insert(gateway_ip, tx);
arp::Packet::request(nic, gateway_ip)
.as_ethernet_payload()
.as_broadcast_frame(nic)
.send(iface);
let gateway_mac = rx.recv().unwrap();
println!("gateway MAC: {:?}", gateway_mac);
// send an echo request to 8.8.8.8
icmp::Echo {
identifier: 0xC0DE,
sequence_number: 0xFACE,
}
.as_echo_request("I'm a little teapot".as_bytes())
.as_ipv4_payload()
.as_packet(nic.address, ipv4::Addr([8, 8, 8, 8]))
.as_ethernet_payload()
.as_frame(nic, gateway_mac)
.send(iface);
}
And now our big moment:
$ cargo run --quiet
Using NIC {
guid: "{0E89380B-814A-48FC-86C4-5C51B8040CB2}",
gateway: 192.168.1.254,
address: 192.168.1.16,
phy_address: F4-D1-08-0B-7E-BC,
}
gateway MAC: 14-0C-76-6A-71-BD
The application panicked (crashed).
Message: not yet implemented
Location: src\ethernet.rs:120
Backtrace omitted. Run with RUST_BACKTRACE=1 environment variable to display it.
Run with RUST_BACKTRACE=full to include source snippets.
Oh, ah, uh, just one second:
// in `src/ethernet.rs`
impl Payload {
pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
use cf::sequence::tuple;
move |out| match self {
Self::ARP(ref packet) => tuple((EtherType::ARP.serialize(), packet.serialize()))(out),
// that's the line:
// |
// v
Self::IPv4(_) => unimplemented!(),
Self::Unknown => unimplemented!(),
}
}
}
Let's fix that real quick:
// in `src/ethernet.rs`
impl Payload {
pub fn serialize<'a, W: io::Write + 'a>(&'a self) -> impl cf::SerializeFn<W> + 'a {
use cf::sequence::tuple;
move |out| match self {
Self::ARP(ref packet) => tuple((EtherType::ARP.serialize(), packet.serialize()))(out),
Self::IPv4(ref packet) => tuple((EtherType::IPv4.serialize(), packet.serialize()))(out),
Self::Unknown => unimplemented!(),
}
}
}
Okay, now - for real - the moment we've all been waiting for:
$ cargo run --quiet
Using NIC {
guid: "{0E89380B-814A-48FC-86C4-5C51B8040CB2}",
gateway: 192.168.1.254,
address: 192.168.1.16,
phy_address: F4-D1-08-0B-7E-BC,
}
gateway MAC: 14-0C-76-6A-71-BD
The application panicked (crashed).
Message: checksum() input size should be a multiple of 2 bytes
Location: src\ipv4.rs:311
Backtrace omitted. Run with RUST_BACKTRACE=1 environment variable to display it.
Run with RUST_BACKTRACE=full to include source snippets.
Okay look, networks are complicated okay, we're fourteen articles in for crying out loud. You have to expect a few false starts.
So uh, what did we get wrong?
Well we always assumed that the input to the internet checksum would be a multiple of 2 bytes (and we made sure this assertion held by writing some code). And we were happy about it too, because the IPv4 header is always a multiple of 4 bytes, since its length is specified in "32-bit words".
But ICMP packets can have odd lengths, as we just found out.
What does RFC 792 say about that case??
Checksum
The checksum is the 16-bit ones's complement of the one's complement sum of the ICMP message starting with the ICMP Type.
For computing the checksum, the checksum field should be zero.
If the total length is odd, the received data is padded with one octet of zeros for computing the checksum.
This checksum may be replaced in the future.
Ah. Let's see. Our internet checksum routine is currently this:
// in `src/ipv4.rs`
pub fn checksum(slice: &[u8]) -> u16 {
let (head, slice, tail) = unsafe { slice.align_to::<u16>() };
if !head.is_empty() {
panic!("checksum() input should be 16-bit aligned");
}
if !tail.is_empty() {
panic!("checksum() input size should be a multiple of 2 bytes");
}
fn add(a: u16, b: u16) -> u16 {
let s: u32 = (a as u32) + (b as u32);
if s & 0x1_00_00 > 0 {
(s + 1) as u16
} else {
s as u16
}
}
!slice.iter().fold(0, |x, y| add(x, *y))
}
Now, we don't really want to do something like this:
// in `src/ipv4.rs`
pub fn checksum(slice: &[u8]) -> u16 {
if slice.len() % 2 != 0 {
let mut v = Vec::from(slice);
v.push(0);
return checksum(&v[..]);
}
// omitted: rest of checksum
}
Although that is the spirit of the RFC, and I did say on multiple occasions that we don't really care about performance, it seems silly to allocate a whole buffer just for that.
Instead, we can think about this: if our last byte is 0xFF, padding it with a null byte would look like 0xFF 0x00. Our processor (we're doing all this on x86) is little-endian, and FF as a 16-bit little-endian integer is... 0xFF 0x00 as well.
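That claim is quick to check with the standard library. `padded_tail_le` is just a name for this sketch:

```rust
/// Reads a lone trailing byte the way the RFC's zero padding would on a
/// little-endian host: as the first byte of a 16-bit word whose second
/// byte is zero.
fn padded_tail_le(last: u8) -> u16 {
    u16::from_le_bytes([last, 0x00])
}
```

`padded_tail_le(0xFF)` is the same number as `0xFF as u16`. Read as big-endian, those same two bytes would be 0xFF00 instead, so the shortcut leans on the little-endian assumption.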
TL;DR we can just do this:
// in `src/ipv4.rs`
pub fn checksum(slice: &[u8]) -> u16 {
let (head, slice, tail) = unsafe { slice.align_to::<u16>() };
if !head.is_empty() {
panic!("checksum() input should be 16-bit aligned");
}
// don't check tail, we'll use it
fn add(a: u16, b: u16) -> u16 {
let s: u32 = (a as u32) + (b as u32);
if s & 0x1_00_00 > 0 {
(s + 1) as u16
} else {
s as u16
}
}
!add(
slice.iter().fold(0, |x, y| add(x, *y)),
// add only item of tail (as an u16), or 0 (which is a no-op)
tail.iter().next().map(|&x| x as u16).unwrap_or_default(),
)
}
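For comparison, here's a sketch of an alternative that doesn't rely on alignment or on the host being little-endian. This is not the function we use in ersatz: `internet_checksum` is a made-up name, it reads 16-bit words as big-endian, and the result is meant to be written back with `to_be_bytes()`:

```rust
/// Internet checksum over `data`, reading 16-bit words as big-endian so the
/// result doesn't depend on the host's endianness. An odd trailing byte is
/// treated as if it were followed by a zero byte. The u32 accumulator is
/// plenty for packet-sized inputs.
fn internet_checksum(data: &[u8]) -> u16 {
    let mut sum: u32 = 0;
    let mut chunks = data.chunks_exact(2);
    for pair in &mut chunks {
        sum += u32::from(u16::from_be_bytes([pair[0], pair[1]]));
    }
    if let [last] = chunks.remainder() {
        sum += u32::from(u16::from_be_bytes([*last, 0x00]));
    }
    // Fold the carries back in (end-around carry) until the sum fits in 16 bits.
    while sum > 0xFFFF {
        sum = (sum & 0xFFFF) + (sum >> 16);
    }
    !(sum as u16)
}
```

Run over the 20-byte header from the earlier dump with the checksum field zeroed, it returns 0xb2f9, and writing that big-endian gives b2 f9 again. Run over the header with the checksum filled in, it returns 0.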
Okay, let's move on to the next error:
$ cargo run --quiet
Using NIC {
guid: "{0E89380B-814A-48FC-86C4-5C51B8040CB2}",
gateway: 192.168.1.254,
address: 192.168.1.16,
phy_address: F4-D1-08-0B-7E-BC,
}
gateway MAC: 14-0C-76-6A-71-BD
(192.168.1.16) => (8.8.8.8) | Packet {
typ: EchoRequest,
header: EchoRequest(Echo { identifier: c0de, sequence_number: face }),
payload: [49 27 6d 20 61 20 6c 69 74 74 6c 65 20 74 65 61 70 6f 74],
}
(8.8.8.8) => (192.168.1.16) | Packet {
typ: EchoReply,
header: EchoReply(Echo { identifier: c0de, sequence_number: face }),
payload: [49 27 6d 20 61 20 6c 69 74 74 6c 65 20 74 65 61 70 6f 74],
}
Ok so the thing we got wrong here is..
..nothing? It worked?
IT WORKED.
Migrating sup to ersatz
The sup crate was our take on ping, using the Win32 ICMP APIs. Now that we have our own network stack, we can just use that!
We'll have to think about how to package ersatz as a proper library though.
I'm thinking that:
- The ARP part should be completely abstracted away
- We should have an easy way to send IP packets to a destination
- We should have a way to wait for a packet that satisfies a specific filter.
In short, I want an API like that:
// fictional code
use ersatz::icmp::Echo;
use std::{time::Duration, process};
fn main() {
let iface = ersatz::Interface::open_default();
let addr = "8.8.8.8".parse().unwrap();
for sequence_number in 0..3 {
let echo_request = Echo {
identifier: 0xC0DE,
sequence_number,
}
.as_echo_request("I'm a little teapot".as_bytes())
.as_ipv4_payload();
// rx is an std::sync::mpsc::Receiver<ersatz::ipv4::Packet>
let rx = iface.expect_ipv4(|packet| {
// look for an ICMP echo reply packet from 'addr',
// with identifier 0xC0DE and the right sequence number
if is_the_one_we_want(packet) {
return Some(packet)
}
None
});
iface.send_ipv4(echo_request, addr).unwrap();
match rx.recv_timeout(Duration::from_secs(3)) {
Ok(packet) => {
// print "Reply from {:?}: bytes={}, etc.
}
Err(_) => {
println!("Timed out!");
process::exit(1);
},
}
}
}
Can we pull that off?
First, let's add ersatz
as a dependency of sup
:
# in Cargo.toml
# omitted: [package]
[dependencies]
memoffset = "0.5.1"
pretty-hex = "0.1.1"
once_cell = "1.2.0"
# new!
ersatz = { path = "../ersatz" }
We can then start with a very basic main:
// in `sup/src/main.rs`
fn main() -> Result<(), Box<dyn Error>> {
Ok(())
}
$ cargo run
Compiling sup v0.1.0 (C:\msys64\home\amos\ftl\sup)
error[E0433]: failed to resolve: use of undeclared type or module `ersatz`
--> src\main.rs:8:17
|
8 | let iface = ersatz::default_iface();
| ^^^^^^ use of undeclared type or module `ersatz`
error: aborting due to previous error
Ah. Well, although we did add ersatz to our Cargo.toml, it's a binary right now - not a library. Let's change that by:
- Renaming ersatz/src/main.rs to ersatz/src/lib.rs
- Making all modules public
// in `ersatz/src/lib.rs`
pub mod arp;
pub mod as_hex;
pub mod blob;
pub mod error;
pub mod ethernet;
pub mod icmp;
pub mod ipv4;
pub mod loadlibrary;
pub mod netinfo;
pub mod parse;
pub mod serialize;
pub mod vls;
Let's try this again:
$ cargo run
Compiling ersatz v0.1.0 (C:\msys64\home\amos\ftl\ersatz)
Compiling sup v0.1.0 (C:\msys64\home\amos\ftl\sup)
error[E0433]: failed to resolve: could not find `Interface` in `ersatz`
--> src\main.rs:8:25
|
8 | let iface = ersatz::Interface::open_default().unwrap();
| ^^^^^^^^^ could not find `Interface` in `ersatz`
error: aborting due to previous error
Okay, better. Let's prototype what our Interface type should look like:
use std::sync::{mpsc, Arc, Mutex};
pub struct Interface {
// some fields go here, probably `nic`, `gateway_mac`,
// our `pending` queue and whatever else we need
}
impl Interface {
pub fn open_default() -> Result<Self, error::Error> {
unimplemented!()
}
pub fn send_ipv4(
&self,
payload: ipv4::Payload,
addr: &ipv4::Addr,
) -> Result<(), error::Error> {
unimplemented!()
}
pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
where
F: Fn(&ipv4::Packet) -> Option<T>,
{
unimplemented!()
}
}
That looks reasonable.
Let's start with the easiest bit: We want to be able to send IPv4 packets from our own IP address - those need to be coated in an Ethernet frame, sent from our MAC to the Gateway's MAC.
All our as_xxx() helpers take ownership of self, so we'll take an ipv4::Payload, not a reference:
// in `ersatz/src/lib.rs`
impl Interface {
pub fn send_ipv4(&self, payload: ipv4::Payload, addr: &ipv4::Addr) -> Result<(), error::Error> {
payload
.as_packet(self.nic.address, addr.clone())
.as_ethernet_payload()
.as_frame(&self.nic, self.gateway_mac)
.send(self.iface.as_ref())?;
self.iface.as_ref().flush();
Ok(())
}
}
Looks good. For us to be able to do addr.clone(), we need to derive Clone on ipv4::Addr. While we're in that file, let's also copy over the FromStr implementation from sup into ersatz, except this time we'll use the thiserror crate for ParseAddrError:
// in `ersatz/src/ipv4.rs`
// new: Clone!
#[derive(PartialEq, Eq, Clone, Copy, Hash)]
pub struct Addr(pub [u8; 4]);
// omitted: custom Display, Debug implementations
use std::num::ParseIntError;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ParseAddrError {
#[error("not enough parts")]
NotEnoughParts,
#[error("too many parts")]
TooManyParts,
#[error("one of the components is not an int {0:?}")]
ParseIntError(#[from] ParseIntError),
}
impl std::str::FromStr for Addr {
type Err = ParseAddrError;
fn from_str(s: &str) -> Result<Self, ParseAddrError> {
let mut tokens = s.split(".");
let mut res = Self([0, 0, 0, 0]);
for part in res.0.iter_mut() {
*part = tokens
.next()
.ok_or(ParseAddrError::NotEnoughParts)?
.parse()?
}
if let Some(_) = tokens.next() {
return Err(ParseAddrError::TooManyParts);
}
Ok(res)
}
}
Note that we'll have to derive(Clone) for a bunch of other types along the way - I'm not going to mention all of these by name. As far as I'm concerned, almost everything in ersatz should be cloneable.
Of course, at this point ersatz doesn't compile, because we're missing a few fields we knew we were going to need:
$ cargo run
error[E0609]: no field `nic` on type `&Interface`
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:109:29
|
109 | .as_packet(self.nic.address, addr.clone())
| ^^^
error[E0609]: no field `nic` on type `&Interface`
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:111:29
|
111 | .as_frame(&self.nic, self.gateway_mac)
| ^^^
error[E0609]: no field `gateway_mac` on type `&Interface`
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:111:39
|
111 | .as_frame(&self.nic, self.gateway_mac)
| ^^^^^^^^^^^
error[E0609]: no field `iface` on type `&Interface`
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:112:24
|
112 | .send(self.iface.as_ref())?;
| ^^^^^
error[E0609]: no field `iface` on type `&Interface`
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:113:14
|
113 | self.iface.as_ref().flush();
| ^^^^^
nic and gateway_mac are easy. Interface should definitely own them:
// in `ersatz/src/lib.rs`
pub struct Interface {
nic: netinfo::NIC,
gateway_mac: ethernet::Addr,
}
But for iface, we need to think a little...
We know we want to let users of ersatz look at incoming ipv4 packets, until a condition of their choice is satisfied. In other words, we want to have a list of listeners that can unsubscribe by returning something.
// in `ersatz/src/lib.rs`
struct PendingQueries {
arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool>>,
}
I think it makes sense for PendingQueries to own those functions. They're also probably all going to have a different concrete type, hence the Box.
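To make the "listeners that unsubscribe by returning something" idea concrete, here's a self-contained sketch using only std. Everything in it is hypothetical: `Packet` is a stand-in for `ipv4::Packet`, and `Listeners`, `expect` and `dispatch` are names chosen for the example, not the API we end up with. Each listener wraps a user filter, forwards the match over an mpsc channel, and returns true once it's done:

```rust
use std::sync::mpsc;

// Hypothetical stand-in for `ipv4::Packet`.
#[derive(Debug, Clone, PartialEq)]
struct Packet {
    id: u16,
}

// A listener returns `true` once it is satisfied and should be removed.
type Listener = Box<dyn Fn(&Packet) -> bool + Send>;

#[derive(Default)]
struct Listeners {
    ipv4: Vec<Listener>,
}

impl Listeners {
    /// Registers `filter` and returns a receiver for its first match.
    fn expect<F, T>(&mut self, filter: F) -> mpsc::Receiver<T>
    where
        F: Fn(&Packet) -> Option<T> + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        self.ipv4.push(Box::new(move |packet: &Packet| match filter(packet) {
            Some(value) => {
                // Ignore send errors: the receiver may have been dropped.
                let _ = tx.send(value);
                true
            }
            None => false,
        }));
        rx
    }

    /// Shows `packet` to every listener, dropping the ones that matched.
    fn dispatch(&mut self, packet: &Packet) {
        self.ipv4.retain(|listener| !listener(packet));
    }
}
```

Registering a filter that waits for `id == 3`, then dispatching packets 1 and 3, leaves the list empty and the value 3 waiting in the receiver.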
Just like before, when ersatz was a binary, we're going to have to start a thread that receives packets and processes them, so open_default() should look a little something like:
// in `ersatz/src/lib.rs`
impl Interface {
pub fn open_default() -> Result<Self, error::Error> {
let nic = netinfo::default_nic()?;
let lib = open_best_library()?;
let iface_name = format!(r#"\Device\NPF_{}"#, nic.guid);
let iface = lib.open_interface(&iface_name)?;
let pending = PendingQueries {
arp: HashMap::new(),
ipv4: Vec::new(),
};
std::thread::spawn(|| {
iface.loop_infinite_dyn(&mut |packet| {
// TODO: parse raw packet, filter only IPv4
let packet: ipv4::Packet = unimplemented!();
for f in pending.ipv4 {
if f(&packet) {
// TODO: remove from pending.ipv4
}
}
})
});
unimplemented!()
}
}
So far so good, yes?
I mean at this point I'm a bit worried whether or not the thread is ever going to end but that seems like a problem for later so let's just continue and add an iface field to our Interface struct.
Let's see... the declaration of open_interface in rawsock looks like this:
// rawsock internals
pub trait Library {
fn open_interface<'a>(
&'a self,
name: &str
) -> Result<Box<dyn DynamicInterface<'a> + 'a>, Error>;
}
Mhhh okay, so our interface is going to have a lifetime, that's fair - but it's in a Box right? So we can just do this:
// in `ersatz/src/lib.rs`
pub struct Interface<'a> {
nic: netinfo::NIC,
gateway_mac: ethernet::Addr,
iface: Box<dyn rawsock::traits::DynamicInterface<'a>>,
}
And then (forgetting about the thread for now):
// in `ersatz/src/lib.rs`
impl<'a> Interface<'a> {
pub fn open_default() -> Result<Self, error::Error> {
let nic = netinfo::default_nic()?;
let lib = open_best_library()?;
let iface_name = format!(r#"\Device\NPF_{}"#, nic.guid);
let iface = lib.open_interface(&iface_name)?;
let res = Self {
nic,
gateway_mac: ethernet::Addr::zero(),
iface,
};
Ok(res)
}
}
$ cargo run
Compiling ersatz v0.1.0 (C:\msys64\home\amos\ftl\ersatz)
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:41:21
|
41 | let iface = lib.open_interface(&iface_name)?;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
note: first, the lifetime cannot outlive the lifetime 'a as defined on the impl at 36:6...
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:36:6
|
36 | impl<'a> Interface<'a> {
| ^^
= note: ...so that the types are compatible:
expected dyn rawsock::traits::DynamicInterface<'a>
found dyn rawsock::traits::DynamicInterface<'_>
= note: but, the lifetime must be valid for the static lifetime...
= note: ...so that the expression is assignable:
expected std::boxed::Box<(dyn rawsock::traits::DynamicInterface<'a> + 'static)>
found std::boxed::Box<dyn rawsock::traits::DynamicInterface<'a>>
Huh. No, that doesn't work at all.
But if I look at it sideways I can sort of understand what rustc is trying to tell us: we've seen from the signature of Library::open_interface that the returned interface's lifetime was bound to the library's lifetime. And we're dropping the library when returning from open_default().
Well - says the fool - that's easy! We'll just store lib and iface together in the same struct, so their lifetimes will definitely be tied:
// in `ersatz/src/lib.rs`
pub struct Interface<'a> {
nic: netinfo::NIC,
gateway_mac: ethernet::Addr,
// we're using "+ 'a" here to signify that not only should the trait
// object implement "Library", it should also have a lifetime of "'a"
lib: Box<dyn rawsock::traits::Library + 'a>,
iface: Box<dyn rawsock::traits::DynamicInterface<'a> + 'a>,
}
impl<'a> Interface<'a> {
pub fn open_default() -> Result<Self, error::Error> {
let nic = netinfo::default_nic()?;
let lib = open_best_library()?;
let iface_name = format!(r#"\Device\NPF_{}"#, nic.guid);
let iface = lib.open_interface(&iface_name)?;
let res = Self {
nic,
gateway_mac: ethernet::Addr::zero(),
lib,
iface,
};
Ok(res)
}
}
There, that's gotta work, I'm pretty sure:
$ cargo run
error[E0597]: `*lib` does not live long enough
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:42:21
|
37 | impl<'a> Interface<'a> {
| -- lifetime `'a` defined here
...
42 | let iface = lib.open_interface(&iface_name)?;
| ^^^----------------------------
| |
| borrowed value does not live long enough
| argument requires that `*lib` is borrowed for `'a`
...
134 | }
| - `*lib` dropped here while still borrowed
error[E0505]: cannot move out of `lib` because it is borrowed
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:13
|
37 | impl<'a> Interface<'a> {
| -- lifetime `'a` defined here
...
42 | let iface = lib.open_interface(&iface_name)?;
| -------------------------------
| |
| borrow of `*lib` occurs here
| argument requires that `*lib` is borrowed for `'a`
...
52 | lib,
| ^^^ move out of `lib` occurs here
error: aborting due to 2 previous errors
Oh. That's unfortunate.
Why doesn't that work?
It's actually rather simple.
The reason rawsock::traits::DynamicInterface's lifetime is tied to rawsock::traits::Library
is that the former holds a reference to the latter. And what lifetimes track is not how long
a value is valid in general - they track how long a value is valid at its current memory location.
lib starts out as a local, so it begins its existence somewhere on the stack:
Then we create iface, which has a pointer to lib - also on the stack:
And then we return a struct that contains both lib and iface... and the memory
to which iface's internal pointer refers no longer contains lib!
For this last diagram I've chosen to move iface first, conceptually - but we get
a similar result if we move lib first:
So it's not a limitation of the borrow checker - it's the system working as intended,
preventing us from causing a crash whenever the implementation of rawsock::traits::DynamicInterface
wants to use its pointer to the implementation of rawsock::traits::Library.
One way to address that would be to have a design similar to rawsock's: first
force the users of the ersatz crate to open a library, then hold on to it
and use it to obtain an Interface.
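For what it's worth, here's a rough sketch of what that rawsock-style design looks like from the caller's side. Library, Interface, describe and demo are simplified stand-ins made up for illustration, not rawsock's actual types. The only point is that the caller owns the library and leaves it in place while the interface borrows it:

```rust
// Hypothetical stand-ins for the rawsock types; names are illustrative only.
pub struct Library {
    name: String,
}

// The interface borrows the library, so it cannot outlive it.
pub struct Interface<'a> {
    lib: &'a Library,
    iface_name: String,
}

impl Library {
    pub fn open(name: &str) -> Library {
        Library { name: name.to_string() }
    }

    // The returned value is tied to `&'a self`, like `open_interface` above.
    pub fn open_interface<'a>(&'a self, iface_name: &str) -> Interface<'a> {
        Interface { lib: self, iface_name: iface_name.to_string() }
    }
}

impl<'a> Interface<'a> {
    pub fn describe(&self) -> String {
        format!("{} via {}", self.iface_name, self.lib.name)
    }
}

// The caller owns the library and keeps it alive for as long as the
// interface is in use; nothing gets moved while it is borrowed.
pub fn demo() -> String {
    let lib = Library::open("npcap");
    let iface = lib.open_interface("eth0");
    iface.describe()
}
```

It works, but every user has to juggle two values instead of one.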
But I liked our interface from earlier and I don't want to depart from it. So what
we can do instead is have our Library be valid for 'static - the duration of the
program. How do we do that? We can use once_cell!
// in `ersatz/src/lib.rs`
use once_cell::sync::Lazy;
static RAWSOCK_LIB: Lazy<Box<dyn rawsock::traits::Library>> =
Lazy::new(|| open_best_library().unwrap());
Note that this will panic if we can't open the best library. If we wanted to return an error instead, we'd have to adjust our design slightly.
But for now, this works great - drop the lib from our Interface struct, and
it compiles:
pub struct Interface<'a> {
nic: netinfo::NIC,
gateway_mac: ethernet::Addr,
iface: Box<dyn rawsock::traits::DynamicInterface<'a> + 'a>,
}
impl<'a> Interface<'a> {
pub fn open_default() -> Result<Self, error::Error> {
let nic = netinfo::default_nic()?;
let iface_name = format!(r#"\Device\NPF_{}"#, nic.guid);
let iface = RAWSOCK_LIB.open_interface(&iface_name)?;
let res = Self {
nic,
gateway_mac: ethernet::Addr::zero(),
iface,
};
Ok(res)
}
}
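About that panic: if we did want an error instead, one option would be to store the Result itself in the static. This is only a sketch under a few assumptions: it uses std::sync::OnceLock (in std since Rust 1.70) rather than once_cell so that it has no dependencies, and Library, the fail flag, and rawsock_lib() are made-up stand-ins, not the code we're actually writing here.

```rust
use std::sync::OnceLock;

// Hypothetical stand-in for the boxed rawsock library.
#[derive(Debug)]
pub struct Library {
    pub name: &'static str,
}

// Hypothetical opener; the `fail` flag only exists to simulate a failure.
fn open_best_library(fail: bool) -> Result<Library, String> {
    if fail {
        Err("no packet capture library found".to_string())
    } else {
        Ok(Library { name: "npcap" })
    }
}

// The static stores the outcome of the first attempt, success or failure.
static RAWSOCK_LIB: OnceLock<Result<Library, String>> = OnceLock::new();

// Returns a `'static` reference on success, and a copy of the stored
// error otherwise. The opener runs at most once.
pub fn rawsock_lib() -> Result<&'static Library, String> {
    RAWSOCK_LIB
        .get_or_init(|| open_best_library(false))
        .as_ref()
        .map_err(|e| e.clone())
}
```

With something like this, open_default() could call rawsock_lib()? and pass the error along instead of panicking.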
Now, we've sorta lost track of pending in all that jazz, and we definitely
need it in expect_ipv4, so let's add it back:
pub struct Interface<'a> {
nic: netinfo::NIC,
gateway_mac: ethernet::Addr,
iface: Box<dyn rawsock::traits::DynamicInterface<'a> + 'a>,
pending: PendingQueries,
}
struct PendingQueries {
arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool>>,
}
impl<'a> Interface<'a> {
pub fn open_default() -> Result<Self, error::Error> {
let nic = netinfo::default_nic()?;
let iface_name = format!(r#"\Device\NPF_{}"#, nic.guid);
let iface = RAWSOCK_LIB.open_interface(&iface_name)?;
let pending = PendingQueries {
arp: HashMap::new(),
ipv4: Vec::new(),
};
let res = Self {
pending,
nic,
gateway_mac: ethernet::Addr::zero(),
iface,
};
Ok(res)
}
}
Good, good, this still compiles. Let's try implementing expect_ipv4.
Remember we settled on the following scheme:
- ersatz users can pass a listener Fn that returns an Option<T>
- expect_ipv4 returns a Receiver<T>, that receives val as soon as the listener returns Some(val)
- pending contains a list of functions that return true when they want to unsubscribe.
It follows that we can't just stuff the listener Fn into pending's Vec,
we'll have to make another closure inside expect_ipv4, like so:
impl Interface {
pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
where
F: Fn(&ipv4::Packet) -> Option<T>,
{
let (tx, rx) = mpsc::channel();
self.pending.ipv4.push(|packet| {
match f(&packet) {
Some(val) => {
tx.send(val).unwrap_or(()); // ignore send errors
true
}
None => false,
}
});
rx
}
}
I like the look of that!
$ cargo run --quiet
error[E0308]: mismatched types
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:32
|
155 | self.pending.ipv4.push(|packet| {
| ________________________________^
156 | | match f(&packet) {
157 | | Some(val) => {
158 | | tx.send(val).unwrap_or(()); // ignore send errors
... |
162 | | }
163 | | });
| |_________^ expected struct `std::boxed::Box`, found closure
|
= note: expected type `std::boxed::Box<(dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool + 'static)>`
found type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:32: 163:10 f:_, tx:_]`
= note: for more on the distinction between the stack and the heap, read https://doc.rust-lang.org/book/ch15-01-box.html, https://doc.rust-lang.org/rust-by-example/std/box.html, and https://doc.rust-lang.org/std/boxed/index.html
help: store this in the heap by calling `Box::new`
|
155 | self.pending.ipv4.push(Box::new(|packet| {
156 | match f(&packet) {
157 | Some(val) => {
158 | tx.send(val).unwrap_or(()); // ignore send errors
159 | true
160 | }
...
error: aborting due to previous error
Oh, right, of course, it's a Vec<Box<dyn Fn>>, not a Vec<dyn Fn> - we need to box
that closure ourselves:
pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
where
F: Fn(&ipv4::Packet) -> Option<T>,
{
let (tx, rx) = mpsc::channel();
// new! Box::new(...)
self.pending.ipv4.push(Box::new(|packet| {
match f(&packet) {
Some(val) => {
tx.send(val).unwrap_or(()); // ignore send errors
true
}
None => false,
}
}));
rx
}
It always feels nice to make one little mistake, have the compiler catch you, fix it, and then everything works. If it works the first time, it's kinda freaky - but one little mistake is good. Reassuring.
So now that we've fixed it, of course, everything works:
$ cargo run --quiet
error[E0310]: the parameter type `F` may not live long enough
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:32
|
149 | pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
| - help: consider adding an explicit lifetime bound `F: 'static`...
...
155 | self.pending.ipv4.push(Box::new(|packet| {
| ________________________________^
156 | | match f(&packet) {
157 | | Some(val) => {
158 | | tx.send(val).unwrap_or(()); // ignore send errors
... |
162 | | }
163 | | }));
| |__________^
|
note: ...so that the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:41: 163:10 f:&F, tx:&std::sync::mpsc::Sender<T>]` will meet its required lifetime bounds
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:32
|
155 | self.pending.ipv4.push(Box::new(|packet| {
| ________________________________^
156 | | match f(&packet) {
157 | | Some(val) => {
158 | | tx.send(val).unwrap_or(()); // ignore send errors
... |
162 | | }
163 | | }));
| |__________^
error[E0310]: the parameter type `T` may not live long enough
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:32
|
149 | pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
| - help: consider adding an explicit lifetime bound `T: 'static`...
...
155 | self.pending.ipv4.push(Box::new(|packet| {
| ________________________________^
156 | | match f(&packet) {
157 | | Some(val) => {
158 | | tx.send(val).unwrap_or(()); // ignore send errors
... |
162 | | }
163 | | }));
| |__________^
|
note: ...so that the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:41: 163:10 f:&F, tx:&std::sync::mpsc::Sender<T>]` will meet its required lifetime bounds
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:155:32
|
155 | self.pending.ipv4.push(Box::new(|packet| {
| ________________________________^
156 | | match f(&packet) {
157 | | Some(val) => {
158 | | tx.send(val).unwrap_or(()); // ignore send errors
... |
162 | | }
163 | | }));
| |__________^
error: aborting due to 2 previous errors
Ah, uh, not quite.
Why does it want the 'static lifetime here though? We sure didn't write it down anywhere.
Let's look at the definition of PendingQueries again:
struct PendingQueries {
arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool>>,
}
Ah, there it is - a Box<dyn Trait> without an explicit lifetime is equivalent to Box<dyn Trait + 'static>.
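Here's a tiny sketch of that default in action, away from all the networking code. The function names (owned_checker, borrowing_checker, demo) are made up for the example: one returns a boxed closure with the implicit 'static bound, the other spells out + 'a so the closure is allowed to borrow.

```rust
// With no explicit bound, the boxed trait object defaults to `'static`:
// the closure may only capture owned data (or `'static` references).
pub fn owned_checker(threshold: u32) -> Box<dyn Fn(u32) -> bool> {
    Box::new(move |x| x > threshold)
}

// With `+ 'a`, the closure is allowed to borrow something that lives for `'a`.
pub fn borrowing_checker<'a>(threshold: &'a u32) -> Box<dyn Fn(u32) -> bool + 'a> {
    Box::new(move |x| x > *threshold)
}

pub fn demo() -> (bool, bool) {
    let limit = 10;
    let owned = owned_checker(limit);
    let borrowed = borrowing_checker(&limit);
    (owned(11), borrowed(3))
}
```

Dropping the + 'a from borrowing_checker's return type should get rustc complaining about 'static again, same as above.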
We could try adding lifetimes everywhere?
struct PendingQueries<'a> {
arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool + 'a>>,
}
pub struct Interface<'a> {
// omitted: other fields
// new: <'a>
pending: PendingQueries<'a>,
}
impl<'a> Interface<'a> {
pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
where
F: Fn(&ipv4::Packet) -> Option<T> + 'a, // new: explicit lifetime here
T: 'a, // and also here
{
let (tx, rx) = mpsc::channel();
self.pending.ipv4.push(Box::new(|packet| {
match f(&packet) {
Some(val) => {
tx.send(val).unwrap_or(()); // ignore send errors
true
}
None => false,
}
}));
rx
}
}
And now, everything works:
$ cargo run --quiet
error[E0596]: cannot borrow `self.pending.ipv4` as mutable, as it is behind a `&` reference
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:156:9
|
149 | pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
| ----- help: consider changing this to be a mutable reference: `&mut self`
...
156 | self.pending.ipv4.push(Box::new(|packet| {
| ^^^^^^^^^^^^^^^^^ `self` is a `&` reference, so the data it refers to cannot be borrowed as mutable
Mh, fair enough, push mutates self.pending.ipv4, let's take &mut self:
// was: &self
pub fn expect_ipv4<F, T>(&mut self, f: F) -> mpsc::Receiver<T>
where
F: Fn(&ipv4::Packet) -> Option<T> + 'a,
T: 'a,
{
// omitted: body
}
And now, everything w..:
$ cargo run --quiet
error[E0373]: closure may outlive the current function, but it borrows `tx`, which is owned by the current function
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:156:41
|
41 | impl<'a> Interface<'a> {
| -- lifetime `'a` defined here
...
156 | self.pending.ipv4.push(Box::new(|packet| {
| ^^^^^^^^ may outlive borrowed value `tx`
...
159 | tx.send(val).unwrap_or(()); // ignore send errors
| -- `tx` is borrowed here
|
Errr okay, okay, you keep making good points rustc. We don't actually want our closure
to borrow tx and f, we want it to take ownership of them. In other words, we want
tx and f to move into that closure.
So we can make it a move closure:
pub fn expect_ipv4<F, T>(&mut self, f: F) -> mpsc::Receiver<T>
where
F: Fn(&ipv4::Packet) -> Option<T> + 'a,
T: 'a,
{
let (tx, rx) = mpsc::channel();
// was: `|packet|`, now `move |packet|`
self.pending.ipv4.push(Box::new(move |packet| {
match f(&packet) {
Some(val) => {
tx.send(val).unwrap_or(()); // ignore send errors
true
}
None => false,
}
}));
rx
}
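If you want to poke at the adapter on its own, here's a stripped-down sketch. It's not the code from ersatz: packets are plain u32s, the adapt function is a made-up name, and the bounds are 'static rather than 'a to keep things short.

```rust
use std::sync::mpsc;

// Adapts a listener returning `Option<T>` into one returning `bool`
// ("true" meaning "done, unsubscribe me"), and hands back the receiving end.
// Packets are plain `u32`s here to keep the sketch self-contained.
pub fn adapt<F, T>(f: F) -> (Box<dyn Fn(&u32) -> bool>, mpsc::Receiver<T>)
where
    F: Fn(&u32) -> Option<T> + 'static,
    T: 'static,
{
    let (tx, rx) = mpsc::channel();
    // `move` makes the closure own `f` and `tx`, so it can outlive this call.
    let listener: Box<dyn Fn(&u32) -> bool> =
        Box::new(move |packet: &u32| match f(packet) {
            Some(val) => {
                let _ = tx.send(val); // ignore send errors
                true
            }
            None => false,
        });
    (listener, rx)
}
```

Calling the returned listener with a packet that f accepts returns true and leaves the value waiting in the receiver.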
Alright, FINALLY, this compiles, so the last piece - which should be easy - is to re-add that
std::thread in Interface::open_default.
Surely that won't cause any problems:
impl<'a> Interface<'a> {
pub fn open_default() -> Result<Self, error::Error> {
let nic = netinfo::default_nic()?;
let iface_name = format!(r#"\Device\NPF_{}"#, nic.guid);
let iface = RAWSOCK_LIB.open_interface(&iface_name)?;
let pending = PendingQueries {
arp: HashMap::new(),
ipv4: Vec::new(),
};
std::thread::spawn(move || {
iface
.loop_infinite_dyn(&mut |packet| {
let frame = match ethernet::Frame::parse(packet) {
Ok((_, frame)) => frame,
_ => return,
};
if let ethernet::Payload::IPv4(ref packet) = frame.payload {
let mut idx = None;
// try all listeners in order
for (i, f) in pending.ipv4.iter().enumerate() {
if f(&packet) {
// break if one wanted to unsubscribe
idx = Some(i);
break;
}
}
if let Some(idx) = idx {
// unsubscribe
pending.ipv4.remove(idx);
}
}
})
.unwrap();
});
let res = Self {
pending,
nic,
gateway_mac: ethernet::Addr::zero(),
iface,
};
Ok(res)
}
}
Good! Well I'm not super happy about the unsubscribe logic but at least we finally have a proof of concept that compi...
$ cargo run
Compiling ersatz v0.1.0 (C:\msys64\home\amos\ftl\ersatz)
error[E0277]: `dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool` cannot be sent between threads safely
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:9
|
52 | std::thread::spawn(move || {
| ^^^^^^^^^^^^^^^^^^ `dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool` cannot be sent between threads safely
|
= help: the trait `std::marker::Send` is not implemented for `dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool`
= note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool>`
= note: required because it appears within the type `std::boxed::Box<dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool>`
= note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<std::boxed::Box<dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool>>`
= note: required because it appears within the type `alloc::raw_vec::RawVec<std::boxed::Box<dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool>>`
= note: required because it appears within the type `std::vec::Vec<std::boxed::Box<dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool>>`
= note: required because it appears within the type `PendingQueries<'_>`
= note: required because it appears within the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:28: 77:10 iface:std::boxed::Box<dyn rawsock::traits::DynamicInterface<'_>>, pending:PendingQueries<'_>]`
error: aborting due to previous error
Oh. Ohhh no. It doesn't compile at all.
It's telling us dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool cannot be sent between threads
safely, and then something about std::marker::Send.
From the nomicon:
Send and Sync
Not everything obeys inherited mutability, though. Some types allow you to have multiple aliases of a location in memory while mutating it. Unless these types use synchronization to manage this access, they are absolutely not thread-safe. Rust captures this through the Send and Sync traits.
- A type is Send if it is safe to send it to another thread.
- A type is Sync if it is safe to share between threads (&T is Send).
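A handy way to ask the compiler "is this type Send? is it Sync?" is a pair of empty generic functions. This is a general trick, not something from the nomicon quote, and the helper names (assert_send, assert_sync, checks) are made up here:

```rust
use std::sync::{mpsc, Arc, Mutex};

// These helpers only compile when the bound holds, so calling them is a
// compile-time check; they do nothing at runtime.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

pub fn checks() -> bool {
    assert_send::<Vec<u8>>();
    assert_sync::<Vec<u8>>();
    assert_send::<Arc<Mutex<Vec<u8>>>>();
    assert_sync::<Arc<Mutex<Vec<u8>>>>();
    assert_send::<mpsc::Sender<u8>>();
    // A boxed closure is only `Send` if the trait object type says so:
    assert_send::<Box<dyn Fn(&u8) -> bool + Send>>();
    // These would be rejected by the compiler if uncommented:
    // assert_send::<std::rc::Rc<u8>>();
    // assert_send::<Box<dyn Fn(&u8) -> bool>>();
    true
}
```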
Right. It doesn't matter that the actual Fn we're storing in PendingQueries right now
can be safely sent to another thread - the problem is that, with our current types, an Fn
that cannot be safely sent to another thread could be stored in there.
And we are sending it to another thread, because we're using those functions
from a closure we pass to std::thread::spawn.
So, we can simply add a constraint that those Fns must be Send:
struct PendingQueries<'a> {
arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
// new: "+ Send"
ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool + Send + 'a>>,
}
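To see what that + Send buys us in isolation, here's a small sketch with u32 packets. The Listener alias and count_matches_on_thread are names invented for this example:

```rust
use std::thread;

// Same shape as the `ipv4` field above, with `u32` standing in for a packet.
type Listener = Box<dyn Fn(&u32) -> bool + Send>;

pub fn count_matches_on_thread(listeners: Vec<Listener>, packet: u32) -> usize {
    // The whole Vec moves into the new thread; that is only allowed
    // because every boxed closure in it is declared `Send`.
    let handle = thread::spawn(move || listeners.iter().filter(|f| f(&packet)).count());
    handle.join().unwrap()
}
```

Take the + Send out of the alias and thread::spawn should be rejected with the same "cannot be sent between threads safely" complaint.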
Moving on, we now get this error:
$ cargo run --quiet
error[E0277]: `F` cannot be sent between threads safely
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:183:32
|
183 | self.pending.ipv4.push(Box::new(move |packet| {
| ________________________________^
184 | | match f(&packet) {
185 | | Some(val) => {
186 | | tx.send(val).unwrap_or(()); // ignore send errors
... |
190 | | }
191 | | }));
| |__________^ `F` cannot be sent between threads safely
|
= help: within `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:183:41: 191:10 f:F, tx:std::sync::mpsc::Sender<T>]`, the trait `std::marker::Send` is not implemented for `F`
= help: consider adding a `where F: std::marker::Send` bound
= note: required because it appears within the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:183:41: 191:10 f:F, tx:std::sync::mpsc::Sender<T>]`
= note: required for the cast to the object type `dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool + std::marker::Send`
Alright, same thing. This is in expect_ipv4 - we're making a closure that
adapts our Fn(&ipv4::Packet) -> Option<T> into a Fn(&ipv4::Packet) -> bool.
It stands to reason that anything our Fn(&ipv4::Packet) -> bool captures must
also be Send - since we're sending a closure's context along with its code.
We need a few more constraints:
pub fn expect_ipv4<F, T>(&mut self, f: F) -> mpsc::Receiver<T>
where
F: Fn(&ipv4::Packet) -> Option<T> + Send + 'a, // new: "+ Send"
T: Send + 'a, // also here
{
let (tx, rx) = mpsc::channel();
self.pending.ipv4.push(Box::new(move |packet| {
match f(&packet) {
Some(val) => {
tx.send(val).unwrap_or(()); // ignore send errors
true
}
None => false,
}
}));
rx
}
And now, everythi..
$ cargo run
Compiling ersatz v0.1.0 (C:\msys64\home\amos\ftl\ersatz)
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:45:21
|
45 | let iface = RAWSOCK_LIB.open_interface(&iface_name)?;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
note: first, the lifetime cannot outlive the lifetime 'a as defined on the impl at 41:6...
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:41:6
|
41 | impl<'a> Interface<'a> {
| ^^
= note: ...so that the types are compatible:
expected dyn rawsock::traits::DynamicInterface<'a>
found dyn rawsock::traits::DynamicInterface<'_>
= note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:28: 77:10 iface:std::boxed::Box<dyn rawsock::traits::DynamicInterface<'_>>, pending:PendingQueries<'_>]` will meet its required lifetime bounds
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:9
|
52 | std::thread::spawn(move || {
| ^^^^^^^^^^^^^^^^^^
error: aborting due to previous error
Aw, no! I thought we solved that one!
Wait, no we didn't - not really anyway. We only made sure that our Library
had the 'static lifetime, but we kind of, uh, gave the lifetime of
rawsock::traits::DynamicInterface a name, 'a, and promptly forgot about it.
But as-is, there's no guarantee the thread will ever end! It might run until the
whole application exits - way after ersatz::Interface has been dropped. In other
words, it can outlive 'a - and that's what rustc is warning us about.
Which, to be honest, is a bit of a pickle. Because we now have two things, one with lifetime
'a (the Interface struct) and one with lifetime 'static (the packet processor thread),
both of which hold a reference to the same thing:
Assuming we could "turn off" the borrow checker, we'd be fine if the thread
finishes first. Then whenever our Interface struct gets dropped, the rawsock interface
would be closed and freed. But there's nothing that stops the thread if our Interface
struct gets dropped first. And that's (among other things) what rustc is telling us.
How can we solve that?
Well, we can pull the same trick we did earlier - we can make the interface a static,
and initialize it lazily using once_cell. But I don't like that idea. I can more or less
stomach the fact that a library like npcap is something we open once and then can use
from wherever, but I don't like the idea of one iface.break_loop() call
affecting someone else's iface.loop_infinite_dyn() call.
In other words, rawsock::traits::DynamicInterface is stateful, and it should not
be shared (even though all its methods have &self as receiver - which
normally means it's all thread-safe).
Luckily, there's another way.
When you don't know how many references there's going to be to some value, but you definitely want it to be freed only when there are no references left to it, you can use reference counting.
A quick search in the Rust standard library documentation turns up std::rc::Rc:
A single-threaded reference-counting pointer. 'Rc' stands for 'Reference Counted'.
Mh. We're definitely using rawsock::traits::DynamicInterface from several threads. Isn't
there something thread-safe? Let's look at std::sync::Arc:
A thread-safe reference-counting pointer. 'Arc' stands for 'Atomically Reference Counted'.
The type Arc<T> provides shared ownership of a value of type T, allocated in the heap. Invoking clone on Arc produces a new Arc instance, which points to the same value on the heap as the source Arc, while increasing a reference count. When the last Arc pointer to a given value is destroyed, the pointed-to value is also destroyed.
Shared references in Rust disallow mutation by default, and Arc is no exception: you cannot generally obtain a mutable reference to something inside an Arc. If you need to mutate through an Arc, use Mutex, RwLock, or one of the Atomic types.
Heyyy, that sounds exactly like what the doctor ordered!
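Before swapping anything in our code, here's a quick sketch of what that quote means in practice. It uses a String instead of a network interface, and share_across_threads is a name made up for the example:

```rust
use std::sync::Arc;
use std::thread;

pub fn share_across_threads() -> (usize, usize, usize) {
    let shared = Arc::new(String::from("iface"));
    // Cloning the `Arc` copies the pointer and bumps the count;
    // the String itself is not copied.
    let for_thread = Arc::clone(&shared);
    let count_before = Arc::strong_count(&shared);

    // The clone moves into the thread, which reads through it.
    let len_seen_by_thread = thread::spawn(move || for_thread.len()).join().unwrap();

    // The thread has finished and dropped its clone.
    let count_after = Arc::strong_count(&shared);
    (count_before, len_seen_by_thread, count_after)
}
```

The String only gets freed once the last Arc pointing to it goes away, whichever thread that happens on.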
You can use an Arc<T> like you would use a Box<T> (they're both smart pointers), so let's
just perform a quick swaperoo:
pub struct Interface<'a> {
nic: netinfo::NIC,
gateway_mac: ethernet::Addr,
iface: Arc<dyn rawsock::traits::DynamicInterface<'a> + 'a>,
pending: PendingQueries<'a>,
}
$ cargo run
Compiling ersatz v0.1.0 (C:\msys64\home\amos\ftl\ersatz)
error[E0308]: mismatched types
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:83:13
|
83 | iface,
| ^^^^^ expected struct `std::sync::Arc`, found struct `std::boxed::Box`
|
= note: expected type `std::sync::Arc<(dyn rawsock::traits::DynamicInterface<'a> + 'a)>`
found type `std::boxed::Box<dyn rawsock::traits::DynamicInterface<'_>>`
error: aborting due to previous error
Ah. Of course. This bit of code is now problematic:
pub fn open_default() -> Result<Self, error::Error> {
// (cut)
let iface = RAWSOCK_LIB.open_interface(&iface_name)?;
// (cut)
let res = Self {
pending,
nic,
gateway_mac: ethernet::Addr::zero(),
iface, // <- wrong type here
};
Ok(res)
}
In other words, we need rawsock itself to support returning an Arc, rather than a Box.
Well, does it?
// rawsock internals
pub trait Library {
fn open_interface_arc<'a>(
&'a self,
name: &str
) -> Result<Arc<dyn DynamicInterface<'a> + 'a>, Error>;
}
It does! It's almost like.. the rawsock crate was carefully designed? To
support exactly this use-case?
Very well, let's switch to it:
pub fn open_default() -> Result<Self, error::Error> {
// (cut)
// was: open_interface, now: open_interface_arc
let iface = RAWSOCK_LIB.open_interface_arc(&iface_name)?;
// (cut)
}
$ cargo run --quiet
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:45:21
|
45 | let iface = RAWSOCK_LIB.open_interface_arc(&iface_name)?;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
note: first, the lifetime cannot outlive the lifetime 'a as defined on the impl at 41:6...
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:41:6
|
41 | impl<'a> Interface<'a> {
| ^^
= note: ...so that the types are compatible:
expected dyn rawsock::traits::DynamicInterface<'a>
found dyn rawsock::traits::DynamicInterface<'_>
= note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:28: 77:10 iface:std::sync::Arc<dyn rawsock::traits::DynamicInterface<'_>>, pending:PendingQueries<'_>]` will meet its required lifetime bounds
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:52:9
|
52 | std::thread::spawn(move || {
| ^^^^^^^^^^^^^^^^^^
error: aborting due to previous error
Huh. I uh... I could've sworn we just solved that one. Wasn't Arc<T>
supposed to solve that one? I uh (checks notes) oh alright we'll read the error
message.
So what it says is basically that our thread still runs for 'static, so our DynamicInterface
should still live for 'static and, uh, wait a minute, we gave it a less-than-static lifetime
in the definition of Interface. Alright, that's easy to fix.
pub struct Interface<'a> {
// (cut: other fields)
// was:
// iface: Arc<dyn rawsock::traits::DynamicInterface<'a> + 'a>,
// now:
iface: Arc<dyn rawsock::traits::DynamicInterface<'static>>,
pending: PendingQueries<'a>,
}
But WAIT A MINUTE. We don't need rustc to pick up all our mistakes. We also
use pending in that thread. And it says right here, that pending's
lifetime is 'a. We're for sure going to get an error here as well.
Why does PendingQueries need an explicit lifetime again?
struct PendingQueries<'a> {
arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool + Send + 'a>>,
}
Oh right. Our closures.
Well, is it unreasonable to ask that any Fn passed to expect_ipv4 be valid for 'static?
I don't think so. The expected use case is for those functions to do mostly pattern matching like: is it an IPv4 packet? From that host? Containing an ICMP packet? Which is an echo reply? etc.
I'm thinking those functions can probably take ownership of anything they need - like copies
of ipv4::Addr, maybe some u16, etc. It seems reasonable to ask that.
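To make that a bit more concrete, here's a sketch of such a listener. The Packet struct below is a heavily simplified, made-up stand-in (the real ipv4::Packet looks different), and reply_from is a hypothetical helper. What matters is that the closure owns copies of everything it compares against:

```rust
// Hypothetical, heavily simplified packet type for illustration only.
pub struct Packet {
    pub src: [u8; 4],
    pub identifier: u16,
    pub payload_len: usize,
}

// Builds a listener that owns copies of everything it compares against,
// so it borrows nothing and satisfies `Send + 'static`.
pub fn reply_from(
    src: [u8; 4],
    identifier: u16,
) -> impl Fn(&Packet) -> Option<usize> + Send + 'static {
    move |packet: &Packet| {
        if packet.src == src && packet.identifier == identifier {
            Some(packet.payload_len)
        } else {
            None
        }
    }
}
```

Since [u8; 4] and u16 are Copy, moving them into the closure costs next to nothing.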
So we can rid PendingQueries of its lifetime type parameter:
struct PendingQueries {
arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool + Send + 'static>>,
}
And, in turn, Interface:
pub struct Interface {
nic: netinfo::NIC,
gateway_mac: ethernet::Addr,
iface: Arc<dyn rawsock::traits::DynamicInterface<'static>>,
pending: PendingQueries,
}
And WAIT A MINUTE, I'm fairly sure we're going to run into some more problems.
We access pending from both expect_ipv4 (to add listeners) and from our "packet
processor thread" (to call listeners and maybe remove some if they want to unsubscribe).
And this happens from different threads.
So not only does pending need to be valid for 'static (because we access
it from a thread that has lifetime 'static), we also need to protect it
from concurrent use.
As far as lifetimes are concerned, we can use an Arc!
Let's see how that would work.
Note that Interface no longer has a lifetime parameter ('a), so we need to remove
it from the impl block as well.
pub struct Interface {
nic: netinfo::NIC,
gateway_mac: ethernet::Addr,
iface: Arc<dyn rawsock::traits::DynamicInterface<'static>>,
pending: Arc<PendingQueries>,
}
impl Interface {
pub fn open_default() -> Result<Self, error::Error> {
// cut: get `nic`, open `iface`
let pending = Arc::new(PendingQueries {
arp: HashMap::new(),
ipv4: Vec::new(),
});
let res = Self {
// create another reference to `pending`,
// which increases the reference count by one
pending: pending.clone(),
nic,
gateway_mac: ethernet::Addr::zero(),
iface,
};
std::thread::spawn(move || {
iface
.loop_infinite_dyn(&mut |packet| {
let frame = match ethernet::Frame::parse(packet) {
Ok((_, frame)) => frame,
_ => return,
};
if let ethernet::Payload::IPv4(ref packet) = frame.payload {
let mut idx = None;
// try all listeners in order
for (i, f) in pending.ipv4.iter().enumerate() {
if f(&packet) {
// break if one wanted to unsubscribe
idx = Some(i);
break;
}
}
if let Some(idx) = idx {
// unsubscribe
pending.ipv4.remove(idx);
}
}
})
.unwrap();
});
Ok(res)
}
}
The theory is sound, have we forgotten about anything?
$ cargo run --quiet
error[E0277]: `std::sync::mpsc::Sender<ethernet::Addr>` cannot be shared between threads safely
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:58:9
|
58 | std::thread::spawn(move || {
| ^^^^^^^^^^^^^^^^^^ `std::sync::mpsc::Sender<ethernet::Addr>` cannot be shared between threads safely
|
= help: within `(ipv4::Addr, std::sync::mpsc::Sender<ethernet::Addr>)`, the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender<ethernet::Addr>`
= note: required because it appears within the type `(ipv4::Addr, std::sync::mpsc::Sender<ethernet::Addr>)`
= note: required because of the requirements on the impl of `std::marker::Sync` for `hashbrown::raw::RawTable<(ipv4::Addr, std::sync::mpsc::Sender<ethernet::Addr>)>`
= note: required because it appears within the type `hashbrown::map::HashMap<ipv4::Addr, std::sync::mpsc::Sender<ethernet::Addr>, std::collections::hash_map::RandomState>`
= note: required because it appears within the type `std::collections::HashMap<ipv4::Addr, std::sync::mpsc::Sender<ethernet::Addr>>`
= note: required because it appears within the type `PendingQueries`
= note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<PendingQueries>`
= note: required because it appears within the type `[closure@C:\msys64\home\amos\ftl\ersatz\src\lib.rs:58:28: 83:10 iface:std::sync::Arc<dyn rawsock::traits::DynamicInterface<'_>>, pending:std::sync::Arc<PendingQueries>]`
error[E0277]: `(dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool + std::marker::Send + 'static)` cannot be shared between threads safely
--> C:\msys64\home\amos\ftl\ersatz\src\lib.rs:58:9
|
58 | std::thread::spawn(move || {
| ^^^^^^^^^^^^^^^^^^ `(dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool + std::marker::Send + 'static)` cannot be shared between threads safely
|
= help: the trait `std::marker::Sync` is not implemented for `(dyn for<'r> std::ops::Fn(&'r ipv4::Packet) -> bool + std::marker::Send + 'static)`
(cut: same requirements)
error: aborting due to 2 previous errors
Hey, that's Sync! Send's little sister! The nomicon told us earlier:
- A type is Sync if it is safe to share between threads (&T is Send).
Can mpsc::Sender<T> ever be Send? Can it be Sync?
On its documentation page, we find:
// https://doc.rust-lang.org/std/sync/mpsc/struct.Sender.html
impl<T: Send> Send for Sender<T> {}
impl<T> !Sync for Sender<T> {}
So: it automatically becomes Send if T itself is Send, but it can never be Sync.
But, wait, why are we getting an error on std::thread::spawn about a
Sender<T>? What we know is that the thread captures an Arc<PendingQueries>,
and PendingQueries currently looks like:
struct PendingQueries {
arp: HashMap<ipv4::Addr, mpsc::Sender<ethernet::Addr>>,
ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool + Send + 'static>>,
}
Right. There is a Sender<ethernet::Addr>. It just so happens that
ethernet::Addr is both Send and Sync (it's just data), but that doesn't
matter - an mpsc::Sender<T> will never be Sync - presumably because, for
performance reasons, it's single-threaded by default.
There's no lock inside of Sender that would prevent data corruption from
happening if two threads called .send() at the same time. You know what is
Sync, and does a great job at doing that though? std::sync::Mutex!
I used it earlier in this article and didn't really explain why. Mutex
stands for "mutual exclusion" - it's a lock that only one thread can hold
at a time. That thread needs to release it before another one can acquire it.
We don't need a Mutex for iface, because we only ever need an immutable
reference to it - those are already Sync, but we do need one for pending,
which we modify from two different threads: the one we start in
open_default(), that processes packets in the background, and the main
thread, from the code that uses ersatz.
Next question: which order do they go in? The documentation for Arc just
says "use a Mutex". It doesn't tell us if we need to go Mutex<Arc<T>> or
Arc<Mutex<T>>.
Let's review: in order to use something in a Mutex, you need to lock() it.
// example code
let t: T;
let m = Mutex::new(t);
{
// this blocks if another thread holds the lock,
// it resumes when we successfully acquire it.
let m_guard = m.lock().unwrap();
// m_guard implements `Deref<Target=T>` and `DerefMut<Target=T>`
// so we can call methods on it:
m_guard.do_something()
// m_guard is dropped here, releasing the lock
}
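The snippet above is more of a shape than a program, so here's a version that actually runs, with a Vec<u32> as the protected value. push_under_lock is a made-up name for this sketch:

```rust
use std::sync::Mutex;

pub fn push_under_lock(m: &Mutex<Vec<u32>>, value: u32) -> usize {
    // Blocks until the lock is available; `unwrap` panics if another
    // thread panicked while holding it (a "poisoned" mutex).
    let mut guard = m.lock().unwrap();
    guard.push(value); // goes through `DerefMut` to the inner Vec
    guard.len()
    // `guard` is dropped here, which releases the lock
}
```

Note that we only needed a shared reference to the Mutex to mutate what's inside it.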
So if we had a Mutex<Arc<T>> and we wanted different threads to hold on to it,
we'd need to... lock it first? And then clone() the Arc<T> inside? And then...
I guess create another Mutex to give to the other thread? But then we'd have two
locks, one per thread - which defeats the purpose, because then we wouldn't be
synchronizing anything.
// example code (bad)
let t: T;
let m = Mutex::new(Arc::new(t));
// uh-oh, second Mutex? that can't be right
let m_for_thread = Mutex::new(m.lock().unwrap().clone());
std::thread::spawn(move || {
// ...note that rustc is going to complain about us sharing
// an `Arc<T>` across threads anyway (if `T` isn't `Sync + Send`).
// which is the reason we need a `Mutex` in the first place!
})
From this, it's pretty clear we need an Arc<Mutex<T>>, because we want multiple threads to hold (counted) references to the same Mutex, protecting a single resource.
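Before we apply it to ersatz, here's what that shape looks like in isolation. This is a minimal sketch with a plain counter standing in for our pending queries, and count_with_threads is a made-up name:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `num_threads` threads that each bump a shared counter `increments` times.
/// (Hypothetical helper, for illustration only.)
fn count_with_threads(num_threads: usize, increments: usize) -> usize {
    // one Mutex, one resource...
    let counter = Arc::new(Mutex::new(0usize));

    let handles: Vec<_> = (0..num_threads)
        .map(|_| {
            // ...and one (counted) reference to that same Mutex per thread
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments {
                    *counter.lock().unwrap() += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // bind to a local so the lock guard is released before `counter` goes away
    let total = *counter.lock().unwrap();
    total
}

fn main() {
    println!("total = {}", count_with_threads(4, 1000));
}
```

Every thread ends up locking the same Mutex, so no increment is lost.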
Let's give it a shot:
impl Interface {
pub fn open_default() -> Result<Self, error::Error> {
// cut: get nic, iface
let pending = Arc::new(Mutex::new(PendingQueries {
arp: HashMap::new(),
ipv4: Vec::new(),
}));
let res = Self {
// make a new reference to the (same) mutex, all good
pending: pending.clone(),
// make a new reference to the iface, still good
iface: iface.clone(),
// omitted: other fields
};
std::thread::spawn(move || {
iface
.loop_infinite_dyn(&mut |packet| {
let frame = /* omitted: parsing */;
if let ethernet::Payload::IPv4(ref packet) = frame.payload {
let mut idx = None;
// only if the packet is IPv4 do we need to lock `pending`
let mut pending = pending.lock().unwrap();
// now we can read/write to/from pending.ipv4
for (i, f) in pending.ipv4.iter().enumerate() {
// etc.
}
// cut: unsubscribe listeners
}
})
.unwrap();
});
Ok(res)
}
}
We also need to change expect_ipv4 to make it acquire the lock:
// new: we don't need to take `&mut self` anymore, just `&self`!
pub fn expect_ipv4<F, T>(&self, f: F) -> mpsc::Receiver<T>
where
F: Fn(&ipv4::Packet) -> Option<T> + Send + 'static,
T: Send + 'static,
{
let (tx, rx) = mpsc::channel();
// new: acquire lock
let mut pending = self.pending.lock().unwrap();
pending.ipv4.push(Box::new(move |packet| {
// etc.
}));
rx
}
...and just like that, it compiles.
No, no, I'm serious! I know we've joked a lot, but this time, for real, it does compile.
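If you want to play with the listener pattern without a network interface, here's a pared-down sketch of it. It is not the ersatz code: Packet is just a u32 here, and the Listeners type and its dispatch method are names I made up for the example. The idea is the same though: expect() locks the list and pushes a boxed closure, and the "processing" side locks it again to run the listeners.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Hypothetical stand-in for `ipv4::Packet`
type Packet = u32;
// A listener returns `true` when it wants to be unsubscribed
type Listener = Box<dyn Fn(&Packet) -> bool + Send + 'static>;

#[derive(Clone, Default)]
struct Listeners {
    pending: Arc<Mutex<Vec<Listener>>>,
}

impl Listeners {
    // note: `&self`, not `&mut self` - the Mutex gives us interior mutability
    fn expect<F, T>(&self, f: F) -> mpsc::Receiver<T>
    where
        F: Fn(&Packet) -> Option<T> + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let mut pending = self.pending.lock().unwrap();
        pending.push(Box::new(move |packet: &Packet| match f(packet) {
            Some(value) => {
                // the receiver may already be gone, in which case we don't care
                let _ = tx.send(value);
                true
            }
            None => false,
        }));
        rx
    }

    // what the packet processing thread would do for each packet
    fn dispatch(&self, packet: &Packet) {
        let mut pending = self.pending.lock().unwrap();
        let idx = pending.iter().position(|f| f(packet));
        if let Some(i) = idx {
            pending.remove(i);
        }
    }
}

fn main() {
    let listeners = Listeners::default();
    let rx = listeners.expect(|p| if *p == 42 { Some(*p * 2) } else { None });

    // dispatch from another thread, like the background packet processor
    let for_thread = listeners.clone();
    thread::spawn(move || {
        for_thread.dispatch(&7);
        for_thread.dispatch(&42);
    })
    .join()
    .unwrap();

    println!("{:?}", rx.recv());
}
```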
I've taken the liberty of fleshing out sup's main() function to be feature-complete:
// in `sup/src/main.rs`
fn main() -> Result<(), Box<dyn Error>> {
let arg = env::args().nth(1).unwrap_or_else(|| {
println!("Usage: sup DEST");
process::exit(1);
});
let dest = arg.parse()?;
let iface = Interface::open_default()?;
// this should really be random, but oh well.
let identifier = 0xC0DE;
// always send the same data, a-la Windows ping
let data = "O Romeo.";
println!("Pinging {:?} with {} bytes of data:", dest, data.len());
// ping 4 times - this should really be configurable.
// also, have the sequence_number be increasing.
// fun fact: I tried without, and 8.8.8.8 straight up refused
// to respond to the second request with the same sequence number!
for sequence_number in 0..4 {
let echo_request = ersatz::icmp::Echo {
identifier,
sequence_number,
}
.as_echo_request(data)
.as_ipv4_payload();
// measure ping time - more on that later.
let before = Instant::now();
// install our listener *before* we send our echo request,
// so we're sure not to miss it!
let rx = iface.expect_ipv4(move |packet| {
// we can un-nest that by using a `match` with a catch-all
// arm that returns instead. I have no strong feelings one way
// or the other.
if let ipv4::Payload::ICMP(ref icmp_packet) = packet.payload {
if let icmp::Header::EchoReply(ref reply) = icmp_packet.header {
if reply.identifier == identifier && reply.sequence_number == sequence_number {
return Some((before.elapsed(), packet.clone()));
}
}
}
None
});
// send our echo request
iface.send_ipv4(echo_request, &dest)?;
// wait up to 3 seconds for a reply
match rx.recv_timeout(Duration::from_secs(3)) {
Ok((elapsed, packet)) => {
// Even though we know it's an Echo Reply ICMP packet, we still
// need to do pattern matching here.
// We could go another route and have a custom type with just the
// info we need (or even print from our `expect_ipv4` closure) to
// avoid that.
// It's just nice to know listeners can return packets unadulterated!
if let ipv4::Payload::ICMP(ref icmp_packet) = packet.payload {
if let icmp::Header::EchoReply(_) = icmp_packet.header {
println!(
"Reply from {:?}: bytes={} time={:?} TTL={}",
packet.src,
icmp_packet.payload.0.len(),
elapsed,
packet.ttl,
);
}
}
}
Err(_) => {
// give up and go home
println!("Timed out!");
process::exit(1);
}
}
// wait a bit before pinging again. We don't want
// to DoS Google, now do we? *laugh track*
std::thread::sleep(Duration::from_secs(1));
}
Ok(())
}
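About that comment on un-nesting the listener closure: here's roughly what the `match` with a catch-all arm version could look like. The types below are hypothetical, stripped-down stand-ins for the ersatz ones, and matching_reply is a made-up helper name.

```rust
// Hypothetical, pared-down stand-ins for the `ersatz` types
#[derive(Debug, Clone, PartialEq)]
struct Echo {
    identifier: u16,
    sequence_number: u16,
}

#[derive(Debug)]
enum IcmpHeader {
    EchoRequest(Echo),
    EchoReply(Echo),
}

#[derive(Debug)]
struct IcmpPacket {
    header: IcmpHeader,
}

#[derive(Debug)]
enum Payload {
    Icmp(IcmpPacket),
    Unknown,
}

fn matching_reply(payload: &Payload, identifier: u16, sequence_number: u16) -> Option<&Echo> {
    // each `match` either extracts what we want, or returns early
    let icmp_packet = match payload {
        Payload::Icmp(icmp_packet) => icmp_packet,
        _ => return None,
    };
    let reply = match &icmp_packet.header {
        IcmpHeader::EchoReply(reply) => reply,
        _ => return None,
    };
    if reply.identifier == identifier && reply.sequence_number == sequence_number {
        Some(reply)
    } else {
        None
    }
}

fn main() {
    let echo = Echo { identifier: 0xC0DE, sequence_number: 1 };
    let packets = vec![
        Payload::Unknown,
        Payload::Icmp(IcmpPacket { header: IcmpHeader::EchoRequest(echo.clone()) }),
        Payload::Icmp(IcmpPacket { header: IcmpHeader::EchoReply(echo) }),
    ];
    for packet in &packets {
        println!("{:?} => {:?}", packet, matching_reply(packet, 0xC0DE, 1));
    }
}
```

Same logic, less rightward drift. Whether it reads better is a matter of taste.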
Pretty neat, right?
Now all of that compiles (pinky swear).
It even runs:
$ cargo run --quiet -- 8.8.8.8
Pinging 8.8.8.8 with 8 bytes of data:
Timed out!
Of course, it doesn't work. But it does compile and run.
Which, as far as I'm concerned.. I mean, my job here is done, right? You figure it out.
Cool bear's hot tip
Hey uhh I found a gateway_mac: ethernet::Addr::zero(), is it yours?
Oooohhhhhh. Right. Kinda forgot all about the ARP part, in all that excitement.
Turns out we've been sending our Ethernet frames to "00-00-00-00-00-00". Well, it happens, haha. I'm fairly sure our router completely dropped our packets though. Which may explain the timeout.
So we need to find the MAC address of our router before returning from Interface::open_default().
I'm pretty sure a scoped thread is the right tool for the job. Let's remove arp from PendingQueries too - we'll deal with all that locally.
struct PendingQueries {
// gone: arp
ipv4: Vec<Box<dyn Fn(&ipv4::Packet) -> bool + Send + 'static>>,
}
impl Interface {
pub fn open_default() -> Result<Self, error::Error> {
// omitted: getting nic, iface
// gone: arp
let pending = Arc::new(Mutex::new(PendingQueries { ipv4: Vec::new() }));
let gateway_mac = crossbeam_utils::thread::scope(|s| {
let (tx, rx) = mpsc::channel();
let gateway_ip = nic.gateway;
let poll_iface = iface.clone();
s.spawn(move |_| {
poll_iface
.loop_infinite_dyn(&mut |packet| {
// if we can parse it...
let frame = match ethernet::Frame::parse(packet) {
Ok((_remaining, frame)) => frame,
_ => return,
};
// ...and it's ARP
let arp = match frame.payload {
ethernet::Payload::ARP(x) => x,
_ => return,
};
// ...and it's a reply
if let arp::Operation::Reply = arp.operation {
// ...from the host we want
if arp.sender_ip_addr == gateway_ip {
// ... then send the result!
tx.send(arp.sender_hw_addr).unwrap();
}
}
})
.unwrap();
});
arp::Packet::request(&nic, gateway_ip)
.as_ethernet_payload()
.as_broadcast_frame(&nic)
.send(iface.as_ref())
.unwrap();
let ret = rx
.recv_timeout(Duration::from_secs(3))
.map_err(|_| "ARP timeout")
.unwrap();
iface.break_loop();
ret
})
.unwrap();
let res = Self {
pending: pending.clone(),
nic,
// new: use `gateway_mac` value we just obtained
gateway_mac,
iface: iface.clone(),
};
std::thread::spawn(move || {
// omitted: body of "packet processor thread"
});
Ok(res)
}
}
That should do the trick.
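If the scoped thread dance is hard to follow with all the ARP details mixed in, here's the same shape with the networking taken out. Note that this sketch swaps crossbeam_utils for std::thread::scope from the standard library (available since Rust 1.63), and find_answer is a made-up function where a slice of numbers plays the role of incoming packets:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for "poll the interface until the reply we want shows up".
fn find_answer(candidates: &[u32], wanted: u32) -> Result<u32, &'static str> {
    // plays the role of `break_loop()`: tells the worker to stop
    let stop = AtomicBool::new(false);
    let stop_ref = &stop;
    let (tx, rx) = mpsc::channel();

    thread::scope(|s| {
        // a scoped thread can borrow from our stack frame (`candidates`, `stop`),
        // because the scope guarantees it is joined before we return
        s.spawn(move || {
            for &candidate in candidates {
                if stop_ref.load(Ordering::Relaxed) {
                    break;
                }
                if candidate == wanted {
                    // the receiver might have given up already, that's fine
                    let _ = tx.send(candidate);
                }
            }
        });

        let ret = rx
            .recv_timeout(Duration::from_secs(3))
            .map_err(|_| "no reply");
        stop_ref.store(true, Ordering::Relaxed);
        ret
    })
}

fn main() {
    println!("{:?}", find_answer(&[10, 20, 30], 20));
}
```

One difference worth knowing about: std::thread::scope returns the closure's value directly (and panics if a scoped thread panicked), whereas the crossbeam version used above returns a Result, hence the extra unwrap() there.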
I want to take this opportunity to do one more code cleanup.
Our packet processing thread currently looks like this:
iface
.loop_infinite_dyn(&mut |packet| {
let frame = match ethernet::Frame::parse(packet) {
Ok((_, frame)) => frame,
_ => return,
};
if let ethernet::Payload::IPv4(ref packet) = frame.payload {
let mut idx = None;
// only if the packet is IPv4 do we need to lock `pending`
let mut pending = pending.lock().unwrap();
// try all listeners in order
for (i, f) in pending.ipv4.iter().enumerate() {
if f(&packet) {
// break if one wanted to unsubscribe
idx = Some(i);
break;
}
}
if let Some(idx) = idx {
// unsubscribe
pending.ipv4.remove(idx);
}
}
})
.unwrap();
I'm not super fond of that. Isn't there a method on Iterator that returns the position of an item that satisfies a predicate?
// the rust standard library
/// Searches for an element in an iterator, returning its index.
///
/// position() takes a closure that returns true or false. It applies this
/// closure to each element of the iterator, and if one of them returns true,
/// then position() returns Some(index). If all of them return false, it returns
/// None.
///
/// position() is short-circuiting; in other words, it will stop processing as
/// soon as it finds a true.
fn position<P>(&mut self, predicate: P) -> Option<usize>
where
P: FnMut(Self::Item) -> bool
Heyyy that sounds great.
Using that, we can change:
let mut idx = None;
// only if the packet is IPv4 do we need to lock `pending`
let mut pending = pending.lock().unwrap();
// try all listeners in order
for (i, f) in pending.ipv4.iter().enumerate() {
if f(&packet) {
// break if one wanted to unsubscribe
idx = Some(i);
break;
}
}
if let Some(idx) = idx {
// unsubscribe
pending.ipv4.remove(idx);
}
To the following:
let mut pending = pending.lock().unwrap();
pending
.ipv4
.iter()
.position(|f| f(packet))
.map(|i| pending.ipv4.remove(i));
Much better!
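Here's the same position() then remove() trick on a plain Vec of boxed closures, in case you want to try it outside of ersatz. The Listener alias and the dispatch function are names I made up for this sketch, and packets are just strings:

```rust
// Hypothetical listener type: returns `true` when it wants to unsubscribe
type Listener = Box<dyn Fn(&str) -> bool>;

/// Runs listeners in order, removes the first one that returns `true`.
/// Returns whether a listener was removed.
fn dispatch(listeners: &mut Vec<Listener>, packet: &str) -> bool {
    listeners
        .iter()
        // short-circuits: listeners after the first match are not called
        .position(|f| f(packet))
        // the immutable borrow from `iter()` is over by now, so we can mutate
        .map(|i| listeners.remove(i))
        .is_some()
}

fn main() {
    let mut listeners: Vec<Listener> = Vec::new();
    listeners.push(Box::new(|p: &str| p == "arp"));
    listeners.push(Box::new(|p: &str| p == "icmp"));

    for packet in ["tcp", "icmp", "arp"] {
        let removed = dispatch(&mut listeners, packet);
        println!("{}: removed={} remaining={}", packet, removed, listeners.len());
    }
}
```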
Does it work now?
$ cargo run --quiet -- 8.8.8.8
Pinging 8.8.8.8 with 8 bytes of data:
Reply from 8.8.8.8: bytes=8 time=1.000651s TTL=54
Reply from 8.8.8.8: bytes=8 time=301.8831ms TTL=54
Reply from 8.8.8.8: bytes=8 time=251.4266ms TTL=54
Reply from 8.8.8.8: bytes=8 time=210.9397ms TTL=54
It does! Our efforts to parse and serialize Ethernet, ARP, IPv4, and ICMP have all finally come together.
There's.. there's something about these timings though. They don't really sound accurate. The first one in particular (1.000651s) is downright fishy.
Compared to the output of Windows's built-in ping:
$ ping 8.8.8.8
Pinging 8.8.8.8 with 32 bytes of data:
Reply from 8.8.8.8: bytes=32 time=8ms TTL=54
Reply from 8.8.8.8: bytes=32 time=7ms TTL=54
Reply from 8.8.8.8: bytes=32 time=8ms TTL=54
Reply from 8.8.8.8: bytes=32 time=7ms TTL=54
Yeah our timings are whack.
But, again, in a suspicious way. There's no way the roundtrip took almost exactly one second. It's almost like there's a one-second delay somewhere in the stack.
We know it's not in ersatz or sup. Although we do wait one second between each ping, we send and process packets as soon as we can. Also, we've taken some liberties performance-wise, but nowhere near any that could justify such a delay.
It's almost like... our packet capture/injection library is buffering packets at some point...
If you'll remember, we know that it has a "send queue", and we took care to flush it:
// in `ersatz/src/lib.rs`
impl Interface {
pub fn send_ipv4(&self, payload: ipv4::Payload, addr: &ipv4::Addr) -> Result<(), error::Error> {
payload
.as_packet(self.nic.address, addr.clone())
.as_ethernet_payload()
.as_frame(&self.nic, self.gateway_mac)
.send(self.iface.as_ref())?;
// right here
self.iface.as_ref().flush();
Ok(())
}
}
Is there a "receive queue" maybe? The actual library rawsock uses is npcap - in particular, it uses its winpcap-compatible interface. Maybe if we look at the rawsock code for this, we might find a clue?
// rawsock internals
impl<'a> Interface<'a> {
pub fn new(name: &str, dll: &'a WPCapDll) ->Result<Self, Error> {
let name = CString::new(name)?;
let mut errbuf = PCapErrBuf::new();
let handle = unsafe { dll.pcap_open_live(
name.as_ptr(),
65536, /* max packet size */
8, /* promiscuous mode */
1000, /* read timeout in milliseconds */
errbuf.buffer()
)};
// omitted: rest of function
}
}
Oh hey. Would you look at that.
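To get a feel for how a read timeout could inflate our measurements, here's a toy model. Big assumption, clearly labeled: it pretends captured packets are only handed to us when the read timeout expires, at multiples of the timeout. Real capture drivers are more subtle than that (they can also deliver when a buffer fills up, for example), and the function names are made up.

```rust
/// Toy model (an assumption, not npcap's actual behavior): a packet that
/// arrives at `arrival_ms` is only delivered at the next multiple of
/// `read_timeout_ms`.
fn delivered_at(arrival_ms: u64, read_timeout_ms: u64) -> u64 {
    // round up to the next timeout boundary
    ((arrival_ms + read_timeout_ms - 1) / read_timeout_ms) * read_timeout_ms
}

/// What our stopwatch would show for a request sent at `sent_ms`
/// whose reply really takes `real_rtt_ms` to come back.
fn observed_rtt(sent_ms: u64, real_rtt_ms: u64, read_timeout_ms: u64) -> u64 {
    delivered_at(sent_ms + real_rtt_ms, read_timeout_ms) - sent_ms
}

fn main() {
    for (sent, timeout) in [(0, 1000), (700, 1000), (700, 1)] {
        println!(
            "sent at {}ms, timeout {}ms => observed {}ms",
            sent,
            timeout,
            observed_rtt(sent, 8, timeout)
        );
    }
}
```

In this model, a real 8ms round trip can show up as anything up to a full second with a 1000ms timeout, depending on when the request was sent. With a 1ms timeout, the observed time matches the real one.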
Let's try something. We'll clone rawsock locally, in a vendor/ directory:
# (in ersatz/)
$ mkdir vendor
$ git clone https://github.com/szymonwieloch/rust-rawsock vendor/rawsock
Cloning into 'vendor/rawsock'...
remote: Enumerating objects: 82, done.
remote: Counting objects: 100% (82/82), done.
remote: Compressing objects: 100% (50/50), done.
remote: Total 931 (delta 48), reused 54 (delta 32), pack-reused 849
Receiving objects: 100% (931/931), 152.95 KiB | 855.00 KiB/s, done.
Resolving deltas: 100% (597/597), done.
Set the timeout to 1 millisecond:
// in `vendor/rawsock/src/wpcap/interface.rs`
impl<'a> Interface<'a> {
pub fn new(name: &str, dll: &'a WPCapDll) -> Result<Self, Error> {
let name = CString::new(name)?;
let mut errbuf = PCapErrBuf::new();
let handle = unsafe {
dll.pcap_open_live(
name.as_ptr(),
65536, /* max packet size */
8, /* promiscuous mode */
// new!
1, /* read timeout in milliseconds */
errbuf.buffer(),
)
};
// cut: rest
}
}
Adjust our Cargo.toml:
# in `ersatz/Cargo.toml`
[dependencies]
rawsock = { version = "0.3.0", path = "./vendor/rawsock" }
# omitted: everything else
And give it a shot!
$ cargo run --quiet -- 8.8.8.8
Pinging 8.8.8.8 with 8 bytes of data:
Reply from 8.8.8.8: bytes=8 time=9.8372ms TTL=54
Reply from 8.8.8.8: bytes=8 time=9.0064ms TTL=54
Reply from 8.8.8.8: bytes=8 time=9.9738ms TTL=54
Reply from 8.8.8.8: bytes=8 time=8.9758ms TTL=54
And there you have it, folks.
We implemented parts of Ethernet, ARP, IPv4, and ICMP and have a fully-functioning ping utility that clocks in at roughly 1600 lines of relatively-clean Rust.
We've made our own ping.