Surviving Rust async interfaces

👋 This page was last updated ~4 years ago. Just so you know.

I used to be afraid of async Rust. It's easy to get into trouble!

But thanks to the work done by the whole community, async Rust is getting easier to use every week. One project I think is doing particularly great work in this area is async-std.

Let's say we want to compute the SHA3-256 hash of a file. It's very easy to do with synchronous I/O:

$ cargo new surviving
     Created binary (application) `surviving` package
$ cargo add argh sha3 color-eyre
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding argh v0.1.3 to dependencies
      Adding sha3 v0.9.1 to dependencies
      Adding color-eyre v0.5.1 to dependencies
Cool bear

Cool bear's hot tip

Whoa, amos, cool off. Why all the dependencies?

What's the matter cool bear? Too fast for you?

Okay then, let's go through these one by one.

The color-eyre crate (hi Jane!) provides an Error type that collects backtraces (and spantraces, but more on that later), and it also happens to format them in a way that looks suspiciously like color-backtrace - i.e., I like it.

It's super easy to get started with:

// in `src/main.rs`

use color_eyre::eyre;
use std::io;

fn main() -> Result<(), eyre::Error> {
    color_eyre::install().unwrap();

    // I am: too lazy to make a custom Error type right now, AMA
    let e = io::Error::new(
        io::ErrorKind::PermissionDenied,
        "you can't cut back on error handling! you will regret this",
    );
    Err(e.into())
}
$ cargo run -q
Error:
   0: you can't cut back on error handling! you will regret this

Backtrace omitted.
Run with RUST_BACKTRACE=1 environment variable to display it.
Run with RUST_BACKTRACE=full to include source snippets.

And here's the output with RUST_BACKTRACE=1:

$ RUST_BACKTRACE=1 cargo run -q
Error:
   0: you can't cut back on error handling! you will regret this

  ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ BACKTRACE ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                                ⋮ 5 frames hidden ⋮
   6: <T as core::convert::Into<U>>::into::h330a3dcd2ad914ef
      at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/convert/mod.rs:559
   7: surviving::main::ha622503931ea5ba8
      at /home/amos/ftl/surviving/src/main.rs:11
                                ⋮ 10 frames hidden ⋮

Run with COLORBT_SHOW_HIDDEN=1 environment variable to disable frame filtering.
Run with RUST_BACKTRACE=full to include source snippets.

And with RUST_BACKTRACE=full:

$ RUST_BACKTRACE=full cargo run -q
Error:
   0: you can't cut back on error handling! you will regret this

  ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ BACKTRACE ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                                ⋮ 5 frames hidden ⋮
   6: <T as core::convert::Into<U>>::into::h330a3dcd2ad914ef
      at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/convert/mod.rs:559
       557 │ {
       558 │     fn into(self) -> U {
       559 >         U::from(self)
       560 │     }
       561 │ }
   7: surviving::main::ha622503931ea5ba8
      at /home/amos/ftl/surviving/src/main.rs:11
         9 │         "you can't cut back on error handling! you will regret this",
        10 │     );
        11 >     Err(e.into())
        12 │ }
                                ⋮ 10 frames hidden ⋮

Run with COLORBT_SHOW_HIDDEN=1 environment variable to disable frame filtering.

Of course the color part doesn't quite carry over to the blog format - I have yet to add ANSI-escape-code-to-HTML conversion to my content pipeline, so in the meantime, here's a screenshot:

As you can see: it is quite good.
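The Err(e.into()) line works because eyre::Error can be created from any std::error::Error type that is Send + Sync + 'static, which is also why a <T as core::convert::Into<U>>::into frame shows up in the backtrace. Here's a std-only sketch of the same conversion mechanism, with Box<dyn std::error::Error> standing in for eyre::Error (so: no backtraces, no colors). The names deny, explicit and implicit are hypothetical:

```rust
use std::error::Error;
use std::io;

// Hypothetical fallible operation returning a concrete error type.
fn deny() -> io::Result<()> {
    Err(io::Error::new(
        io::ErrorKind::PermissionDenied,
        "you can't cut back on error handling! you will regret this",
    ))
}

// Explicit conversion, like `Err(e.into())` in the example:
// `Box<dyn Error>` implements `From<E>` for any error type `E`.
fn explicit() -> Result<(), Box<dyn Error>> {
    match deny() {
        Ok(()) => Ok(()),
        Err(e) => Err(e.into()),
    }
}

// Implicit conversion: `?` calls `From::from` on the error for us.
fn implicit() -> Result<(), Box<dyn Error>> {
    deny()?;
    Ok(())
}
```

Either way, the original io::Error is still in there, and can be recovered with downcast_ref.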

Next up is argh, which is also entirely gratuitous for this example, but which I'm going to use anyway, because I like using a declarative approach for command-line flags.

Also, it lets me showcase that you can have types like PathBuf in command-line arguments.

Cool bear

Cool bear's hot tip

Didn't some folks react to your Mr. Golang's Wild Ride article by saying: sure, Rust doesn't treat paths as strings, but std::env::args() returns an Iterator that yields String items! Checkmate!

Yeah, exactly - and the answer is, if you don't feel like using args_os yourself (OsString instances are a bit annoying to manipulate), then just use a crate that does the right thing for you.
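For the curious, here's roughly what "the right thing" looks like with std alone: args_os() yields OsString items, and those convert into PathBuf without any UTF-8 validation, so file names that aren't valid UTF-8 survive the trip. A sketch, where paths_from_args is a hypothetical helper and we only care about positional arguments:

```rust
use std::ffi::OsString;
use std::path::PathBuf;

// Hypothetical helper: turns raw OS arguments into paths, skipping the
// program name (argv[0]). No UTF-8 validation happens here, unlike
// `std::env::args()`, which panics on arguments that aren't valid Unicode.
fn paths_from_args<I>(args: I) -> Vec<PathBuf>
where
    I: IntoIterator<Item = OsString>,
{
    args.into_iter().skip(1).map(PathBuf::from).collect()
}

// In a real program, this would be called as:
// let paths = paths_from_args(std::env::args_os());
```

A crate like argh does this kind of bookkeeping (plus flags, help text, etc.) for us.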

Cool bear

Cool bear's hot tip

Right! And argh is basically a spin on structopt, but optimized for code size?

Yup.

use argh::FromArgs;
use color_eyre::eyre;
use std::path::PathBuf;

/// Prints the SHA3-256 hash of a file.
#[derive(FromArgs)]
struct Args {
    /// the file whose contents to hash and print
    #[argh(positional)]
    file: PathBuf,
}

fn main() -> Result<(), eyre::Error> {
    color_eyre::install().unwrap();
    let args: Args = argh::from_env();

    let metadata = std::fs::metadata(&args.file)?;
    println!("{} is {} bytes", args.file.display(), metadata.len());

    Ok(())
}
$ cargo run -q -- /etc/hosts
/etc/hosts is 184 bytes
Cool bear

Cool bear's hot tip

And then there's just sha3 left.

Speaking of - what do you mean, SHA3-256 - are there several variants of SHA-3?

There is! MD5 has a digest size of 128 bits, SHA-1 is 160 bits, SHA-2 has a couple of options (224, 256, 384 or 512 bits), and SHA-3 is also a family - a subset of the Keccak family designed by Bertoni, Daemen, Peeters and Van Assche, which NIST standardized as SHA-3 (FIPS 202) in 2015.

Cool bear

Cool bear's hot tip

2015 huh? I guess you won't have many things to compare your program's output with?

That's true - I haven't seen any SHA-3 variants in the wild yet. For example, AUR packages tend to provide either SHA-1 or SHA-256 sums. For the Ubuntu 20.04.1 downloads, only SHA-256 sums are available.

But that's not a problem - we can just compute a reference hash using the OpenSSL command-line interface:

$ time openssl dgst -sha3-256 ubuntu.iso
SHA3-256(ubuntu.iso)= 5659eaf1fe2b98f149cb751aaa38b39e2b9573d5f19e773e78c5cc655262b915
openssl dgst -sha3-256 ubuntu.iso  6.68s user 0.35s system 99% cpu 7.032 total

And now for our version:

use argh::FromArgs;
use color_eyre::eyre;
use sha3::Digest;
use std::{fs::File, io::Read, path::PathBuf};

/// Prints the SHA3-256 hash of a file.
#[derive(FromArgs)]
struct Args {
    /// the file whose contents to hash and print
    #[argh(positional)]
    file: PathBuf,
}

fn main() -> Result<(), eyre::Error> {
    color_eyre::install().unwrap();
    let args: Args = argh::from_env();

    let mut file = File::open(&args.file)?;
    let mut hasher = sha3::Sha3_256::new();

    let mut buf = vec![0u8; 256 * 1024];
    loop {
        let n = file.read(&mut buf[..])?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

    let hash = hasher.finalize();
    print!("{} ", args.file.display());
    for x in hash {
        print!("{:02x}", x);
    }
    println!();

    Ok(())
}
$ cargo build --release -q && time ./target/release/surviving ubuntu.iso
ubuntu.iso 5659eaf1fe2b98f149cb751aaa38b39e2b9573d5f19e773e78c5cc655262b915
./target/release/surviving ubuntu.iso  8.64s user 0.23s system 99% cpu 8.884 total
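Nothing in that read loop is specific to File. Here's a sketch of it pulled out into a generic helper over any std::io::Read, where for_each_chunk and to_hex are hypothetical names. It also retries on io::ErrorKind::Interrupted, which the loop above would propagate as an error (std's own read_to_end retries in that case too):

```rust
use std::io::{self, Read};

// Hypothetical helper: reads `reader` to the end in chunks of at most
// `chunk_size` bytes, and hands each chunk to `f`.
fn for_each_chunk<R: Read>(
    mut reader: R,
    chunk_size: usize,
    mut f: impl FnMut(&[u8]),
) -> io::Result<()> {
    let mut buf = vec![0u8; chunk_size];
    loop {
        match reader.read(&mut buf) {
            Ok(0) => return Ok(()), // end of input
            Ok(n) => f(&buf[..n]),
            // interrupted by a signal before reading anything: just retry
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}

// Lowercase hex encoding, same format as the `{:02x}` loop above.
fn to_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{:02x}", b)).collect()
}
```

Assuming the sha3 0.9 Digest API used above, main could then call for_each_chunk(file, 256 * 1024, |chunk| hasher.update(chunk))? and print to_hex(&hasher.finalize()).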
Cool bear

Cool bear's hot tip

Hey, that's slower!

A bit, yeah! But more importantly - the result is correct. And it's pure Rust. And I'm sure it could be made just as fast, if someone really wanted to.

Cool bear

Cool bear's hot tip

Excuses, excuses.

Okay, tell you what, cool bear - you go and squeeze some extra performance out of sha3, and I'll just finish this article.

For the rest of this article, we'll be hashing a slightly smaller file - the Wine 5.0.2 release.

$ openssl dgst -sha3-256 wine-5.0.2.tar.xz
SHA3-256(wine-5.0.2.tar.xz)= 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892

$ ./target/release/surviving wine-5.0.2.tar.xz
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892

Okay, let's bring some async into the mix:

$ cargo add async-std
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding async-std v1.6.2 to dependencies

We'll want the attributes feature, which provides the #[async_std::main] macro, so we'll change:

# in Cargo.toml
async-std = "1.6.2"

To:

# in Cargo.toml
async-std = { version = "1.6.2", features = ["attributes"] }

And now we can make our main function async!

// in `src/main.rs`

#[async_std::main]
async fn main() -> Result<(), eyre::Error> {
    // same as before
}
$ cargo build --release -q && ./target/release/surviving wine-5.0.2.tar.xz
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892

That was easy! Well, as usual, thanks for reading and I'll see you ne..

Cool bear

Cool bear's hot tip

Hoooooold on.

Oh hey bear, how are the performance optimizations going?

Cool bear

Cool bear's hot tip

I gave up. Turns out crypto is hard.

But your program - nothing is async in there. It's all blocking.

Yeah, yeah, you're right of course. Let's make at least the file reads asynchronous:

use argh::FromArgs;
// new: those were imported from `std` previously
use async_std::{fs::File, io::ReadExt};
use color_eyre::eyre;
use sha3::Digest;
use std::path::PathBuf;

/// Prints the SHA3-256 hash of a file.
#[derive(FromArgs)]
struct Args {
    /// the file whose contents to hash and print
    #[argh(positional)]
    file: PathBuf,
}

#[async_std::main]
async fn main() -> Result<(), eyre::Error> {
    color_eyre::install().unwrap();
    let args: Args = argh::from_env();

    // changed: now awaited
    let mut file = File::open(&args.file).await?;
    let mut hasher = sha3::Sha3_256::new();

    let mut buf = vec![0u8; 256 * 1024];
    loop {
        // changed: now awaited (read is provided by `ReadExt`)
        let n = file.read(&mut buf[..]).await?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

    let hash = hasher.finalize();
    print!("{} ", args.file.display());
    for x in hash {
        print!("{:02x}", x);
    }
    println!();

    Ok(())
}
$ cargo build --release -q && ./target/release/surviving wine-5.0.2.tar.xz
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892

Okay, everything still works.

What uh.. what's the difference though? Our program worked before, and it works now. What did async give us? Is it faster? Does it do stuff in parallel?

Does it use threads?

$ cargo build -q
$ gdb --args ./target/debug/surviving ./wine-5.0.2.tar.xz
...
(gdb) break pthread_create
Function "pthread_create" not defined.
Make breakpoint pending on future shared library load? (y or [n]) y
Breakpoint 1 (pthread_create) pending.
(gdb) r
Starting program: /home/amos/ftl/surviving/target/debug/surviving ./wine-5.0.2.tar.xz
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".

Breakpoint 1, 0x00007ffff7f6d700 in pthread_create@@GLIBC_2.2.5 () from /usr/lib/libpthread.so.0
(gdb) bt
#0  0x00007ffff7f6d700 in pthread_create@@GLIBC_2.2.5 () from /usr/lib/libpthread.so.0
#1  0x000055555577f43d in std::sys::unix::thread::Thread::new () at src/libstd/sys/unix/thread.rs:66
#2  0x00005555556dbdac in std::thread::Builder::spawn_unchecked (self=..., f=...)
    at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libstd/thread/mod.rs:492
#3  0x00005555556dc389 in std::thread::Builder::spawn (self=..., f=...)
    at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libstd/thread/mod.rs:386
#4  0x00005555556ddf10 in async_std::rt::RUNTIME::{{closure}} ()
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.6.2/src/rt/mod.rs:28
#5  0x00005555556c920e in core::ops::function::FnOnce::call_once ()
    at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/ops/function.rs:232
#6  0x00005555556c91fb in core::ops::function::FnOnce::call_once ()
    at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/ops/function.rs:232
#7  0x00005555556d7109 in once_cell::sync::Lazy<T,F>::force::{{closure}} ()
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/lib.rs:953
#8  0x00005555556d728f in once_cell::sync::OnceCell<T>::get_or_init::{{closure}} ()
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/lib.rs:786
#9  0x00005555556d8ec9 in once_cell::imp::OnceCell<T>::initialize::{{closure}} ()
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/imp_std.rs:97
#10 0x000055555575a5f2 in once_cell::imp::initialize_inner (my_state_and_queue=0x555555855040 <async_std::rt::RUNTIME>,
    init=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/imp_std.rs:171
#11 0x00005555556d8dc0 in once_cell::imp::OnceCell<T>::initialize (self=0x555555855040 <async_std::rt::RUNTIME>, f=...)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/imp_std.rs:95
#12 0x00005555556d735c in once_cell::sync::OnceCell<T>::get_or_try_init (self=0x555555855040 <async_std::rt::RUNTIME>,
    f=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/lib.rs:826
#13 0x00005555556d722e in once_cell::sync::OnceCell<T>::get_or_init (self=0x555555855040 <async_std::rt::RUNTIME>, f=...)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/lib.rs:786
#14 0x00005555556d7046 in once_cell::sync::Lazy<T,F>::force (this=0x555555855040 <async_std::rt::RUNTIME>)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/lib.rs:952
#15 0x000055555559c25f in async_std::task::builder::Builder::build (self=..., future=...)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.6.2/src/task/builder.rs:42
#16 0x000055555559c439 in async_std::task::builder::Builder::blocking (self=..., future=...)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.6.2/src/task/builder.rs:146
#17 0x00005555555943c6 in async_std::task::block_on::block_on (future=...)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.6.2/src/task/block_on.rs:33
#18 0x000055555559d66b in surviving::main () at src/main.rs:16

Yeah, okay, it uses threads.

But it doesn't do anything in parallel - our code as written is sequential. First open the file, then read from it, feed it to the hasher (which is synchronous, and CPU-bound), etc.

Things would get interesting if we were to spawn tasks, to hash multiple files in parallel:

use argh::FromArgs;
use async_std::{fs::File, io::ReadExt};

use color_eyre::eyre;
use sha3::Digest;
use std::path::{Path, PathBuf};

/// Prints the SHA3-256 hash of some files
#[derive(FromArgs)]
struct Args {
    /// the files whose contents to hash and print
    #[argh(positional)]
    files: Vec<PathBuf>,
}

#[async_std::main]
async fn main() -> Result<(), eyre::Error> {
    color_eyre::install().unwrap();
    let args: Args = argh::from_env();

    let mut handles = Vec::new();

    for file in &args.files {
        let file = file.clone();
        let handle = async_std::task::spawn(async move {
            let res = hash_file(&file).await;
            if let Err(e) = res {
                println!("While hashing {}: {}", file.display(), e);
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await;
    }

    Ok(())
}

async fn hash_file(path: &Path) -> Result<(), eyre::Error> {
    let mut file = File::open(path).await?;
    let mut hasher = sha3::Sha3_256::new();

    let mut buf = vec![0u8; 256 * 1024];
    loop {
        let n = file.read(&mut buf[..]).await?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

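    let hash = hasher.finalize();
    // format the whole line before printing it: several tasks run at once,
    // and separate `print!` calls from different threads could interleave
    let hex: String = hash.iter().map(|x| format!("{:02x}", x)).collect();
    println!("{} {}", path.display(), hex);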

    Ok(())
}
$ cargo build --release -q && ./target/release/surviving wine-*
wine-4.0.4.tar.xz d53a567e7a14a2b8dbd669afece1603daa769ada93522ec1f26fe69674d7c433
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
wine-5.0.1.tar.xz 4af0d295e56db9f723daa4822ee4b4416ac8840278ccd7df73ef357027e5b663
Cool bear

Cool bear's hot tip

Okay, cool - multiple files. But is it using multiple threads?

Well I don't know! It works, but.. I don't know a lot beyond that.

So let's dig a little.

Here's one thing that's rather easy to do: threads have a unique identifier (process-wide). We can just grab it and print it from whichever place we choose, for example... right before calling read():

async fn hash_file(path: &Path) -> Result<(), eyre::Error> {
    // etc.
    loop {
        println!("{:?}", std::thread::current().id());
        let n = file.read(&mut buf[..]).await?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }
    // (cut)
$ cargo build --release -q && time ./target/release/surviving wine-*
ThreadId(3)
ThreadId(3)
ThreadId(2)
ThreadId(2)
ThreadId(3)
ThreadId(1)
ThreadId(1)
ThreadId(3)
ThreadId(2)
ThreadId(3)
ThreadId(2)
ThreadId(3)
ThreadId(1)
ThreadId(2)
Cool bear

Cool bear's hot tip

Ooooooh it's using threads alright.

Yeah. But are the tasks (each task is hashing a different file) always on the same thread?

Let's do some more printing:

// in `fn hash_file`
// in `loop`
println!("{} => {:?}", path.display(), std::thread::current().id());
$ cargo build --release -q && time ./target/release/surviving wine-*
wine-4.0.4.tar.xz => ThreadId(3)
wine-5.0.1.tar.xz => ThreadId(3)
wine-5.0.2.tar.xz => ThreadId(2)
wine-5.0.1.tar.xz => ThreadId(3)
wine-5.0.2.tar.xz => ThreadId(1)
wine-4.0.4.tar.xz => ThreadId(2)
wine-4.0.4.tar.xz => ThreadId(2)
wine-5.0.1.tar.xz => ThreadId(1)
wine-5.0.2.tar.xz => ThreadId(3)
wine-4.0.4.tar.xz => ThreadId(2)
wine-5.0.2.tar.xz => ThreadId(2)
wine-5.0.1.tar.xz => ThreadId(3)
wine-4.0.4.tar.xz => ThreadId(1)
wine-5.0.2.tar.xz => ThreadId(2)

No, they are not pinned. Not at all, in fact. We can see that the wine-4.0.4.tar.xz task hops from thread 3, to thread 2, then to thread 1.
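std::thread::current().id() returns a ThreadId: an opaque value that's unique per thread (IDs aren't reused, even after a thread exits) and only supports comparison and Debug printing. A small std-only sketch, no async runtime involved, where ids_from_two_threads is a hypothetical helper:

```rust
use std::thread::{self, ThreadId};

// Hypothetical helper: returns the current thread's ID, plus the ID
// observed from inside a freshly spawned thread.
fn ids_from_two_threads() -> (ThreadId, ThreadId) {
    let here = thread::current().id();
    let there = thread::spawn(|| thread::current().id())
        .join()
        .expect("spawned thread panicked");
    (here, there)
}
```

The number inside ThreadId(..) is an implementation detail, so it's only good for telling threads apart, which is all we need here.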

Everything is happening a little too fast for me though. I'd like to try something else - I'd like to try and make an asynchronous reader that sleeps before each read.

Okay, wrapping a Reader is simple, right? All we have to do is... declare a struct, which itself has an inner reader - something that implements the async_std::io::Read trait, which is just futures::io::AsyncRead in disguise.

For clarity, we'll add futures to our Cargo.toml and use that trait directly:

$ cargo add futures
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding futures v0.3.5 to dependencies
use futures::io::AsyncRead;

struct TracingReader<R>
where
    R: AsyncRead,
{
    inner: R,
}

impl<R> AsyncRead for TracingReader<R> where R: AsyncRead {}

Very good - now, what trait items do we need to implement for AsyncRead?

Let's let rust-analyzer implement the missing method for us:

use futures::io::AsyncRead;
use std::{
    io,
    pin::Pin,
    task::{Context, Poll},
};

struct TracingReader<R>
where
    R: AsyncRead,
{
    inner: R,
}

impl<R> AsyncRead for TracingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        todo!()
    }
}
Cool bear

Cool bear's hot tip

This looks uhh... different from std::io::Read.

Yeah, it does! Well, it takes a &mut [u8], which I recognize. It also returns a type that contains an io::Result<usize>.

But everything else is different. It has a Context, which I'm guessing has something to do with the async executor. It returns a Poll type, and its self feels a bit.. unfamiliar maybe?
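Context and Poll aren't AsyncRead-specific: they come from std::task, and they're the same types Future::poll uses. Here's a minimal std-only sketch of that contract, with hypothetical names throughout: YieldOnce returns Poll::Pending the first time it's polled (after using the waker from the Context to ask to be polled again), then Poll::Ready. The tiny block_on executor is just enough to drive it, and is not how async-std works internally:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical future: not ready on the first poll, ready on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.yielded {
            Poll::Ready("done")
        } else {
            self.yielded = true;
            // Returning Pending means "wake me when I can make progress";
            // here we can make progress right away, so we wake immediately.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal executor: polls on the current thread and parks until woken.
// Returns the output and, for illustration, how many polls it took.
fn block_on<F: Future>(fut: F) -> (F::Output, usize) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
        thread::park();
    }
}
```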

Let's do a bit of reading on pinning:

Types that pin data to its location in memory.

It is sometimes useful to have objects that are guaranteed not to move, in the sense that their placement in memory does not change, and can thus be relied upon. A prime example of such a scenario would be building self-referential structs, as moving an object with pointers to itself will invalidate them, which could cause undefined behavior.

So, one could probably write a whole book about pinning.

Cool bear

Cool bear's hot tip

A book you say..

...but let's keep this short and sweet. Basically, pinning something prevents us from getting a mutable reference to it (at least in safe code, unless its type implements Unpin). Which means we can no longer use, for example, std::mem::swap, or std::mem::replace.
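There's one big exception: Unpin. Most types (integers, Vec, File...) implement it automatically, and for those, Pin is a thin wrapper that hands the &mut T right back. A std-only sketch contrasting that with a hypothetical type that opts out of Unpin through std::marker::PhantomPinned:

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical type that opts out of `Unpin`.
struct Pinned {
    value: u32,
    _marker: PhantomPinned,
}

// `u32` is `Unpin`, so pinning it changes nothing: we can get the
// `&mut u32` back out and even `mem::swap` through it.
fn swap_unpin(a: &mut u32, b: &mut u32) {
    let pa: Pin<&mut u32> = Pin::new(a);
    let pb: Pin<&mut u32> = Pin::new(b);
    std::mem::swap(pa.get_mut(), pb.get_mut());
}

// `Pinned` is not `Unpin`: once it's pinned (here, on the heap), safe code
// can read it, but can't get a `&mut Pinned` to move it around.
fn read_pinned() -> u32 {
    let mut p: Pin<Box<Pinned>> = Box::pin(Pinned { value: 42, _marker: PhantomPinned });
    let _still_pinned: Pin<&mut Pinned> = p.as_mut(); // fine: stays pinned
    // let r: &mut Pinned = p.as_mut().get_mut(); // doesn't compile: not Unpin
    p.value
}
```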

However, given a Pin<&mut T>, we can get a mutable reference to one of our fields.

...right?

impl<R> AsyncRead for TracingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let inner: &mut R = &mut self.inner;
        todo!()
    }
}
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error[E0596]: cannot borrow data in a dereference of `std::pin::Pin<&mut TracingReader<R>>` as mutable
  --> src/main.rs:90:29
   |
90 |         let inner: &mut R = &mut self.inner;
   |                             ^^^^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `std::pin::Pin<&mut TracingReader<R>>`

error: aborting due to previous error

Ah, uh, it's not that simple. To get a &mut self.inner, we'd have to get a &mut self first, and that's precisely what Pin prevents.

Turns out - we have two options. Either we can decide that "pinning is not structural" for our field, inner, and then we can use unsafe code to get a &mut R:

impl<R> AsyncRead for TracingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let inner: &mut R = unsafe { &mut self.get_unchecked_mut().inner };

        todo!()
    }
}

The problem here is that... even though R implements AsyncRead, we cannot call poll_read on our inner variable:

impl<R> AsyncRead for TracingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let inner: &mut R = unsafe { &mut self.get_unchecked_mut().inner };
        inner.poll_read(cx, buf)
    }
}

In fact, the compiler error is a bit.. lacking for that particular scenario:

$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error[E0599]: no method named `poll_read` found for mutable reference `&mut R` in the current scope
  --> src/main.rs:91:15
   |
91 |         inner.poll_read(cx, buf)
   |               ^^^^^^^^^ method not found in `&mut R`
   |
   = help: items from traits can only be used if the type parameter is bounded by the trait
help: the following trait defines an item `poll_read`, perhaps you need to restrict type parameter `R` with it:
   |
81 | impl<R: futures_io::if_std::AsyncRead> AsyncRead for TracingReader<R>
   |      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Which doesn't stop it from being absolutely correct. There is no method named poll_read for R that takes a self: &mut Self (usually written as its shorthand, &mut self).

There is one that takes Pin<&mut Self>, though.

Which brings us to option two: we can decide that "pinning is structural" for our field inner. Which it is - since we want to call poll_read on it, which needs a Pin<&mut Self>, we must guarantee that, while TracingReader<R> is pinned, TracingReader::inner is also pinned.

And there is also a bit of unsafe code that lets us obtain a Pin<&mut R>:

impl<R> AsyncRead for TracingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
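        // tracing: note that `&self` is the address of the local `Pin`
        // pointer on the current thread's stack, not of the TracingReader
        // itself (`&*self as *const Self` would give us that)
        let address = &self as *const _;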
        println!("{:?} => {:?}", address, std::thread::current().id());

        // reading - pinning is structural for `self.inner`
        let inner: Pin<&mut R> = unsafe { self.map_unchecked_mut(|x| &mut x.inner) };
        inner.poll_read(cx, buf)
    }
}

Now we can use our TracingReader directly from hash_file:

async fn hash_file(path: &Path) -> Result<(), eyre::Error> {
    let file = File::open(path).await?;
    let mut file = TracingReader { inner: file };

    let mut hasher = sha3::Sha3_256::new();

    let mut buf = vec![0u8; 256 * 1024];
    loop {
        let n = file.read(&mut buf[..]).await?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

    let hash = hasher.finalize();
    print!("{} ", path.display());
    for x in hash {
        print!("{:02x}", x);
    }
    println!();

    Ok(())
}
$ cargo build --release -q && time ./target/release/surviving wine-*
0x7f54804343e0 => ThreadId(2)
0x7f54804343e0 => ThreadId(2)
0x7f54804343e0 => ThreadId(2)
0x7f54804343e0 => ThreadId(2)
0x7ffff9685970 => ThreadId(1)
0x7f547b3f83e0 => ThreadId(10)
0x7f54804343e0 => ThreadId(2)
0x7ffff9685970 => ThreadId(1)
0x7f547b3f83e0 => ThreadId(10)
Cool bear

Cool bear's hot tip

Wait a minute... if you can obtain both a &mut R and a Pin<&mut R>... doesn't that kind of defeat the purpose of pinning?

One could pass a Pin<&mut R> to a method, and the next moment, they could use a &mut R to move it, deinitialize it, etc.

What a good-looking question. You definitely can get into trouble that way, but you'll note that it is using unsafe.

Cool bear

Cool bear's hot tip

I was getting to that - why are we using unsafe in the first place?

Isn't there another way? A better way?

There is! There's nothing in the Rust type system that prevents you from calling both get_unchecked_mut and map_unchecked_mut for the same field - that's why they're both unsafe.
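For reference, here's what a careful, sound use of those unsafe functions can look like. It's a std-only sketch using Future instead of AsyncRead (so it needs no extra crates), and PollCounter, PendingOnce and block_on are all hypothetical names: inner is structurally pinned and only ever handed out as a Pin<&mut F>, while polls is not pinned and gets a plain &mut. This is roughly the kind of code pin-project generates for us, not its literal output:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical wrapper: counts how many times `inner` gets polled.
struct PollCounter<F> {
    inner: F,     // structurally pinned: only accessed as Pin<&mut F>
    polls: usize, // not structurally pinned: accessed as &mut usize
}

impl<F: Future> Future for PollCounter<F> {
    type Output = (F::Output, usize);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: we never move `inner` out of `this`, never hand out a
        // `&mut F`, and `PollCounter` has no `Drop` or manual `Unpin` impl.
        let this = unsafe { self.get_unchecked_mut() };
        this.polls += 1;
        let inner = unsafe { Pin::new_unchecked(&mut this.inner) };
        match inner.poll(cx) {
            Poll::Ready(out) => Poll::Ready((out, this.polls)),
            Poll::Pending => Poll::Pending,
        }
    }
}

// Hypothetical future: pending once, then ready with 7.
struct PendingOnce(bool);

impl Future for PendingOnce {
    type Output = u8;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u8> {
        if self.0 {
            Poll::Ready(7)
        } else {
            self.0 = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

// Minimal single-threaded executor, just enough to run the example.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}
```

The important part is the discipline: each field is either always pinned or never pinned, which is exactly the decision pin-project makes us write down.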

But the problem can be solved another way - and I want to hear the people in the back this time: there is a crate for that.

$ cargo add pin-project
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding pin-project v0.4.23 to dependencies
Cool bear

Cool bear's hot tip

Project? Like, project management?

No, like, projection. But I was confused as well the first couple of times it showed up in the dependencies of a project. Don't tell anyone though.

With pin-project, you get to decide which fields are pinned, and which fields aren't.

In our case, we have only one field, and it is pinned:

use pin_project::pin_project;

// Generate projection types
#[pin_project]
struct TracingReader<R>
where
    R: AsyncRead,
{
    // pinning is structural for `inner`
    #[pin]
    inner: R,
}

Then, we can use self.project() (which itself takes a Pin<&mut Self>) to get &mut T for unpinned fields, and Pin<&mut T> for pinned fields.

impl<R> AsyncRead for TracingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // tracing
        let address = &self as *const _;
        println!("{:?} => {:?}", address, std::thread::current().id());

        // reading
        self.project().inner.poll_read(cx, buf)
    }
}

That way - there's no unsafe code directly in our crate, and we can't accidentally change our mind between a field being pinned and a field being unpinned.

Cool bear

Cool bear's hot tip

Okay. That pinning stuff is still very fresh in my bear mind right now, but... I have another question.

Why isn't AsyncRead::poll_read an async fn?

Ah - an "easy" question.

(Keep in mind this is written in August 2020 - this article might be in dire need of being updated at some point.)

At the time of this writing, async functions in traits are not supported.

Cool bear

Cool bear's hot tip

What?

See for yourself:

trait SimpleRead {
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}
$ cargo check
error[E0706]: functions in traits cannot be declared `async`
   --> src/main.rs:107:5
    |
107 |     async fn simple_read(&mut self, &mut buf[u8]) -> io::Result<usize>;
    |     -----^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |     |
    |     `async` because of this
    |
    = note: `async` trait functions are not currently supported
    = note: consider using the `async-trait` crate: https://crates.io/crates/async-trait

Hopefully that won't always be the case, but for the time being we're stuck with it.
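That has changed since: async fn in traits was stabilized in Rust 1.75, released in December 2023. Here's a sketch of the same SimpleRead trait with native support, assuming a 1.75+ toolchain; SliceReader, read_to_vec and the minimal block_on are hypothetical helpers so it runs with std alone:

```rust
use std::future::Future;
use std::io;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Native `async fn` in a trait: requires Rust 1.75 or later.
trait SimpleRead {
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}

// Hypothetical in-memory reader.
struct SliceReader<'a> {
    data: &'a [u8],
}

impl SimpleRead for SliceReader<'_> {
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = buf.len().min(self.data.len());
        let (head, tail) = self.data.split_at(n);
        buf[..n].copy_from_slice(head);
        self.data = tail;
        Ok(n)
    }
}

// Generic code can `.await` the trait method directly.
async fn read_to_vec<R: SimpleRead>(reader: &mut R, chunk: usize) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    let mut buf = vec![0u8; chunk];
    loop {
        match reader.simple_read(&mut buf).await? {
            0 => return Ok(out),
            n => out.extend_from_slice(&buf[..n]),
        }
    }
}

// Minimal single-threaded executor, just enough to run the example.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}
```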

Cool bear

Cool bear's hot tip

Wait, wait - the compiler just told us about a crate.

Of course it did.

async-trait lets us work around that limitation, at least partially, so let's try it out:

$ cargo add async-trait
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding async-trait v0.1.36 to dependencies
use async_trait::async_trait;

#[async_trait]
trait SimpleRead {
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}

And.. voilà! This compiles just fine.

Cool bear

Cool bear's hot tip

Okay... where's the catch?

Can we try implementing it for TracingReader, for example?

Sure, yeah, let's do it:

use async_trait::async_trait;

#[async_trait]
trait SimpleRead {
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}

#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead,
{
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf).await
    }
}
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error[E0277]: `R` cannot be unpinned
   --> src/main.rs:119:20
    |
119 |         self.inner.read(buf).await
    |                    ^^^^ the trait `std::marker::Unpin` is not implemented for `R`
    |
help: consider further restricting this bound
    |
116 |     R: AsyncRead + std::marker::Unpin,
    |                  ^^^^^^^^^^^^^^^^^^^^

Uh oh. Well, let's see... we have a &mut Self. In order to call AsyncReadExt::read, we need a &mut R where R is Unpin.

Well... you know what is Unpin? Pin<&mut R>.
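That's easy to check at compile time: Pin<P> is Unpin whenever P is, and &mut T is Unpin for any T. Here's a std-only sketch, with hypothetical names, where the pinned type itself is not Unpin but a Pin<&mut _> pointing to it is:

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical type that is not `Unpin`.
struct NotUnpin {
    value: u32,
    _marker: PhantomPinned,
}

// Only compiles for `Unpin` types, like the `Unpin` requirement in the
// error above.
fn require_unpin<T: Unpin>(_: &T) {}

fn demo() -> u32 {
    let mut boxed: Pin<Box<NotUnpin>> = Box::pin(NotUnpin { value: 7, _marker: PhantomPinned });
    // require_unpin(&*boxed); // doesn't compile: `NotUnpin` is not `Unpin`
    let pinned: Pin<&mut NotUnpin> = boxed.as_mut();
    require_unpin(&pinned); // compiles: `Pin<&mut NotUnpin>` is `Unpin`
    pinned.value
}
```

That's half of why self.project().inner.read(buf) works below: the other half is that futures-io also implements AsyncRead for Pin<P> when P's target implements AsyncRead.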

If we were to take a Pin<&mut Self>, we could use project (generated via pin-project) and get a Pin<&mut R>:

#[async_trait]
trait SimpleRead {
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize>;
}

#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead,
{
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
        self.project().inner.read(buf).await
    }
}
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error: future cannot be sent between threads safely
   --> src/main.rs:118:85
    |
118 |       async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
    |  _____________________________________________________________________________________^
119 | |         self.project().inner.read(buf).await
120 | |     }
    | |_____^ future returned by `__simple_read` is not `Send`
    |
    = help: within `TracingReader<R>`, the trait `std::marker::Send` is not implemented for `R`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:119:9
    |
118 |     async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
    |                          ---- has type `std::pin::Pin<&mut TracingReader<R>>` which is not `Send`
119 |         self.project().inner.read(buf).await
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `self` maybe used later
120 |     }
    |     - `self` is later dropped here
    = note: required for the cast to the object type `dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send`
help: consider further restricting this bound
    |
116 |     R: AsyncRead + std::marker::Send,
    |                  ^^^^^^^^^^^^^^^^^^^

Okay, we're 90% of the way there - it no longer complains about R not being Unpin. But now it complains about the future not being Send.

Cool bear

Cool bear's hot tip

The future? What future?

Well, every async function is a normal function in disguise, that returns an impl Future.

Cool bear

Cool bear's hot tip

Oh.. and futures can be polled? By executors?

Passing them a context, yes.

Cool bear

Cool bear's hot tip

Just like poll_read?

Just like poll_read, yeah!
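Here's a minimal, std-only sketch of that idea: an async fn, a hand-written future that does the same thing, and a bare-bones "executor" that polls once with a Context built from a do-nothing waker. The names (AddOne, NoopWake, poll_once) are made up for illustration, and the hand-written version is only roughly what the compiler generates.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// What we write:
async fn add_one(x: u32) -> u32 {
    x + 1
}

// Roughly what it stands for: a plain function returning a value whose type
// implements `Future`. Here, the future type is written by hand.
struct AddOne {
    x: u32,
}

impl Future for AddOne {
    type Output = u32;

    // Called by an executor, which passes in a `Context` (carrying a `Waker`).
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        Poll::Ready(self.x + 1)
    }
}

fn add_one_by_hand(x: u32) -> impl Future<Output = u32> {
    AddOne { x }
}

// A waker that does nothing: just enough to build a `Context`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a future exactly once, the way an executor would.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}

fn main() {
    println!("{:?}", poll_once(add_one(41))); // Ready(42)
    println!("{:?}", poll_once(add_one_by_hand(41))); // Ready(42)
}
```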

In our case, since we're pretending to live in a future where "async fn in traits" is supported, we don't see the concrete Future type - it's all generated for us under the covers.

But we know that the generated code uses a Pin<&mut R> at some point, which refers to the self.inner field, and R is not Send, so our Future is not Send either.

And that's one of the big limitations of async-trait. When using native async fn support in Rust, sometimes futures end up being Send, sometimes they end up not being Send (ie. they cannot be safely sent across threads).

But when using async-trait, you have to pick one. And the default is: whatever Future the async fn returns, must be Send.
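To make that concrete, here's a std-only sketch of approximately what an async-trait method boils down to: a regular trait method returning a boxed, pinned future with a + Send bound. It's a simplification of the macro's real expansion (which uses separate lifetimes), and SliceReader, BoxFuture and block_on are hypothetical helpers for illustration.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Approximate shape of an `#[async_trait]` method (not the macro's literal
// output): a boxed, type-erased future that must be `Send`.
// `#[async_trait(?Send)]` would drop the `+ Send`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

trait SimpleRead {
    fn simple_read<'a>(&'a mut self, buf: &'a mut [u8]) -> BoxFuture<'a, io::Result<usize>>;
}

// Hypothetical in-memory reader, just for illustration.
struct SliceReader {
    data: Vec<u8>,
    pos: usize,
}

impl SimpleRead for SliceReader {
    fn simple_read<'a>(&'a mut self, buf: &'a mut [u8]) -> BoxFuture<'a, io::Result<usize>> {
        // Everything this async block holds must be `Send`,
        // or the coercion to `BoxFuture` is rejected.
        Box::pin(async move {
            let remaining = &self.data[self.pos..];
            let n = remaining.len().min(buf.len());
            buf[..n].copy_from_slice(&remaining[..n]);
            self.pos += n;
            Ok(n)
        })
    }
}

// Minimal executor: poll, and park the thread until the waker unparks it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let mut reader = SliceReader { data: b"hello".to_vec(), pos: 0 };
    let mut buf = [0u8; 3];
    while let Ok(n) = block_on(reader.simple_read(&mut buf)) {
        if n == 0 {
            break;
        }
        println!("read {:?}", &buf[..n]);
    }
}
```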

We can lift that limitation by using #[async_trait(?Send)]:

#[async_trait(?Send)]
trait SimpleRead {
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize>;
}

#[async_trait(?Send)]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead,
{
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
        self.project().inner.read(buf).await
    }
}

And then we have a fully working trait, which we can use from hash_file.

Right?

async fn hash_file(path: &Path) -> Result<(), eyre::Error> {
    let file = File::open(path).await?;
    let mut file = TracingReader { inner: file };

    let mut hasher = sha3::Sha3_256::new();

    let mut buf = vec![0u8; 256 * 1024];
    loop {
        let n = file.simple_read(&mut buf[..]).await?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

    // etc.
}
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error[E0599]: no method named `simple_read` found for struct `TracingReader<async_std::fs::file::File>` in the current scope
   --> src/main.rs:51:22
    |
51  |           let n = file.simple_read(&mut buf[..]).await?;
    |                        ^^^^^^^^^^^ method not found in `TracingReader<async_std::fs::file::File>`
...
79  | / struct TracingReader<R>
80  | | where
81  | |     R: AsyncRead,
82  | | {
...   |
85  | |     inner: R,
86  | | }
    | |_- method `simple_read` not found for this
...
110 |       async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize>;
    |                                  -------------- the method might not be found because of this arbitrary self type

Oh no. This sounds awfully familiar.

Oh that's right! Our simple_read method takes a Pin<&mut Self> now, not a &mut self.

I guess we can just... pin it ourselves?

let n = Pin::new(&mut file).simple_read(&mut buf[..]).await?;
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error: future cannot be sent between threads safely
  --> src/main.rs:27:22
   |
27 |         let handle = async_std::task::spawn(async move {
   |                      ^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
   |
  ::: /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.6.2/src/task/spawn.rs:28:29
   |
28 |     F: Future<Output = T> + Send + 'static,
   |                             ---- required by this bound in `async_std::task::spawn::spawn`
   |
   = help: the trait `std::marker::Send` is not implemented for `dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>>`
note: future is not `Send` as it awaits another future which is not `Send`
  --> src/main.rs:51:17
   |
51 |         let n = Pin::new(&mut file).simple_read(&mut buf[..]).await?;
   |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here on type `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>>>>`, which is not `Send`

Ohhhhh.

Cool bear

Cool bear's hot tip

Oh!

Ah. Turns out, spawn requires the, well, spawned Future to be Send.

Which makes sense! Since it spawns the Future on the threaded executor, as we've seen (by printing the ThreadId).

What if.. what if we tried spawning it on the thread-local executor?

Turns out, as of async-std 1.6.2, that's an unstable feature, but let's try it anyway:

# in `Cargo.toml`
async-std = { version = "1.6.2", features = ["attributes", "unstable"] }
$ cargo build --release -q && time ./target/release/surviving wine-*
wine-4.0.4.tar.xz d53a567e7a14a2b8dbd669afece1603daa769ada93522ec1f26fe69674d7c433
wine-5.0.1.tar.xz 4af0d295e56db9f723daa4822ee4b4416ac8840278ccd7df73ef357027e5b663
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
./target/release/surviving wine-*  0.22s user 0.04s system 117% cpu 0.218 total

Oh hey it works!

But.. how does it work? Does it just do them in sequence? Let's compare timings:

$ for i in wine-*; do time ./target/release/surviving $i; done
wine-4.0.4.tar.xz d53a567e7a14a2b8dbd669afece1603daa769ada93522ec1f26fe69674d7c433
./target/release/surviving $i  0.05s user 0.03s system 103% cpu 0.078 total
wine-5.0.1.tar.xz 4af0d295e56db9f723daa4822ee4b4416ac8840278ccd7df73ef357027e5b663
./target/release/surviving $i  0.08s user 0.00s system 103% cpu 0.081 total
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
./target/release/surviving $i  0.08s user 0.00s system 102% cpu 0.082 total

So, measuring like that is a terrible idea, and don't do it, and all the usual caveats, BUT this is 2020, all bets are off, so: if we add up the "total" times for our for i in wine-* (sequential) run, we get results around 0.24s - whereas the surviving wine-* (mystery) run stays around the 0.21s range.

Cool bear

Cool bear's hot tip

This is... this is not scientific at all.

Also, aren't you running these on a laptop?

Yeah, yes, absolutely. This is absolute garbage methodology - so, let's do the same thing we did before and just print the current thread's identifier:

#[async_trait(?Send)]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead,
{
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
        // tracing
        let address = &self as *const _;
        println!("{:?} => {:?}", address, std::thread::current().id());

        // reading
        self.project().inner.read(buf).await
    }
}
$ cargo build --release -q && time ./target/release/surviving wine-*
0x55a274428fe8 => ThreadId(1)
0x55a2744292c8 => ThreadId(1)
0x55a274429458 => ThreadId(1)
0x55a274428fe8 => ThreadId(1)
0x55a2744292c8 => ThreadId(1)
0x55a274429458 => ThreadId(1)
0x55a274428fe8 => ThreadId(1)
0x55a2744292c8 => ThreadId(1)
0x55a274429458 => ThreadId(1)
0x55a274428fe8 => ThreadId(1)
0x55a2744292c8 => ThreadId(1)
0x55a274429458 => ThreadId(1)
0x55a274428fe8 => ThreadId(1)
0x55a2744292c8 => ThreadId(1)
Cool bear

Cool bear's hot tip

Whoa.

It's all running on the same thread!

Yes it is! And what's even more interesting is that you can see the tasks alternate: first fe8, then 2c8, then 458, etc.

So, my feeling is that it is actually a little bit faster, because it's queueing reads, switching to another task whenever they're ready, blocking on the hash computation, then switching again, etc. - it spends less time waiting for I/O syscalls.
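That alternating pattern is what you'd expect from any single-threaded executor that interleaves tasks at their .await points. Here's a toy round-robin executor (std only, hypothetical names) to illustrate it. It is not how async-std schedules tasks: a real executor only re-polls tasks whose waker fired, instead of blindly cycling through all of them.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A future that returns `Pending` exactly once, asking to be polled again.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tasks don't need to be `Send`: they never leave this thread.
type LocalTask = Pin<Box<dyn Future<Output = ()>>>;

// Polls each task in turn on the current thread; pending tasks go to the back.
fn run_round_robin(tasks: Vec<LocalTask>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut queue: VecDeque<LocalTask> = tasks.into_iter().collect();
    while let Some(mut task) = queue.pop_front() {
        if task.as_mut().poll(&mut cx).is_pending() {
            queue.push_back(task);
        }
    }
}

// A task that logs a step, then yields to the others, twice.
fn worker(name: &'static str, log: Rc<RefCell<Vec<String>>>) -> LocalTask {
    Box::pin(async move {
        for i in 0..2 {
            log.borrow_mut().push(format!("{}{}", name, i));
            yield_now().await;
        }
    })
}

fn main() {
    let log = Rc::new(RefCell::new(Vec::new()));
    run_round_robin(vec![worker("a", log.clone()), worker("b", log.clone())]);
    println!("{:?}", log.borrow()); // ["a0", "b0", "a1", "b1"]
}
```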

But if we really want to get faster performance, we probably do want multi-threading.

So we'll have to make our futures Send:

#[async_trait]
trait SimpleRead {
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize>;
}

#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead,
{
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
        // tracing
        let address = &self as *const _;
        println!("{:?} => {:?}", address, std::thread::current().id());

        // reading
        self.project().inner.read(buf).await
    }
}
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error: future cannot be sent between threads safely
   --> src/main.rs:118:85
    |
118 |       async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
    |  _____________________________________________________________________________________^
119 | |         // tracing
120 | |         let address = &self as *const _;
121 | |         println!("{:?} => {:?}", address, std::thread::current().id());
...   |
124 | |         self.project().inner.read(buf).await
125 | |     }
    | |_____^ future returned by `__simple_read` is not `Send`
    |
    = help: within `impl core::future::future::Future`, the trait `std::marker::Send` is not implemented for `*const std::pin::Pin<&mut TracingReader<R>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:124:9
    |
120 |         let address = &self as *const _;
    |             ------- has type `*const std::pin::Pin<&mut TracingReader<R>>` which is not `Send`
...
124 |         self.project().inner.read(buf).await
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `address` maybe used later
125 |     }
    |     - `address` is later dropped here
    = note: required for the cast to the object type `dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send`

Okay, back to square one. Let's squint at the compiler messages: we have to constrain R further.

Right now we're only asking of R that it implements AsyncRead. If we also ask that it is Send, then our TracingReader<R> should also automatically implement Send, since the only field of the struct is of type R.
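Send is an auto trait, so the compiler works this out from the fields on its own. Here's a small sketch with a hypothetical Wrapper<R> standing in for TracingReader<R>: inside a function generic over R, the R: Send bound alone is enough for Wrapper<R> to count as Send.

```rust
use std::rc::Rc;

// Stand-in for `TracingReader<R>`: a struct whose only field is an `R`.
struct Wrapper<R> {
    inner: R,
}

// Compiles only if `T: Send`.
fn assert_send<T: Send>() {}

// Generic over `R`, like the `impl SimpleRead` block: `R: Send` is all it
// takes for the compiler to accept `Wrapper<R>` as `Send`.
fn wrapper_is_send<R: Send>() {
    assert_send::<Wrapper<R>>();
}

fn main() {
    wrapper_is_send::<String>();
    wrapper_is_send::<Vec<u8>>();

    // `Rc` is not `Send`, so neither is a `Wrapper` around it.
    // Uncommenting the next line is a compile error:
    // wrapper_is_send::<Rc<u8>>();
    let local_only = Wrapper { inner: Rc::new(1u8) };
    println!("{}", local_only.inner);
}
```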

In fact, we only have to add it to our impl SimpleRead block:

#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead + Send,
{
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
        // tracing
        let address = &self as *const _;
        println!("{:?} => {:?}", address, std::thread::current().id());

        // reading
        self.project().inner.read(buf).await
    }
}

There! It's done, it's ready for the prime ti..

$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error: future cannot be sent between threads safely
   --> src/main.rs:118:85
    |
118 |       async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
    |  _____________________________________________________________________________________^
119 | |         // tracing
120 | |         let address = &self as *const _;
121 | |         println!("{:?} => {:?}", address, std::thread::current().id());
...   |
124 | |         self.project().inner.read(buf).await
125 | |     }
    | |_____^ future returned by `__simple_read` is not `Send`
    |
    = help: within `impl core::future::future::Future`, the trait `std::marker::Send` is not implemented for `*const std::pin::Pin<&mut TracingReader<R>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:124:9
    |
120 |         let address = &self as *const _;
    |             ------- has type `*const std::pin::Pin<&mut TracingReader<R>>` which is not `Send`
...
124 |         self.project().inner.read(buf).await
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `address` maybe used later
125 |     }
    |     - `address` is later dropped here
    = note: required for the cast to the object type `dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send`

it's not done. That error is really confusing.

Let's see - address has type *const std::pin::Pin<&mut TracingReader<R>> (not what I was going for, but ah well), and that type is not Send, so we cannot hold on to it across an await.

That part actually makes sense. When we .await a future that isn't ready yet, what actually happens is that the poll method of our Future returns Poll::Pending, and the next time it's polled, it might be polled from a different thread - since we promised our Future was Send (async-trait's default, which we just went back to).

The part that doesn't make sense is with address maybe used later.

Where?? Where may it be used later? Tell me rustc!

Cool bear

Cool bear's hot tip

Well, address only gets dropped at the end of the scope - and Drop::drop takes a &mut self. So if it does implement Drop, it'll be used after the await!

Okay - but then why doesn't it work if we just explicitly call drop(address) after the println!?

Cool bear

Cool bear's hot tip

I uh...

Uh-huh. I'm fairly sure it's a case of "the compiler is not sufficiently smart to figure this out yet" - Rust is a research project gone terribly right, after all - at any rate, we can solve our problem using a scope:

    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
        // tracing
        {
            let address = &self as *const _;
            println!("{:?} => {:?}", address, std::thread::current().id());
        }

        // reading
        self.project().inner.read(buf).await
    }

And with that change, everything compiles again.
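Here's a std-only sketch of the same trick outside of async-trait. The assert_send helper (hypothetical) only accepts futures that are Send, and the made-up some_io() stands in for the actual read. Because the raw pointer lives in its own block, it's gone before the .await, so the future only holds Send things. (On the compiler used here, even an explicit drop(address) wasn't enough, which is why the block is the safe bet.)

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Only accepts futures that are `Send`; returns them unchanged.
fn assert_send<F: Future + Send>(fut: F) -> F {
    fut
}

// Hypothetical stand-in for `self.inner.read(buf).await`.
async fn some_io() -> usize {
    42
}

async fn traced(value: &mut u32) -> usize {
    // tracing: the raw pointer (which is not `Send`) lives only in this block,
    // so it is dropped before the `.await` below
    {
        let address = &*value as *const u32;
        println!("{:?} => {:?}", address, thread::current().id());
    }
    *value += 1;

    // reading
    some_io().await
}

// Minimal executor: poll, and park the thread until the waker unparks it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let mut value = 1;
    // Type-checks because `traced`'s future is `Send`.
    let n = block_on(assert_send(traced(&mut value)));
    println!("n = {}, value = {}", n, value);
}
```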

But I'm not really satisfied yet. Having to take Pin<&mut Self>, and having to pin manually when calling simple_read seems like a major usability issue.

Can't we just let the compiler, uh, (waves hands) handle it?

Let's see. The last time we tried to take &mut self, the compiler complained that R wasn't Unpin. So, what if we just make it Unpin?

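#[async_trait]
trait SimpleRead {
    // back to a plain `&mut self` receiver
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}

#[async_trait]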
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead + Send + Unpin,
{
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // tracing
        {
            let address = &self as *const _;
            println!("{:?} => {:?}", address, std::thread::current().id());
        }

        // reading
        self.inner.read(buf).await
    }
}

And now we can once again use it without pinning ourselves:

// in `fn hash_file`
let n = file.simple_read(&mut buf[..]).await?;

That works just fine! But we're still using async_std::task::spawn_local.

Can we use it with async_std::task::spawn?

Yeah! And if we do, it's definitely faster than hashing each file in sequence:

$ for i in wine-*; do time ./target/release/surviving $i > /dev/null; done
./target/release/surviving $i > /dev/null  0.06s user 0.02s system 101% cpu 0.079 total
./target/release/surviving $i > /dev/null  0.09s user 0.00s system 101% cpu 0.085 total
./target/release/surviving $i > /dev/null  0.08s user 0.00s system 101% cpu 0.082 total
$ cargo build --release -q && time ./target/release/surviving wine-* > /dev/null
./target/release/surviving wine-* > /dev/null  0.24s user 0.01s system 279% cpu 0.089 total
Cool bear

Cool bear's hot tip

Okay, very well. But what's the point of using the async machinery then?

It seems to me like, either you use spawn_local, and then everything is on the same thread and only I/O is non-blocking, or you use spawn and it ends up on a bunch of threads - but then why not just use std::thread::spawn yourself?

Well! Spawning one thread per task would eventually become expensive. Creating a thread is fast compared to making a network request, but it's slow compared to just polling a task for completion. A threaded executor will usually have a maximum number of threads, and it'll execute N tasks (where N could be very large) on M threads (where M remains reasonably low).
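To illustrate the "N tasks on M threads" part, here's a toy fixed-size pool using only std threads and channels. It's a sketch under simplifying assumptions: the jobs are plain blocking closures rather than futures, so it shows the threading half of an executor and none of the polling. run_on_pool and Job are made-up names.

```rust
use std::collections::HashSet;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread::{self, ThreadId};

// A unit of work: a plain blocking closure here, not a future.
type Job = Box<dyn FnOnce() -> u64 + Send>;

// Runs every job on a fixed pool of `workers` threads and returns
// (sum of all results, number of distinct threads that ran a job).
fn run_on_pool(jobs: Vec<Job>, workers: usize) -> (u64, usize) {
    let (job_tx, job_rx) = mpsc::channel::<Job>();
    // A std `Receiver` can't be shared directly, so workers take turns via a mutex.
    let job_rx = Arc::new(Mutex::new(job_rx));
    let (res_tx, res_rx) = mpsc::channel::<(u64, ThreadId)>();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let res_tx = res_tx.clone();
            thread::spawn(move || loop {
                // The lock guard is a temporary, released at the end of this statement.
                let next = job_rx.lock().unwrap().recv();
                match next {
                    Ok(job) => res_tx.send((job(), thread::current().id())).unwrap(),
                    // Sender dropped and queue drained: this worker is done.
                    Err(_) => break,
                }
            })
        })
        .collect();
    drop(res_tx); // only the workers' clones remain

    for job in jobs {
        job_tx.send(job).unwrap();
    }
    drop(job_tx);

    let mut sum = 0;
    let mut threads = HashSet::new();
    for (value, id) in res_rx {
        sum += value;
        threads.insert(id);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    (sum, threads.len())
}

fn main() {
    // N = 100 jobs on M = 4 threads.
    let jobs: Vec<Job> = (1..=100u64).map(|i| Box::new(move || i * i) as Job).collect();
    let (sum, threads_used) = run_on_pool(jobs, 4);
    // prints sum = 338350 and a thread count of at most 4
    println!("sum = {}, threads used = {}", sum, threads_used);
}
```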

Also, keep in mind that here, we're reading from a fast I/O device: a local SSD. But what if we were reading from the network instead? Or god forbid... tape storage?

Then we'd spend a lot of time waiting for I/O - and an executor (threaded or not) would be able to let each task advance as soon as their I/O request is ready.

Cool bear

Cool bear's hot tip

So async is not just a "make things faster" button?

No, it's not. In fact, it's more of a "make things slower" button - because there is more bookkeeping to do. But it also allows "things" to scale better. Note that futures are not the only way to do asynchronous I/O in Rust - you can definitely poll file descriptors yourself, have your own task system, etc.

Futures are just what ended up becoming standard, and a whole ecosystem is growing around them.

Cool bear

Cool bear's hot tip

Speaking of standards - what if you have a type that implements a trait with async methods, like SimpleRead, and you want to implement AsyncRead on top of it?

So that it can be passed to async-std's copy method, for example?

Ah, that part is not as easy. But it's not impossible either!

Let's remove our AsyncRead implementation for TracingReader.

We can also remove pin-project from it, leaving us with:

// plain struct: no `#[pin_project]`, no projection types
struct TracingReader<R>
where
    R: AsyncRead,
{
    inner: R,
}

...and our impl SimpleRead for TracingReader block.

Now, can we make a type that forwards AsyncRead to another type that implements SimpleRead?

Let's try:

use pin_project::pin_project;

#[pin_project]
struct SimpleAsyncReader<R>
where
    R: SimpleRead,
{
    // we're going to need a `&mut R` out of a `Pin<&mut Self>`,
    // to call `SimpleRead::simple_read`.
    inner: R,
}

impl<R> AsyncRead for SimpleAsyncReader<R>
where
    R: SimpleRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // step 1: project
        let inner = self.project().inner;
        // step 2: obtain future
        let mut fut = inner.simple_read(buf);
        // step 3: pin and poll future
        Pin::new(&mut fut).poll(cx)
    }
}

(I've commented out the tracing part so the output is less verbose).

Now we have to actually use it:

async fn hash_file(path: &Path) -> Result<(), eyre::Error> {
    let file = File::open(path).await?;
    let file = TracingReader { inner: file };
    // new!
    let mut file = SimpleAsyncReader { inner: file };

    let mut hasher = sha3::Sha3_256::new();

    let mut buf = vec![0u8; 256 * 1024];
    loop {
        // back to `read` (used to be `simple_read`)
        let n = file.read(&mut buf[..]).await?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

    let hash = hasher.finalize();
    print!("{} ", path.display());
    for x in hash {
        print!("{:02x}", x);
    }
    println!();

    Ok(())
}

So, did we nail it?

$ cargo build --release -q && time ./target/release/surviving wine-*
wine-4.0.4.tar.xz d53a567e7a14a2b8dbd669afece1603daa769ada93522ec1f26fe69674d7c433
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
wine-5.0.1.tar.xz 4af0d295e56db9f723daa4822ee4b4416ac8840278ccd7df73ef357027e5b663
./target/release/surviving wine-*  0.24s user 0.02s system 276% cpu 0.093 total

Mhhh it appears to work... suspicious.

Let's try changing up our TracingReader a bit. Instead of making it trace, we'll make it artificially slow down reads.

$ cargo add futures-timer
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding futures-timer v3.0.2 to dependencies
#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead + Send + Unpin,
{
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        use futures_timer::Delay;
        use std::time::Duration;

        // artificial slowdown
        Delay::new(Duration::from_millis(50)).await;

        // reading
        self.inner.read(buf).await
    }
}

Now, if we run the program, it hangs forever.

It's time... to do some tracing.

$ cargo add tracing tracing-futures tracing-tree tracing-subscriber
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding tracing v0.1.18 to dependencies
      Adding tracing-futures v0.2.4 to dependencies
      Adding tracing-tree v0.1.4 to dependencies
      Adding tracing-subscriber v0.2.10 to dependencies
use tracing_subscriber::{prelude::*, Registry};
use tracing_tree::HierarchicalLayer;

#[async_std::main]
#[tracing::instrument]
async fn main() -> Result<(), eyre::Error> {
    let subscriber = Registry::default().with(HierarchicalLayer::new(2));
    tracing::subscriber::set_global_default(subscriber).unwrap();

    // etc.
}


#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead + Send + Unpin,
{
    #[tracing::instrument(skip(self, buf))]
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        use futures_timer::Delay;
        use std::time::Duration;

        // artificial slowdown
        tracing::debug!("doing delay...");
        Delay::new(Duration::from_millis(50)).await;
        tracing::debug!("doing delay...done!");

        // reading
        tracing::debug!("doing read...");
        let res = self.inner.read(buf).await;
        tracing::debug!("doing read...done!");
        res
    }
}

impl<R> AsyncRead for SimpleAsyncReader<R>
where
    R: SimpleRead,
{
    #[tracing::instrument(skip(self, buf))]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.project().inner;
        let mut fut = inner.simple_read(buf);
        tracing::debug!("polling future...");
        let res = Pin::new(&mut fut).poll(cx);
        match &res {
            Poll::Ready(_) => tracing::debug!("future was ready!"),
            Poll::Pending => tracing::debug!("future was pending!"),
        }
        res
    }
}

Let's give it a shot:

$ cargo build -q && RUST_LOG=debug ./target/debug/surviving ./wine-4.0.4.tar.xz
poll_read{cx=Context { waker: Waker { data: 0x55a63274c4d0, vtable: 0x55a63056d730 } }}
  0ms DEBUG polling future...
  simple_read{}
    0ms DEBUG doing delay...
  0ms DEBUG future is pending!

Mhh. Well, we're probably doing something wrong, but I can..

Cool bear

Cool bear's hot tip

Hang on hang on I think I've got it.

Uhh, go ahead cool bear, I'm listening.

Cool bear

Cool bear's hot tip

Well, when you call .simple_read(), you're creating a Future, right?

That's right, or at least, simple_read() returns a Future.

Cool bear

Cool bear's hot tip

And then you poll it immediately, and then return from poll_read with whatever std::task::Poll variant that future returned.

Yes, but I don't see where you're going wit..

Cool bear

Cool bear's hot tip

What happens to the future after it's polled?

I'm not sure, what does happen to the future?

Cool bear

Cool bear's hot tip

Well, you're not storing it anywhere, are you?

No, I'm not. I guess it's dropped at the end of poll_read?

Cool bear

Cool bear's hot tip

It is! So you're only polling it once at most.

And what happens when you drop a future? It gets cancelled!

Aha! I think I get it. But then... why did it work before we added the delay?

Cool bear

Cool bear's hot tip

Before you added the delay. I'm reading up on sponge functions.

I guess it worked because... all futures that return Poll::Pending need to let the executor know (via the Context) when to wake them up. And I guess in the case of an async_std::fs::File, it subscribed to "the next read is ready", and even though the future was dropped, poll_read still got called again.

But when you - yes, you - added a delay, it probably subscribed to a timer event, and when the future was dropped, it immediately unsubscribed, so poll_read (called from the outer Future - the task itself) never got called again.
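Here's a std-only sketch of that "let the executor know" mechanism: a hypothetical SleepFuture that stores the waker from its Context, and a helper thread that calls it once the time is up. This is not how futures-timer actually works internally, just the general shape of waker-based wakeups. Note that the stored waker is the only thing that gets the task polled again.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

// State shared between the future and the helper thread.
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

// Hypothetical timer future: a helper thread sleeps, marks the timer done,
// and wakes whoever registered a waker most recently.
struct SleepFuture {
    shared: Arc<Mutex<Shared>>,
}

impl SleepFuture {
    fn new(dur: Duration) -> Self {
        let shared = Arc::new(Mutex::new(Shared { done: false, waker: None }));
        let for_thread = Arc::clone(&shared);
        thread::spawn(move || {
            thread::sleep(dur);
            let mut s = for_thread.lock().unwrap();
            s.done = true;
            if let Some(waker) = s.waker.take() {
                waker.wake();
            }
        });
        SleepFuture { shared }
    }
}

impl Future for SleepFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut s = self.shared.lock().unwrap();
        if s.done {
            Poll::Ready(())
        } else {
            // Not ready: leave a way to be woken up, then report `Pending`.
            s.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// The executor side: a waker that unparks the blocked thread.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until the waker calls `unpark`.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let start = Instant::now();
    block_on(SleepFuture::new(Duration::from_millis(50)));
    println!("woke up after {:?}", start.elapsed());
}
```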

So, if I'm hearing you right... once we get a future, we need to store it, so that we can poll it until it's ready.

But Future is a trait! So it probably needs to be boxed. And .poll takes a Pin<&mut Self>, so it probably needs to be pinned, too.
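Before bringing in boxes and pins for real, here's a std-only toy that isolates the core problem: a hypothetical SlowOp future that needs a few polls to finish. Creating a fresh one on every poll (which is what our poll_read does) never gets anywhere, while polling the same stored future does.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical operation that needs a few polls before it completes,
// standing in for "wait for the delay, then read".
struct SlowOp {
    polls_left: u32,
}

impl Future for SlowOp {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polls_left == 0 {
            return Poll::Ready("done");
        }
        self.polls_left -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

fn start_op() -> SlowOp {
    SlowOp { polls_left: 2 }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// The bug: a brand new future on every poll, dropped right afterwards,
// so its progress is thrown away each time.
fn poll_fresh_each_time(attempts: u32) -> Poll<&'static str> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut last = Poll::Pending;
    for _ in 0..attempts {
        let mut fut = start_op();
        last = Pin::new(&mut fut).poll(&mut cx);
    }
    last
}

// The fix: create the future once (boxed and pinned, as a trait object
// would be) and keep polling that same one.
fn poll_stored(attempts: u32) -> Poll<&'static str> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(start_op());
    let mut last = Poll::Pending;
    for _ in 0..attempts {
        last = fut.as_mut().poll(&mut cx);
        if last.is_ready() {
            break;
        }
    }
    last
}

fn main() {
    println!("{:?}", poll_fresh_each_time(10)); // Pending
    println!("{:?}", poll_stored(10)); // Ready("done")
}
```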

Okay, let's give it a shot:

#[pin_project]
struct SimpleAsyncReader<R>
where
    R: SimpleRead,
{
    inner: R,
    state: State,
}

enum State {
    Idle,
    Pending(Pin<Box<dyn Future<Output = io::Result<usize>> + Send>>),
}

impl<R> AsyncRead for SimpleAsyncReader<R>
where
    R: SimpleRead,
{
    #[tracing::instrument(skip(self, buf))]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let res = match self.state {
            State::Idle => {
                tracing::debug!("getting new future...");
                let inner = self.project().inner;
                let mut fut = Box::pin(inner.simple_read(buf));
                let res = fut.as_mut().poll(cx);
                self.state = State::Pending(fut);
                res
            }
            State::Pending(ref mut fut) => {
                tracing::debug!("polling existing future...");
                fut.as_mut().poll(cx)
            }
        };
        match &res {
            Poll::Ready(_) => tracing::debug!("future was ready!"),
            Poll::Pending => tracing::debug!("future was pending!"),
        }

        res
    }
}

Okay, this looks promising. Our SimpleAsyncReader has a State, which can be either Idle (no future yet), or Pending (existing future to poll), and depending on which state it is, it does the right thing.

It looks good, except for one thing:

$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error[E0495]: cannot infer an appropriate lifetime for lifetime parameter 'pin in function call due to conflicting requirements
   --> src/main.rs:152:34
    |
152 |                 let inner = self.project().inner;
    |                                  ^^^^^^^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 143:5...
   --> src/main.rs:143:5
    |
143 |     #[tracing::instrument(skip(self, buf))]
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: ...so that the types are compatible
   --> src/main.rs:152:34
    |
152 |                 let inner = self.project().inner;
    |                                  ^^^^^^^
    = note: expected `std::pin::Pin<&mut SimpleAsyncReader<R>>`
               found `std::pin::Pin<&mut SimpleAsyncReader<R>>`
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the expression is assignable
   --> src/main.rs:155:45
    |
155 |                 self.state = State::Pending(fut);
    |                                             ^^^
    = note: expected `std::pin::Pin<std::boxed::Box<(dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send + 'static)>>`
               found `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send>>`
    = note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)

It doesn't compile.

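Why doesn't it compile? Well, when we use self.project(), we get a &mut R that's only valid for as long as the Pin<&mut Self> borrow. The future returned by simple_read holds on to that &mut R, and now we're storing that future in self.state, where a Box<dyn Future> is implicitly required to be 'static. That means it must outlive the current function!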

How can we work around that?

I have an idea: let's try letting the Future take ownership of R. That way, R will definitely live for as long as the Future lives.

And since we'll need R back for further poll_read calls, we'll have the future return (literally, give back) R along with the io::Result<usize>.
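Here's a std-only sketch of that ownership round trip, with a hypothetical Counter in place of R. It uses std::mem::replace with a Transitional placeholder (equivalent to swapping), and, crucially, the future borrows nothing: it owns the Counter, which is what lets it be boxed as a 'static future. A buffer borrowed from the caller is deliberately left out here.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;

// Returns `Pending` once before completing, so the state machine
// actually goes through its `Pending` state.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Hypothetical resource with an async method borrowing `&mut self`,
// standing in for `R: SimpleRead`.
struct Counter {
    count: u32,
}

impl Counter {
    async fn next(&mut self) -> u32 {
        let yield_once = YieldNow { yielded: false };
        yield_once.await;
        self.count += 1;
        self.count
    }
}

// The future owns the `Counter` and gives it back with the result.
fn start(mut counter: Counter) -> BoxFut<(Counter, u32)> {
    Box::pin(async move {
        let value = counter.next().await;
        (counter, value)
    })
}

enum State {
    Idle(Counter),
    Pending(BoxFut<(Counter, u32)>),
    Transitional,
}

struct Adapter {
    state: State,
}

impl Adapter {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        // Take the state out, leaving a placeholder behind.
        let state = std::mem::replace(&mut self.state, State::Transitional);
        let mut fut = match state {
            State::Idle(counter) => start(counter),
            State::Pending(fut) => fut,
            State::Transitional => unreachable!("state was not restored"),
        };
        match fut.as_mut().poll(cx) {
            Poll::Ready((counter, value)) => {
                self.state = State::Idle(counter);
                Poll::Ready(value)
            }
            Poll::Pending => {
                self.state = State::Pending(fut);
                Poll::Pending
            }
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut adapter = Adapter { state: State::Idle(Counter { count: 0 }) };
    for _ in 0..4 {
        // Pending, Ready(1), Pending, Ready(2)
        println!("{:?}", adapter.poll_next(&mut cx));
    }
}
```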

Let's even use a type alias so the code is more readable:

#[pin_project]
struct SimpleAsyncReader<R>
where
    R: SimpleRead,
{
    state: State<R>,
}

type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;

enum State<R> {
    Idle(R),
    Pending(BoxFut<(R, io::Result<usize>)>),
    Transitional,
}

impl<R> AsyncRead for SimpleAsyncReader<R>
where
    R: SimpleRead + Send,
{
    #[tracing::instrument(skip(self, buf))]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let proj = self.project();
        let mut state = State::Transitional;
        std::mem::swap(proj.state, &mut state);

        let mut fut = match state {
            State::Idle(mut inner) => {
                tracing::debug!("getting new future...");
                Box::pin(async move {
                    let res = inner.simple_read(buf).await;
                    (inner, res)
                })
            }
            State::Pending(fut) => {
                tracing::debug!("polling existing future...");
                fut
            }
            State::Transitional => unreachable!(),
        };

        match fut.as_mut().poll(cx) {
            Poll::Ready((inner, result)) => {
                tracing::debug!("future was ready!");
                *proj.state = State::Idle(inner);
                Poll::Ready(result)
            }
            Poll::Pending => {
                tracing::debug!("future was pending!");
                *proj.state = State::Pending(fut);
                Poll::Pending
            }
        }
    }
}

There. I'm sure now we've added a sufficient amount of complications to resolve our original problem, and the compiler will definitely not get in the way of anyth...

$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
   --> src/main.rs:157:37
    |
157 |                   Box::pin(async move {
    |  _____________________________________^
158 | |                     let res = inner.simple_read(buf).await;
159 | |                     (inner, res)
160 | |                 })
    | |_________________^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #4 defined on the method body at 144:5...
   --> src/main.rs:144:5
    |
144 |     #[tracing::instrument(skip(self, buf))]
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: ...so that the types are compatible
   --> src/main.rs:157:37
    |
157 |                   Box::pin(async move {
    |  _____________________________________^
158 | |                     let res = inner.simple_read(buf).await;
159 | |                     (inner, res)
160 | |                 })
    | |_________________^
    = note: expected `&mut [u8]`
               found `&mut [u8]`
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the expression is assignable
   --> src/main.rs:177:46
    |
177 |                 *proj.state = State::Pending(fut);
    |                                              ^^^
    = note: expected `std::pin::Pin<std::boxed::Box<(dyn core::future::future::Future<Output = (R, std::result::Result<usize, std::io::Error>)> + std::marker::Send + 'static)>>`
               found `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = (R, std::result::Result<usize, std::io::Error>)> + std::marker::Send>>`
    = note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)

Oh.

That's unfortunate.

But the compiler diagnostics are pretty good this time. The Box<dyn Future> wants a Future that's 'static. But we have a &mut [u8] only for a much, much shorter time than that. We don't even have it for the full lifetime of SimpleAsyncReader.

We have it for just the duration of the poll_read call. And now that we're storing futures across poll_read calls, well... we can't hold on to that &mut [u8] at all.

So here's one idea: we could give our SimpleAsyncReader its own buffer. It would... reserve enough bytes for a read of length buf.len(), transfer ownership to the Future, and then get it back.
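Here's a std-only sketch of that buffer round trip, using a hypothetical in-memory SliceReader and made-up read_owned and copy_out helpers. It zero-fills the internal buffer with Vec::resize, which is safe at the cost of writing zeroes on every read. It also assumes the caller's output buffer is at least as long as what was read; copy_out would silently drop any extra bytes.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

// Hypothetical in-memory reader standing in for `R: SimpleRead`.
struct SliceReader {
    data: Vec<u8>,
    pos: usize,
}

impl SliceReader {
    async fn read_into(&mut self, buf: &mut [u8]) -> usize {
        let remaining = &self.data[self.pos..];
        let n = remaining.len().min(buf.len());
        buf[..n].copy_from_slice(&remaining[..n]);
        self.pos += n;
        n
    }
}

// Both the reader and the buffer move into the future and come back out with
// the result, so nothing is borrowed and `'static` is satisfied.
fn read_owned(
    mut reader: SliceReader,
    mut internal_buf: Vec<u8>,
    len: usize,
) -> BoxFut<(SliceReader, Vec<u8>, usize)> {
    Box::pin(async move {
        // Zero-fill up to `len`: every byte handed to the reader is initialized.
        internal_buf.clear();
        internal_buf.resize(len, 0);
        let n = reader.read_into(&mut internal_buf[..]).await;
        internal_buf.truncate(n);
        (reader, internal_buf, n)
    })
}

// The caller's side: copy what was read into the borrowed output buffer.
fn copy_out(internal_buf: &[u8], dst: &mut [u8]) -> usize {
    let n = internal_buf.len().min(dst.len());
    dst[..n].copy_from_slice(&internal_buf[..n]);
    n
}

// Minimal executor: poll, and park the thread until the waker unparks it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let reader = SliceReader { data: b"hello".to_vec(), pos: 0 };
    let mut dst = [0u8; 3];

    let (reader, internal_buf, n) = block_on(read_owned(reader, Vec::new(), dst.len()));
    copy_out(&internal_buf, &mut dst);
    println!("{} {:?}", n, &dst[..n]); // 3 [104, 101, 108]

    // The same reader and buffer are reused for the next read.
    let (_reader, internal_buf, n) = block_on(read_owned(reader, internal_buf, dst.len()));
    println!("{} {:?}", n, internal_buf); // 2 [108, 111]
}
```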

Let's see how this would look in practice:

First, we're going to need to keep track of our buffer in both states:

enum State<R> {
    Idle(R, Vec<u8>),
    Pending(BoxFut<(R, Vec<u8>, io::Result<usize>)>),
    Transitional,
}

And next, well... we're going to need to capture our Vec<u8> (named internal_buf in the code that follows) within the Future, and get it back.

And when we get it back and a read was successful, then we shall copy from our internal buffer to the output buffer, buf.
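The copy-back step can be sketched on its own. `copy_back` is a hypothetical helper name, not something from the original code. It uses the safe `Vec::truncate` rather than `unsafe` `set_len`, and it shows the short-read case: only the first `n` bytes of `buf` are touched.

```rust
/// Hypothetical helper mirroring the copy-back step: the future read `n`
/// bytes into `internal_buf`; keep only those and copy them into `buf`.
/// Panics if `n` is larger than either buffer.
fn copy_back(internal_buf: &mut Vec<u8>, buf: &mut [u8], n: usize) {
    // `truncate` is the safe way to shrink a Vec; no `unsafe` needed.
    internal_buf.truncate(n);
    buf[..n].copy_from_slice(&internal_buf[..]);
}

fn main() {
    // `internal_buf` was sized to the caller's 8-byte request...
    let mut internal_buf = vec![0u8; 8];
    // ...but the read only produced 3 bytes (a short read).
    internal_buf[..3].copy_from_slice(b"abc");

    let mut buf = [b'.'; 8];
    copy_back(&mut internal_buf, &mut buf, 3);
    // Only the first 3 bytes of `buf` are overwritten.
    println!("{}", String::from_utf8_lossy(&buf));
}
```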

impl<R> AsyncRead for SimpleAsyncReader<R>
where
    // new: R must now be `'static`, since it's captured
    // by the future which is, itself, `'static`.
    R: SimpleRead + Send + 'static,
{
    #[tracing::instrument(skip(self, buf))]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let proj = self.project();
        // take the current state out, leaving a placeholder behind
        let state = std::mem::replace(proj.state, State::Transitional);

        let mut fut: BoxFut<(R, Vec<u8>, io::Result<usize>)> = match state {
            State::Idle(mut inner, mut internal_buf) => {
                tracing::debug!("getting new future...");
                // size our own buffer to match `buf`. `resize` zero-fills it,
                // whereas `reserve` + `set_len` would expose uninitialized
                // memory as a `&mut [u8]`, which is undefined behavior.
                internal_buf.clear();
                internal_buf.resize(buf.len(), 0);

                Box::pin(async move {
                    let res = inner.simple_read(&mut internal_buf[..]).await;
                    (inner, internal_buf, res)
                })
            }
            State::Pending(fut) => {
                tracing::debug!("polling existing future...");
                fut
            }
            State::Transitional => unreachable!(),
        };

        match fut.as_mut().poll(cx) {
            Poll::Ready((inner, mut internal_buf, result)) => {
                tracing::debug!("future was ready!");
                if let Ok(n) = &result {
                    let n = *n;
                    // keep only the bytes that were actually read...
                    internal_buf.truncate(n);
                    // ...and copy them out. This assumes the `buf` passed to
                    // *this* call is at least `n` bytes long (otherwise, panic).
                    buf[..n].copy_from_slice(&internal_buf[..]);
                } else {
                    internal_buf.clear();
                }
                *proj.state = State::Idle(inner, internal_buf);
                Poll::Ready(result)
            }
            Poll::Pending => {
                tracing::debug!("future was pending!");
                *proj.state = State::Pending(fut);
                Poll::Pending
            }
        }
    }
}

And finally, we need to adjust how we build our SimpleAsyncReader:

async fn hash_file(path: &Path) -> Result<(), eyre::Error> {
    let file = File::open(path).await?;
    let file = TracingReader { inner: file };
    let mut file = SimpleAsyncReader {
        state: State::Idle(file, Default::default()),
    };

    // etc.
}

And with that, our little state machine is complete.

Wind it up and watch it go:

$ cargo build -q && RUST_LOG=debug ./target/debug/surviving ./wine-4.0.4.tar.xz
poll_read{cx=Context { waker: Waker { data: 0x55f4b52ca240, vtable: 0x55f4b3e926f0 } }}
  0ms  DEBUG getting new future...
  simple_read{}
    0ms DEBUG doing delay...
  0ms DEBUG future was pending!
poll_read{cx=Context { waker: Waker { data: 0x55f4b52ca240, vtable: 0x55f4b3e926f0 } }}
  0ms  DEBUG polling existing future...
  simple_read{}
    50ms DEBUG doing delay...done!
    50ms DEBUG doing read...
  0ms DEBUG future was pending!
poll_read{cx=Context { waker: Waker { data: 0x55f4b52ca240, vtable: 0x55f4b3e926f0 } }}
  0ms  DEBUG polling existing future...
  simple_read{}
    51ms DEBUG doing read...done!
  0ms DEBUG future was ready!
poll_read{cx=Context { waker: Waker { data: 0x55f4b52ca240, vtable: 0x55f4b3e926f0 } }}
  0ms  DEBUG getting new future...
  simple_read{}
    0ms DEBUG doing delay...
  0ms DEBUG future was pending!
poll_read{cx=Context { waker: Waker { data: 0x55f4b52ca240, vtable: 0x55f4b3e926f0 } }}
  0ms  DEBUG polling existing future...
  simple_read{}
    50ms DEBUG doing delay...done!
    50ms DEBUG doing read...
  0ms DEBUG future was pending!

Wonderful! Let's give it a shot in release mode and with less tracing:

$ cargo build -q --release && ./target/release/surviving ./wine-*
./wine-4.0.4.tar.xz d53a567e7a14a2b8dbd669afece1603daa769ada93522ec1f26fe69674d7c433
./wine-5.0.1.tar.xz 4af0d295e56db9f723daa4822ee4b4416ac8840278ccd7df73ef357027e5b663
./wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
$ openssl dgst -sha3-256 wine-4.0.4.tar.xz
SHA3-256(wine-4.0.4.tar.xz)= d53a567e7a14a2b8dbd669afece1603daa769ada93522ec1f26fe69674d7c433
$ openssl dgst -sha3-256 wine-5.0.1.tar.xz
SHA3-256(wine-5.0.1.tar.xz)= 4af0d295e56db9f723daa4822ee4b4416ac8840278ccd7df73ef357027e5b663
$ openssl dgst -sha3-256 wine-5.0.2.tar.xz
SHA3-256(wine-5.0.2.tar.xz)= 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892

And just like that, everything works.
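You can also poke at the state machine without async-std, tracing, or a file on disk. Below is a condensed, std-only model of it. Several assumptions apply:

- `YieldOnce`, `MemReader`, `OwnedBufReader`, `NoopWaker` and `read_to_completion` are all hypothetical names.
- `poll_read` is an inherent method here rather than an implementation of the futures crate's `AsyncRead` trait.
- The "delay" is a future that returns `Pending` exactly once.

It's a sketch of the Idle → Pending → Idle cycle, not the article's actual program.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the article's delay: pending exactly once.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if std::mem::replace(&mut self.0, true) {
            return Poll::Ready(());
        }
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

/// Hypothetical in-memory reader whose `simple_read` takes two polls.
struct MemReader { data: Vec<u8>, pos: usize }
impl MemReader {
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        YieldOnce(false).await;
        let n = buf.len().min(self.data.len() - self.pos);
        buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

// Owns the reader and the buffer, so it can be `'static`.
type ReadFut = Pin<Box<dyn Future<Output = (MemReader, Vec<u8>, io::Result<usize>)> + Send + 'static>>;
enum State {
    Idle(MemReader, Vec<u8>),
    Pending(ReadFut),
    Transitional,
}
struct OwnedBufReader { state: State }
impl OwnedBufReader {
    fn new(data: &[u8]) -> Self {
        let inner = MemReader { data: data.to_vec(), pos: 0 };
        OwnedBufReader { state: State::Idle(inner, Vec::new()) }
    }

    // Same shape as `AsyncRead::poll_read`, minus `Pin<&mut Self>`.
    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        let mut fut: ReadFut = match std::mem::replace(&mut self.state, State::Transitional) {
            State::Idle(mut inner, mut internal_buf) => {
                internal_buf.clear();
                internal_buf.resize(buf.len(), 0);
                Box::pin(async move {
                    let res = inner.simple_read(&mut internal_buf[..]).await;
                    (inner, internal_buf, res)
                })
            }
            State::Pending(fut) => fut,
            State::Transitional => unreachable!(),
        };
        match fut.as_mut().poll(cx) {
            Poll::Ready((inner, internal_buf, result)) => {
                if let Ok(&n) = result.as_ref() {
                    buf[..n].copy_from_slice(&internal_buf[..n]);
                }
                self.state = State::Idle(inner, internal_buf);
                Poll::Ready(result)
            }
            Poll::Pending => {
                self.state = State::Pending(fut);
                Poll::Pending
            }
        }
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Busy-polls one read (fine only because `YieldOnce` pends exactly once).
fn read_to_completion(r: &mut OwnedBufReader, buf: &mut [u8]) -> (io::Result<usize>, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(res) = r.poll_read(&mut cx, buf) {
            return (res, polls);
        }
    }
}

fn main() {
    let mut reader = OwnedBufReader::new(b"hello, world");
    let mut buf = [0u8; 5];
    let (res, polls) = read_to_completion(&mut reader, &mut buf);
    println!("{:?} after {} polls: {:?}", res.ok(), polls, &buf);
}
```

Each read here takes two polls: one that leaves the future in `State::Pending`, and one that completes it and puts the reader and buffer back in `State::Idle`. That's the same pending-then-ready pattern as in the trace above, with a single pending step instead of two.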

