Surviving Rust async interfaces
I used to be afraid of async Rust. It's easy to get into trouble!
But thanks to the work done by the whole community, async Rust is getting easier to use every week. One project I think is doing particularly great work in this area is async-std.
Let's say we want to compute the SHA3-256 hash of a file. It's very easy to do with synchronous I/O:
$ cargo new surviving
     Created binary (application) `surviving` package
$ cargo add argh sha3 color-eyre
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding argh v0.1.3 to dependencies
      Adding sha3 v0.9.1 to dependencies
      Adding color-eyre v0.5.1 to dependencies
Whoa, amos, cool off. Why all the dependencies?
What's the matter cool bear? Too fast for you?
Okay then, let's go through these one by one.
The color-eyre crate (hi Jane!) provides an Error type that collects backtraces (and spantraces, but more on that later), and it also happens to format them in a way that looks suspiciously like color-backtrace - i.e., I like it.
It's super easy to get started with:
// in `src/main.rs`
use color_eyre::eyre;
use std::io;

fn main() -> Result<(), eyre::Error> {
    color_eyre::install().unwrap();
    // I am: too lazy to make a custom Error type right now, AMA
    let e = io::Error::new(
        io::ErrorKind::PermissionDenied,
        "you can't cut back on error handling! you will regret this",
    );
    Err(e.into())
}
$ cargo run -q
Error:
   0: you can't cut back on error handling! you will regret this

Backtrace omitted.
Run with RUST_BACKTRACE=1 environment variable to display it.
Run with RUST_BACKTRACE=full to include source snippets.
And here's the output with RUST_BACKTRACE=1:
$ RUST_BACKTRACE=1 cargo run -q
Error:
   0: you can't cut back on error handling! you will regret this

  ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ BACKTRACE ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                                ⋮ 5 frames hidden ⋮
   6: <T as core::convert::Into<U>>::into::h330a3dcd2ad914ef
      at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/convert/mod.rs:559
   7: surviving::main::ha622503931ea5ba8
      at /home/amos/ftl/surviving/src/main.rs:11
                                ⋮ 10 frames hidden ⋮

Run with COLORBT_SHOW_HIDDEN=1 environment variable to disable frame filtering.
Run with RUST_BACKTRACE=full to include source snippets.
And with RUST_BACKTRACE=full:
$ RUST_BACKTRACE=full cargo run -q
Error:
   0: you can't cut back on error handling! you will regret this

  ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ BACKTRACE ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                                ⋮ 5 frames hidden ⋮
   6: <T as core::convert::Into<U>>::into::h330a3dcd2ad914ef
      at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/convert/mod.rs:559
       557 │ {
       558 │     fn into(self) -> U {
       559 >         U::from(self)
       560 │     }
       561 │ }
   7: surviving::main::ha622503931ea5ba8
      at /home/amos/ftl/surviving/src/main.rs:11
         9 │         "you can't cut back on error handling! you will regret this",
        10 │     );
        11 >     Err(e.into())
        12 │ }
                                ⋮ 10 frames hidden ⋮

Run with COLORBT_SHOW_HIDDEN=1 environment variable to disable frame filtering.
Of course the color part doesn't quite carry over to the blog format. I have yet to integrate ANSI-escape-code-to-HTML conversion into my content pipeline, so in the meantime, here's a screenshot:
As you can see: it is quite good.
Next up is argh, which is also entirely gratuitous for this example, but which I'm going to use anyway, because I like using a declarative approach for command-line flags.
Also, it lets me showcase that you can have types like PathBuf in command-line arguments.
Didn't some folks react to your "I want off Mr. Golang's Wild Ride" article by saying: sure, Rust doesn't treat paths as strings, but std::env::args() returns an Iterator that yields String items! Checkmate!
Yeah, exactly - and the answer is: if you don't feel like using args_os yourself (OsString instances are a bit annoying to manipulate), then just use a crate that does the right thing for you.
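For comparison, the do-it-yourself route might look something like the std-only sketch below. The helper name `paths_from_args` is made up for illustration. The point is that `std::env::args_os()` yields `OsString`s, and those convert into `PathBuf` without ever becoming a `String`. File names that aren't valid UTF-8 therefore survive, whereas `std::env::args()` panics on them.

```rust
use std::ffi::OsString;
use std::path::PathBuf;

/// Hypothetical helper: turn raw OS arguments into paths without going
/// through `String`, so file names that aren't valid UTF-8 survive intact.
fn paths_from_args<I>(args: I) -> Vec<PathBuf>
where
    I: IntoIterator<Item = OsString>,
{
    // Skip argv[0] (the program name) and keep the rest as-is.
    args.into_iter().skip(1).map(PathBuf::from).collect()
}

fn main() {
    // `args_os` does not panic on non-UTF-8 arguments, unlike `args`.
    for path in paths_from_args(std::env::args_os()) {
        println!("{}", path.display());
    }
}
```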
Right! And argh is basically a spin on structopt, but optimized for code size?
Yup.
use argh::FromArgs;
use color_eyre::eyre;
use std::path::PathBuf;

/// Prints the SHA3-256 hash of a file.
#[derive(FromArgs)]
struct Args {
    /// the file whose contents to hash and print
    #[argh(positional)]
    file: PathBuf,
}

fn main() -> Result<(), eyre::Error> {
    color_eyre::install().unwrap();
    let args: Args = argh::from_env();

    let metadata = std::fs::metadata(&args.file)?;
    println!("{} is {} bytes", args.file.display(), metadata.len());

    Ok(())
}
$ cargo run -q -- /etc/hosts
/etc/hosts is 184 bytes
And then there's just sha3 left.
Speaking of - what do you mean, SHA3-256 - are there several variants of SHA-3?
There is! MD5 has a digest size of 128 bits, SHA-1 is 160 bits, SHA-2 has a couple options (224, 256, 384 or 512 bits), and SHA-3 is also a family - a subset of the Keccak family designed by Bertoni, Daemen, Peeters and Van Assche, standardized by NIST in 2015.
2015 huh? I guess you won't have many things to compare your program's output with?
That's true - I haven't seen any SHA-3 variants in the wild yet. For example, AUR packages tend to provide either SHA-1 or SHA-256 sums. For the Ubuntu 20.04.1 downloads, only SHA-256 sums are available.
But that's not a problem - we can just compute a reference hash using the OpenSSL command-line interface:
$ time openssl dgst -sha3-256 ubuntu.iso
SHA3-256(ubuntu.iso)= 5659eaf1fe2b98f149cb751aaa38b39e2b9573d5f19e773e78c5cc655262b915
openssl dgst -sha3-256 ubuntu.iso  6.68s user 0.35s system 99% cpu 7.032 total
And now for our version:
use argh::FromArgs;
use color_eyre::eyre;
use sha3::Digest;
use std::{fs::File, io::Read, path::PathBuf};

/// Prints the SHA3-256 hash of a file.
#[derive(FromArgs)]
struct Args {
    /// the file whose contents to hash and print
    #[argh(positional)]
    file: PathBuf,
}

fn main() -> Result<(), eyre::Error> {
    color_eyre::install().unwrap();
    let args: Args = argh::from_env();

    let mut file = File::open(&args.file)?;
    let mut hasher = sha3::Sha3_256::new();

    let mut buf = vec![0u8; 256 * 1024];
    loop {
        let n = file.read(&mut buf[..])?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

    let hash = hasher.finalize();
    print!("{} ", args.file.display());
    for x in hash {
        print!("{:02x}", x);
    }
    println!();

    Ok(())
}
$ cargo build --release -q && time ./target/release/surviving ubuntu.iso
ubuntu.iso 5659eaf1fe2b98f149cb751aaa38b39e2b9573d5f19e773e78c5cc655262b915
./target/release/surviving ubuntu.iso  8.64s user 0.23s system 99% cpu 8.884 total
Hey, that's slower!
A bit, yeah! But more importantly - the result is correct. And it's pure Rust. And I'm sure it could be made just as fast, if someone really wanted to.
Excuses, excuses.
Okay, tell you what, cool bear - you go and squeeze extra performance out of sha3, and I'll just finish this article.
For the rest of this article, we'll be hashing a slightly smaller file - the Wine 5.0.2 release (download here).
$ openssl dgst -sha3-256 wine-5.0.2.tar.xz
SHA3-256(wine-5.0.2.tar.xz)= 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
$ ./target/release/surviving wine-5.0.2.tar.xz
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
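One detail of the synchronous read loop deserves a closer look. `Read::read` is documented as possibly failing with `ErrorKind::Interrupted`, which is non-fatal and meant to be retried, while the loop above propagates every error with `?`. The std-only sketch below keeps the same chunked loop but retries in that case. The name `for_each_chunk` and the closure "sink" (standing in for `hasher.update`) are illustrative assumptions, not part of the original program.

```rust
use std::io::{self, ErrorKind, Read};

/// Feeds everything from `reader` to `sink` in chunks of at most
/// `chunk_size` bytes, returning the total number of bytes read.
fn for_each_chunk<R: Read>(
    mut reader: R,
    chunk_size: usize,
    mut sink: impl FnMut(&[u8]),
) -> io::Result<u64> {
    let mut buf = vec![0u8; chunk_size];
    let mut total = 0u64;
    loop {
        match reader.read(&mut buf) {
            // A zero-length read means end of file.
            Ok(0) => return Ok(total),
            Ok(n) => {
                sink(&buf[..n]);
                total += n as u64;
            }
            // Interrupted reads read nothing and are safe to retry.
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}

fn main() -> io::Result<()> {
    let data = io::Cursor::new(b"hello, surviving world".to_vec());
    let total = for_each_chunk(data, 8, |chunk| println!("{} bytes", chunk.len()))?;
    println!("total: {}", total);
    Ok(())
}
```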
Okay, let's bring some async into the mix:
$ cargo add async-std
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding async-std v1.6.2 to dependencies
We'll want some extra features so we'll change:
# in Cargo.toml
async-std = "1.6.2"
To:
# in Cargo.toml
async-std = { version = "1.6.2", features = ["attributes"] }
And now we can make our main function async!
// in `src/main.rs`
#[async_std::main]
async fn main() -> Result<(), eyre::Error> {
    // same as before
}
$ cargo build --release -q && ./target/release/surviving wine-5.0.2.tar.xz
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
That was easy! Well, as usual, thanks for reading and I'll see you ne..
Hoooooold on.
Oh hey bear, how are the performance optimizations going?
I gave up. Turns out crypto is hard.
But your program - nothing is async in there. It's all blocking.
Yeah, yeah, you're right of course. Let's make at least the file reads asynchronous:
use argh::FromArgs;
// new: those were imported from `std` previously
use async_std::{fs::File, io::ReadExt};
use color_eyre::eyre;
use sha3::Digest;
use std::path::PathBuf;

/// Prints the SHA3-256 hash of a file.
#[derive(FromArgs)]
struct Args {
    /// the file whose contents to hash and print
    #[argh(positional)]
    file: PathBuf,
}

#[async_std::main]
async fn main() -> Result<(), eyre::Error> {
    color_eyre::install().unwrap();
    let args: Args = argh::from_env();

    // changed: now awaited
    let mut file = File::open(&args.file).await?;
    let mut hasher = sha3::Sha3_256::new();

    let mut buf = vec![0u8; 256 * 1024];
    loop {
        // changed: now awaited (read is provided by `ReadExt`)
        let n = file.read(&mut buf[..]).await?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

    let hash = hasher.finalize();
    print!("{} ", args.file.display());
    for x in hash {
        print!("{:02x}", x);
    }
    println!();

    Ok(())
}
$ cargo build --release -q && ./target/release/surviving wine-5.0.2.tar.xz
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
Okay, everything still works.
What uh.. what's the difference though? Our program worked before, and it works now. What did async give us? Is it faster? Does it do stuff in parallel?
Does it use threads?
$ cargo build -q
$ gdb --args ./target/debug/surviving ./wine-5.0.2.tar.xz
...
(gdb) break pthread_create
Function "pthread_create" not defined.
Make breakpoint pending on future shared library load? (y or [n]) y
Breakpoint 1 (pthread_create) pending.
(gdb) r
Starting program: /home/amos/ftl/surviving/target/debug/surviving ./wine-5.0.2.tar.xz
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".

Breakpoint 1, 0x00007ffff7f6d700 in pthread_create@@GLIBC_2.2.5 () from /usr/lib/libpthread.so.0
(gdb) bt
#0  0x00007ffff7f6d700 in pthread_create@@GLIBC_2.2.5 () from /usr/lib/libpthread.so.0
#1  0x000055555577f43d in std::sys::unix::thread::Thread::new () at src/libstd/sys/unix/thread.rs:66
#2  0x00005555556dbdac in std::thread::Builder::spawn_unchecked (self=..., f=...)
    at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libstd/thread/mod.rs:492
#3  0x00005555556dc389 in std::thread::Builder::spawn (self=..., f=...)
    at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libstd/thread/mod.rs:386
#4  0x00005555556ddf10 in async_std::rt::RUNTIME::{{closure}} ()
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.6.2/src/rt/mod.rs:28
#5  0x00005555556c920e in core::ops::function::FnOnce::call_once ()
    at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/ops/function.rs:232
#6  0x00005555556c91fb in core::ops::function::FnOnce::call_once ()
    at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/ops/function.rs:232
#7  0x00005555556d7109 in once_cell::sync::Lazy<T,F>::force::{{closure}} ()
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/lib.rs:953
#8  0x00005555556d728f in once_cell::sync::OnceCell<T>::get_or_init::{{closure}} ()
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/lib.rs:786
#9  0x00005555556d8ec9 in once_cell::imp::OnceCell<T>::initialize::{{closure}} ()
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/imp_std.rs:97
#10 0x000055555575a5f2 in once_cell::imp::initialize_inner (my_state_and_queue=0x555555855040 <async_std::rt::RUNTIME>, init=...)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/imp_std.rs:171
#11 0x00005555556d8dc0 in once_cell::imp::OnceCell<T>::initialize (self=0x555555855040 <async_std::rt::RUNTIME>, f=...)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/imp_std.rs:95
#12 0x00005555556d735c in once_cell::sync::OnceCell<T>::get_or_try_init (self=0x555555855040 <async_std::rt::RUNTIME>, f=...)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/lib.rs:826
#13 0x00005555556d722e in once_cell::sync::OnceCell<T>::get_or_init (self=0x555555855040 <async_std::rt::RUNTIME>, f=...)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/lib.rs:786
#14 0x00005555556d7046 in once_cell::sync::Lazy<T,F>::force (this=0x555555855040 <async_std::rt::RUNTIME>)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/once_cell-1.4.0/src/lib.rs:952
#15 0x000055555559c25f in async_std::task::builder::Builder::build (self=..., future=...)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.6.2/src/task/builder.rs:42
#16 0x000055555559c439 in async_std::task::builder::Builder::blocking (self=..., future=...)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.6.2/src/task/builder.rs:146
#17 0x00005555555943c6 in async_std::task::block_on::block_on (future=...)
    at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.6.2/src/task/block_on.rs:33
#18 0x000055555559d66b in surviving::main () at src/main.rs:16
Yeah, okay, it uses threads.
But it doesn't do anything in parallel - our code as written is sequential. First open the file, then read from it, feed it to the hasher (which is synchronous, and CPU-bound), etc.
Things would get interesting if we were to spawn tasks, to hash multiple files in parallel:
use argh::FromArgs;
use async_std::{fs::File, io::ReadExt};
use color_eyre::eyre;
use sha3::Digest;
use std::path::{Path, PathBuf};

/// Prints the SHA3-256 hash of some files
#[derive(FromArgs)]
struct Args {
    /// the files whose contents to hash and print
    #[argh(positional)]
    files: Vec<PathBuf>,
}

#[async_std::main]
async fn main() -> Result<(), eyre::Error> {
    color_eyre::install().unwrap();
    let args: Args = argh::from_env();

    let mut handles = Vec::new();
    for file in &args.files {
        let file = file.clone();
        let handle = async_std::task::spawn(async move {
            let res = hash_file(&file).await;
            if let Err(e) = res {
                println!("While hashing {}: {}", file.display(), e);
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await;
    }

    Ok(())
}

async fn hash_file(path: &Path) -> Result<(), eyre::Error> {
    let mut file = File::open(path).await?;
    let mut hasher = sha3::Sha3_256::new();

    let mut buf = vec![0u8; 256 * 1024];
    loop {
        let n = file.read(&mut buf[..]).await?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

    let hash = hasher.finalize();
    print!("{} ", path.display());
    for x in hash {
        print!("{:02x}", x);
    }
    println!();

    Ok(())
}
$ cargo build --release -q && ./target/release/surviving wine-*
wine-4.0.4.tar.xz d53a567e7a14a2b8dbd669afece1603daa769ada93522ec1f26fe69674d7c433
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
wine-5.0.1.tar.xz 4af0d295e56db9f723daa4822ee4b4416ac8840278ccd7df73ef357027e5b663
Okay, cool - multiple files. But is it using multiple threads?
Well I don't know! It works, but.. I don't know a lot beyond that.
So let's dig a little.
Here's one thing that's rather easy to do: threads have a unique identifier (process-wide). We can just grab it and print it from whichever place we choose, for example... right before calling read():
async fn hash_file(path: &Path) -> Result<(), eyre::Error> {
    // etc.

    loop {
        println!("{:?}", std::thread::current().id());
        let n = file.read(&mut buf[..]).await?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

    // (cut)
$ cargo build --release -q && time ./target/release/surviving wine-*
ThreadId(3)
ThreadId(3)
ThreadId(2)
ThreadId(2)
ThreadId(3)
ThreadId(1)
ThreadId(1)
ThreadId(3)
ThreadId(2)
ThreadId(3)
ThreadId(2)
ThreadId(3)
ThreadId(1)
ThreadId(2)
Ooooooh it's using threads alright.
Yeah. But are the tasks (each task is hashing a different file) always on the same thread?
Let's do some more printing:
// in `fn hash_file`
// in `loop`
println!("{} => {:?}", path.display(), std::thread::current().id());
$ cargo build --release -q && time ./target/release/surviving wine-*
wine-4.0.4.tar.xz => ThreadId(3)
wine-5.0.1.tar.xz => ThreadId(3)
wine-5.0.2.tar.xz => ThreadId(2)
wine-5.0.1.tar.xz => ThreadId(3)
wine-5.0.2.tar.xz => ThreadId(1)
wine-4.0.4.tar.xz => ThreadId(2)
wine-4.0.4.tar.xz => ThreadId(2)
wine-5.0.1.tar.xz => ThreadId(1)
wine-5.0.2.tar.xz => ThreadId(3)
wine-4.0.4.tar.xz => ThreadId(2)
wine-5.0.2.tar.xz => ThreadId(2)
wine-5.0.1.tar.xz => ThreadId(3)
wine-4.0.4.tar.xz => ThreadId(1)
wine-5.0.2.tar.xz => ThreadId(2)
No, they are not pinned. Not at all, in fact. We can see that the wine-4.0.4.tar.xz task hops from thread 3, to thread 2, then to thread 1.
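As a sanity check that `ThreadId` identifies OS threads rather than tasks, here is a small std-only sketch. The helper name is made up. A freshly spawned thread reports a different id than the thread that spawned it, while repeated calls on the same thread agree.

```rust
use std::thread::{self, ThreadId};

/// Hypothetical helper: returns (id of the calling thread, id of a freshly spawned thread).
fn caller_and_worker_ids() -> (ThreadId, ThreadId) {
    let caller = thread::current().id();
    let worker = thread::spawn(|| thread::current().id())
        .join()
        .expect("worker thread panicked");
    (caller, worker)
}

fn main() {
    let (caller, worker) = caller_and_worker_ids();
    println!("caller = {:?}, worker = {:?}", caller, worker);
}
```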
Everything is happening a little too fast for me though. I'd like to try something else - I'd like to try and make an asynchronous reader that sleeps before each read.
Okay, wrapping a Reader is simple right? All we have to do is...
declare a struct, which itself has an inner reader: something that implements the async_std::io::Read trait, which is just futures::io::AsyncRead in disguise.
For clarity, we'll add futures to our Cargo.toml and use that trait directly:
$ cargo add futures
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding futures v0.3.5 to dependencies
use futures::io::AsyncRead;

struct TracingReader<R>
where
    R: AsyncRead,
{
    inner: R,
}

impl<R> AsyncRead for TracingReader<R> where R: AsyncRead {}
Very good - now, what trait items do we need to implement for AsyncRead?
Let's let rust-analyzer implement the missing method for us:
use futures::io::AsyncRead;
use std::{
    io,
    pin::Pin,
    task::{Context, Poll},
};

struct TracingReader<R>
where
    R: AsyncRead,
{
    inner: R,
}

impl<R> AsyncRead for TracingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        todo!()
    }
}
This looks uhh... different from std::io::Read.
Yeah, it does! Well, it takes a &mut [u8], which I recognize. It also returns a type that contains an io::Result<usize>.
But everything else is different. It has a Context, which I'm guessing has something to do with the async executor. It returns a Poll type, and its self feels a bit.. unfamiliar maybe?
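For contrast, the same wrapper around a synchronous `std::io::Read` is almost boring. `read` takes a plain `&mut self`, so reaching `self.inner` needs nothing special, and there is no `Context` or `Poll` in sight. Here is a std-only sketch. The `reads` counter is an addition for illustration and not part of the article's `TracingReader`.

```rust
use std::io::{self, Read};

/// Synchronous counterpart of `TracingReader`: logs every `read` call
/// and forwards it to the wrapped reader.
struct TracingReader<R> {
    inner: R,
    reads: usize,
}

impl<R: Read> Read for TracingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.reads += 1;
        println!("read #{} on {:?}", self.reads, std::thread::current().id());
        // `&mut self` hands us `&mut self.inner` directly: no pinning involved.
        self.inner.read(buf)
    }
}

fn main() -> io::Result<()> {
    let mut reader = TracingReader { inner: io::Cursor::new(vec![0u8; 10]), reads: 0 };
    let mut out = Vec::new();
    reader.read_to_end(&mut out)?;
    println!("{} bytes in {} reads", out.len(), reader.reads);
    Ok(())
}
```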
Let's do a bit of reading on pinning:
Types that pin data to its location in memory.
It is sometimes useful to have objects that are guaranteed not to move, in the sense that their placement in memory does not change, and can thus be relied upon. A prime example of such a scenario would be building self-referential structs, as moving an object with pointers to itself will invalidate them, which could cause undefined behavior.
So, one could probably write a whole book about pinning.
A book you say..
...but let's keep this short and sweet. Basically, pinning a value whose type isn't Unpin prevents us from ever getting a mutable reference to it (at least in safe code). Which means we can no longer use, for example, std::mem::swap or std::mem::replace on it.
However, given a Pin<&mut T>, we can get a mutable reference to one of our fields.
...right?
impl<R> AsyncRead for TracingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let inner: &mut R = &mut self.inner;
        todo!()
    }
}
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error[E0596]: cannot borrow data in a dereference of `std::pin::Pin<&mut TracingReader<R>>` as mutable
  --> src/main.rs:90:29
   |
90 |         let inner: &mut R = &mut self.inner;
   |                             ^^^^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `std::pin::Pin<&mut TracingReader<R>>`

error: aborting due to previous error
Ah, uh, it's not that simple. To get a &mut self.inner, we'd have to get a &mut self first, and that's precisely what Pin prevents.
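To make that concrete, here is a std-only sketch. `Pin<&mut T>` implements `DerefMut` only when `T: Unpin`. For a type that opts out of `Unpin` (here via `std::marker::PhantomPinned`; the `NotUnpin` type is made up), reading through the pin still works. Writing to it or swapping it would not compile without `unsafe`.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// `u32` is `Unpin`, so `Pin<&mut u32>` derefs mutably like a plain `&mut u32`.
fn bump(mut p: Pin<&mut u32>) {
    *p += 1;
}

// Hypothetical type that opts out of `Unpin`.
struct NotUnpin {
    value: u32,
    _pinned: PhantomPinned,
}

fn read_value(p: Pin<&mut NotUnpin>) -> u32 {
    // Reading goes through `Deref`, which `Pin` always provides.
    p.value
    // `p.value += 1;` or `std::mem::swap(&mut *p, ...)` would not compile here:
    // `DerefMut` is not implemented for `Pin<&mut NotUnpin>`.
}

fn main() {
    let mut x = 1;
    bump(Pin::new(&mut x));
    println!("x = {}", x);

    // `Pin::new` requires `Unpin`, so a `NotUnpin` has to be pinned another
    // way, for example on the heap with `Box::pin`.
    let mut boxed = Box::pin(NotUnpin { value: 7, _pinned: PhantomPinned });
    println!("value = {}", read_value(boxed.as_mut()));
}
```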
Turns out - we have two options. Either we can decide that "pinning is not structural" for our field, inner, and then we can use unsafe code to get a &mut R:
impl<R> AsyncRead for TracingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let inner: &mut R = unsafe { &mut self.get_unchecked_mut().inner };
        todo!()
    }
}
The problem here is that... even though R implements AsyncRead, we cannot call poll_read on our inner variable:
impl<R> AsyncRead for TracingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let inner: &mut R = unsafe { &mut self.get_unchecked_mut().inner };
        inner.poll_read(cx, buf)
    }
}
In fact, the compiler error is a bit.. lacking for that particular scenario:
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error[E0599]: no method named `poll_read` found for mutable reference `&mut R` in the current scope
  --> src/main.rs:91:15
   |
91 |         inner.poll_read(cx, buf)
   |               ^^^^^^^^^ method not found in `&mut R`
   |
   = help: items from traits can only be used if the type parameter is bounded by the trait
help: the following trait defines an item `poll_read`, perhaps you need to restrict type parameter `R` with it:
   |
81 | impl<R: futures_io::if_std::AsyncRead> AsyncRead for TracingReader<R>
   |      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Which doesn't stop it from being absolutely correct. There is no method named poll_read for R that takes a self: &mut Self (usually written as its shorthand, &mut self).
There is one that takes Pin<&mut Self>, though.
Which brings us to option two: we can decide that "pinning is structural" for our field inner.
Which it is - since we want to call poll_read on it, which needs a Pin<&mut Self>, we must guarantee that, while TracingReader<R> is pinned, TracingReader::inner is also pinned.
And there is also a bit of unsafe code that lets us obtain a Pin<&mut R>:
impl<R> AsyncRead for TracingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // tracing
        let address = &self as *const _;
        println!("{:?} => {:?}", address, std::thread::current().id());

        // reading - pinning is structural for `self.inner`
        let inner: Pin<&mut R> = unsafe { self.map_unchecked_mut(|x| &mut x.inner) };
        inner.poll_read(cx, buf)
    }
}
Now we can use our TracingReader directly from hash_file:
async fn hash_file(path: &Path) -> Result<(), eyre::Error> {
    let file = File::open(path).await?;
    let mut file = TracingReader { inner: file };

    let mut hasher = sha3::Sha3_256::new();

    let mut buf = vec![0u8; 256 * 1024];
    loop {
        let n = file.read(&mut buf[..]).await?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

    let hash = hasher.finalize();
    print!("{} ", path.display());
    for x in hash {
        print!("{:02x}", x);
    }
    println!();

    Ok(())
}
$ cargo build --release -q && time ./target/release/surviving wine-*
0x7f54804343e0 => ThreadId(2)
0x7f54804343e0 => ThreadId(2)
0x7f54804343e0 => ThreadId(2)
0x7f54804343e0 => ThreadId(2)
0x7ffff9685970 => ThreadId(1)
0x7f547b3f83e0 => ThreadId(10)
0x7f54804343e0 => ThreadId(2)
0x7ffff9685970 => ThreadId(1)
0x7f547b3f83e0 => ThreadId(10)
Wait a minute... if you can obtain both a &mut R and a Pin<&mut R>...
doesn't that kind of defeat the purpose of pinning?
One could pass a Pin<&mut R> to a method, and the next moment, they could use a &mut R to move it, deinitialize it, etc.
What a good-looking question. You definitely can get into trouble that way, but you'll note that it is using unsafe.
I was getting to that - why are we using unsafe in the first place?
Isn't there another way? A better way?
There is! There's nothing in the Rust type system that prevents you from calling both get_unchecked_mut and map_unchecked_mut for the same field - that's why they're both unsafe.
But the problem can be solved another way - and I want to hear the people in the back this time: there is a crate for that.
$ cargo add pin-project
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding pin-project v0.4.23 to dependencies
Project? Like, project management?
No, like, projection. But I was confused as well the first couple times it showed up in the dependencies of a project. Don't tell anyone though.
With pin-project, you get to decide which fields are pinned, and which fields aren't.
In our case, we have only one field, and it is pinned:
use pin_project::pin_project;

// Generate projection types
#[pin_project]
struct TracingReader<R>
where
    R: AsyncRead,
{
    // pinning is structural for `inner`
    #[pin]
    inner: R,
}
Then, we can use self.project() (which itself takes a Pin<&mut Self>) to get &mut T for unpinned fields, and Pin<&mut T> for pinned fields.
impl<R> AsyncRead for TracingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // tracing
        let address = &self as *const _;
        println!("{:?} => {:?}", address, std::thread::current().id());

        // reading
        self.project().inner.poll_read(cx, buf)
    }
}
That way - there's no unsafe code directly in our crate, and we can't accidentally change our mind between a field being pinned and a field being unpinned.
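To build intuition for what a "projection" is, here is a std-only sketch of a hand-written one. It is conceptually similar to what `#[pin_project]` generates, but it is not its actual output. `CountPolls` is a made-up wrapper future that counts how often it's polled. It has one pinned field, `inner`, and one unpinned field, `polls`. The single `unsafe` block lives in `project`, and everything else is safe code.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical wrapper future: forwards to `inner`, counting polls.
struct CountPolls<F> {
    inner: F,   // pinning is structural for this field...
    polls: u32, // ...but not for this one
}

/// What a projection hands out: `Pin<&mut _>` for pinned fields,
/// plain `&mut _` for unpinned ones.
struct CountPollsProj<'a, F> {
    inner: Pin<&'a mut F>,
    polls: &'a mut u32,
}

impl<F> CountPolls<F> {
    fn project<'a>(self: Pin<&'a mut Self>) -> CountPollsProj<'a, F> {
        // SAFETY: `inner` is never moved out or exposed as `&mut F`,
        // `CountPolls` has no `Drop` impl, and it is only `Unpin` when `F` is.
        unsafe {
            let this = self.get_unchecked_mut();
            CountPollsProj {
                inner: Pin::new_unchecked(&mut this.inner),
                polls: &mut this.polls,
            }
        }
    }
}

impl<F: Future> Future for CountPolls<F> {
    type Output = (F::Output, u32);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        *this.polls += 1;
        match this.inner.poll(cx) {
            Poll::Ready(value) => Poll::Ready((value, *this.polls)),
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing: good enough to poll a future by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<F: Future>(fut: Pin<&mut F>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    fut.poll(&mut cx)
}

fn main() {
    // Futures from `async` blocks are generally `!Unpin`, so pin the wrapper on the heap.
    let mut fut = Box::pin(CountPolls { inner: async { 7 }, polls: 0 });
    println!("{:?}", poll_once(fut.as_mut()));
}
```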
Okay. That pinning stuff is still very fresh in my bear mind right now, but... I have another question.
Why isn't AsyncRead::poll_read an async fn?
Ah - an "easy" question.
(Keep in mind this is written in August 2020 - this article might be in dire need of being updated at some point.)
At the time of this writing, async functions in traits are not supported.
What?
See for yourself:
trait SimpleRead {
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}
$ cargo check
error[E0706]: functions in traits cannot be declared `async`
   --> src/main.rs:107:5
    |
107 |     async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
    |     -----^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |     |
    |     `async` because of this
    |
    = note: `async` trait functions are not currently supported
    = note: consider using the `async-trait` crate: https://crates.io/crates/async-trait
Hopefully that won't always be the case, but for the time being we're stuck with it.
Wait, wait - the compiler just told us about a crate.
Of course it did.
async-trait lets us work around that limitation, so let's try it out:
$ cargo add async-trait
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding async-trait v0.1.36 to dependencies
use async_trait::async_trait;

#[async_trait]
trait SimpleRead {
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}
And.. voilà! This compiles just fine.
Okay... where's the catch?
Can we try implementing it for TracingReader, for example?
Sure, yeah, let's do it:
use async_trait::async_trait;

#[async_trait]
trait SimpleRead {
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}

#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead,
{
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf).await
    }
}
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error[E0277]: `R` cannot be unpinned
   --> src/main.rs:119:20
    |
119 |         self.inner.read(buf).await
    |                    ^^^^ the trait `std::marker::Unpin` is not implemented for `R`
    |
help: consider further restricting this bound
    |
116 |     R: AsyncRead + std::marker::Unpin,
    |                  ^^^^^^^^^^^^^^^^^^^^
Uh oh. Well, let's see... we have a &mut Self. In order to call AsyncReadExt::read, we need a &mut R where R is Unpin.
Well... you know what is Unpin? Pin<&mut R>.
If we were to take a Pin<&mut Self>, we could use project (generated via pin-project) and get a Pin<&mut R>:
#[async_trait]
trait SimpleRead {
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize>;
}

#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead,
{
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
        self.project().inner.read(buf).await
    }
}
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error: future cannot be sent between threads safely
   --> src/main.rs:118:85
    |
118 |       async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
    |  _____________________________________________________________________________________^
119 | |         self.project().inner.read(buf).await
120 | |     }
    | |_____^ future returned by `__simple_read` is not `Send`
    |
    = help: within `TracingReader<R>`, the trait `std::marker::Send` is not implemented for `R`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:119:9
    |
118 |     async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
    |                          ---- has type `std::pin::Pin<&mut TracingReader<R>>` which is not `Send`
119 |         self.project().inner.read(buf).await
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `self` maybe used later
120 |     }
    |     - `self` is later dropped here
    = note: required for the cast to the object type `dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send`
help: consider further restricting this bound
    |
116 |     R: AsyncRead + std::marker::Send,
    |                  ^^^^^^^^^^^^^^^^^^^
Okay, we're 90% of the way there - it no longer complains about R not being Unpin. But now it complains about the future not being Send.
The future? What future.
Well, every async function is a normal function in disguise that returns an impl Future.
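Here's a std-only sketch of that equivalence. The function names are made up. `add` is an `async fn` and `add_desugared` is a plain `fn` returning `impl Future`. Both produce a future that does nothing until polled, and here we poll them by hand with a do-nothing waker instead of an executor.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// An `async fn`...
async fn add(a: u32, b: u32) -> u32 {
    a + b
}

// ...is roughly a normal function returning an anonymous `impl Future`.
fn add_desugared(a: u32, b: u32) -> impl Future<Output = u32> {
    async move { a + b }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once. Neither example future awaits anything,
/// so a single poll is enough for both to complete.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut); // polling requires a pinned future
    fut.as_mut().poll(&mut cx)
}

fn main() {
    // Calling an async fn runs none of its body: it just builds a future.
    let fut = add(2, 3);
    println!("{:?}", poll_once(fut));
    println!("{:?}", poll_once(add_desugared(2, 3)));
}
```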
Oh.. and futures can be polled? By executors?
Passing them a context, yes.
Just like poll_read?
Just like poll_read, yeah!
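To see polling and `Context` in action without a runtime, here's a std-only sketch. `YieldOnce`, a made-up future, returns `Poll::Pending` on its first poll after asking to be woken through `cx.waker()`, and `Poll::Ready` on its second. `block_on` is a toy single-future executor that parks the thread until it's woken. Real executors like async-std's are far more involved.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical future: pending on the first poll, ready on the second.
struct YieldOnce {
    yielded: bool,
    value: u32,
}

impl Future for YieldOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.yielded {
            Poll::Ready(self.value)
        } else {
            self.yielded = true; // fine: `YieldOnce` is `Unpin`
            // Ask to be polled again; without this we might never be woken.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: polls one future until it completes, parking in between.
/// Returns the output and how many times the future was polled.
fn block_on<F: Future>(fut: F) -> (F::Output, u32) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return (value, polls),
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let (value, polls) = block_on(YieldOnce { yielded: false, value: 42 });
    println!("got {} after {} polls", value, polls);
}
```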
In our case, since we're living in a pretend future where "async fn in traits" is supported (courtesy of async-trait), we don't see the concrete Future type - it's all generated under the covers.
But we know that the generated code keeps a Pin<&mut TracingReader<R>> (our self) alive across an .await, and that is how it reaches the self.inner field. R is not known to be Send, so neither is that Pin, and so our Future is not Send either.
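The same rule can be observed with std alone. In this sketch (all names made up), an `Rc`, which is not `Send`, held across an `.await` makes the whole future `!Send`. Dropping it before the `.await` keeps the future `Send`. The `require_send` helper only exists to make the compiler check that bound.

```rust
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

async fn yield_point() {}

/// Holds an `Rc` across an await: the resulting future is `!Send`.
async fn not_send() -> usize {
    let rc = Rc::new(5usize);
    yield_point().await;
    *rc
}

/// Drops the `Rc` before the await: the resulting future is `Send`.
async fn is_send() -> usize {
    let n = {
        let rc = Rc::new(5usize);
        *rc
    };
    yield_point().await;
    n
}

/// Compile-time check only: accepts any `Send` value and hands it back.
fn require_send<T: Send>(value: T) -> T {
    value
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls once; enough here because `yield_point` completes immediately.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}

fn main() {
    let ok = require_send(is_send());
    // Would not compile: this future holds an `Rc` across an await.
    // let bad = require_send(not_send());
    println!("{:?} {:?}", poll_once(ok), poll_once(not_send()));
}
```

Polling `not_send()` on the current thread is fine. It only becomes a problem when something, like a multi-threaded executor, needs to move the future to another thread.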
And that's one of the big limitations of async-trait. When using native async fn support in Rust, sometimes futures end up being Send, sometimes they end up not being Send (i.e. they cannot be safely sent across threads).
But when using async-trait, you have to pick one. And the default is: whatever Future the async fn returns must be Send.
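As a rough mental model (not async-trait's literal expansion), an `#[async_trait]` method becomes a plain method that returns a boxed, pinned future with a `Send` bound, much like the std-only sketch below. `#[async_trait(?Send)]` drops that `+ Send`. `BoxFuture` here is a local alias named after the one in the futures crate, and `SliceReader` is a made-up in-memory reader used to exercise the trait.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local alias: a heap-allocated, pinned, `Send` future borrowing for `'a`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

trait SimpleRead {
    fn simple_read<'a>(&'a mut self, buf: &'a mut [u8]) -> BoxFuture<'a, io::Result<usize>>;
}

/// Hypothetical in-memory reader.
struct SliceReader {
    data: Vec<u8>,
    pos: usize,
}

impl SimpleRead for SliceReader {
    fn simple_read<'a>(&'a mut self, buf: &'a mut [u8]) -> BoxFuture<'a, io::Result<usize>> {
        // The `async move` block plays the role of the `async fn` body.
        Box::pin(async move {
            let rest = &self.data[self.pos..];
            let n = rest.len().min(buf.len());
            buf[..n].copy_from_slice(&rest[..n]);
            self.pos += n;
            // Spell out the error type so the block's output matches the trait.
            Ok::<usize, io::Error>(n)
        })
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<T>(mut fut: BoxFuture<'_, T>) -> Poll<T> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}

fn main() {
    let mut reader = SliceReader { data: b"hello".to_vec(), pos: 0 };
    let mut buf = [0u8; 3];
    let result = poll_once(reader.simple_read(&mut buf));
    if let Poll::Ready(Ok(n)) = result {
        println!("read {} bytes: {:?}", n, &buf[..n]);
    }
}
```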
We can lift that limitation by using #[async_trait(?Send)]:
#[async_trait(?Send)]
trait SimpleRead {
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize>;
}

#[async_trait(?Send)]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead,
{
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
        self.project().inner.read(buf).await
    }
}
And then we have a fully working trait, which we can use from hash_file.
Right?
async fn hash_file(path: &Path) -> Result<(), eyre::Error> {
    let file = File::open(path).await?;
    let mut file = TracingReader { inner: file };

    let mut hasher = sha3::Sha3_256::new();

    let mut buf = vec![0u8; 256 * 1024];
    loop {
        let n = file.simple_read(&mut buf[..]).await?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

    // etc.
}
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error[E0599]: no method named `simple_read` found for struct `TracingReader<async_std::fs::file::File>` in the current scope
   --> src/main.rs:51:22
    |
51  |         let n = file.simple_read(&mut buf[..]).await?;
    |                      ^^^^^^^^^^^ method not found in `TracingReader<async_std::fs::file::File>`
...
79  | / struct TracingReader<R>
80  | | where
81  | |     R: AsyncRead,
82  | | {
... |
85  | |     inner: R,
86  | | }
    | |_- method `simple_read` not found for this
...
110 |     async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize>;
    |                                --------------- the method might not be found because of this arbitrary self type
Oh no. This sounds awfully familiar.
Oh that's right! Our simple_read method takes a Pin<&mut Self> now, not a &mut self.
I guess we can just... pin it ourselves?
let n = Pin::new(&mut file).simple_read(&mut buf[..]).await?;
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error: future cannot be sent between threads safely
   --> src/main.rs:27:22
    |
27  |         let handle = async_std::task::spawn(async move {
    |                      ^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.6.2/src/task/spawn.rs:28:29
    |
28  |     F: Future<Output = T> + Send + 'static,
    |                             ---- required by this bound in `async_std::task::spawn::spawn`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>>`
note: future is not `Send` as it awaits another future which is not `Send`
   --> src/main.rs:51:17
    |
51  |         let n = Pin::new(&mut file).simple_read(&mut buf[..]).await?;
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here on type `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>>>>`, which is not `Send`
Ohhhhh.
Oh!
Ah. Turns out, spawn requires the, well, spawned Future to be Send.
Which makes sense: it spawns the Future on the threaded executor, as we've seen (by printing the ThreadId).
What if... what if we tried spawning it on the thread-local executor?
Turns out, as of async-std 1.6.2, that's an unstable feature, but let's try it anyway:
# in `Cargo.toml`
async-std = { version = "1.6.2", features = ["attributes", "unstable"] }
$ cargo build --release -q && time ./target/release/surviving wine-*
wine-4.0.4.tar.xz d53a567e7a14a2b8dbd669afece1603daa769ada93522ec1f26fe69674d7c433
wine-5.0.1.tar.xz 4af0d295e56db9f723daa4822ee4b4416ac8840278ccd7df73ef357027e5b663
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
./target/release/surviving wine-*  0.22s user 0.04s system 117% cpu 0.218 total
Oh hey it works!
But... how does it work? Does it just do them in sequence? Let's compare timings:
$ for i in wine-*; do time ./target/release/surviving $i; done
wine-4.0.4.tar.xz d53a567e7a14a2b8dbd669afece1603daa769ada93522ec1f26fe69674d7c433
./target/release/surviving $i  0.05s user 0.03s system 103% cpu 0.078 total
wine-5.0.1.tar.xz 4af0d295e56db9f723daa4822ee4b4416ac8840278ccd7df73ef357027e5b663
./target/release/surviving $i  0.08s user 0.00s system 103% cpu 0.081 total
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
./target/release/surviving $i  0.08s user 0.00s system 102% cpu 0.082 total
So, measuring like that is a terrible idea, don't do it, all the usual caveats, BUT this is 2020, all bets are off, so: if we add up the "total" times for our for i in wine-* (sequential) run, we get around 0.24s (0.078 + 0.081 + 0.082 = 0.241s), whereas the surviving wine-* (mystery) run stays around 0.21-0.22s.
This is... this is not scientific at all.
Also, aren't you running these on a laptop?
Yeah, yes, absolutely. This is absolute garbage methodology - so, let's do the same thing we did before and just print the current thread's identifier:
#[async_trait(?Send)]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead,
{
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
        // tracing
        let address = &self as *const _;
        println!("{:?} => {:?}", address, std::thread::current().id());

        // reading
        self.project().inner.read(buf).await
    }
}
$ cargo build --release -q && time ./target/release/surviving wine-*
0x55a274428fe8 => ThreadId(1)
0x55a2744292c8 => ThreadId(1)
0x55a274429458 => ThreadId(1)
0x55a274428fe8 => ThreadId(1)
0x55a2744292c8 => ThreadId(1)
0x55a274429458 => ThreadId(1)
0x55a274428fe8 => ThreadId(1)
0x55a2744292c8 => ThreadId(1)
0x55a274429458 => ThreadId(1)
0x55a274428fe8 => ThreadId(1)
0x55a2744292c8 => ThreadId(1)
0x55a274429458 => ThreadId(1)
0x55a274428fe8 => ThreadId(1)
0x55a2744292c8 => ThreadId(1)
Whoa.
It's all running on the same thread!
Yes it is! And what's even more interesting is that you can see the tasks alternate: first fe8, then 2c8, then 458, etc.
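So, my feeling is that it is actually a little bit faster because reads get queued: while one task waits for its read to complete, the executor switches to another task whose read is ready, runs its (blocking) hash computation, then switches again, and so on. It spends less time just waiting on I/O syscalls. (As far as I know, async-std's File also hands the actual read syscalls off to a pool of blocking threads, which would explain the 117% CPU figure even though all our tasks run on one thread.)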
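To picture that alternation, here's a toy, std-only model. A hypothetical YieldOnce future returns Poll::Pending once before completing, standing in for a read that isn't ready yet. The hypothetical run_round_robin function polls every unfinished task in turn on the current thread. This is a sketch of the scheduling idea under those assumptions, not how async-std's executor works: a real executor sleeps until a waker fires instead of re-polling everything in a loop.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing: the loop below re-polls every task anyway.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for "a read that isn't ready yet":
/// Pending on the first poll, Ready on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        cx.waker().wake_by_ref(); // well-behaved futures ask to be polled again
        Poll::Pending
    }
}

/// A task that logs one entry per chunk, then awaits a not-yet-ready "read".
async fn task(name: &'static str, chunks: usize, log: Rc<RefCell<Vec<String>>>) {
    for i in 0..chunks {
        log.borrow_mut().push(format!("{}{}", name, i));
        YieldOnce { yielded: false }.await;
    }
}

/// Polls every unfinished task once per round, all on the current thread.
fn run_round_robin(mut tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        // keep only the tasks that are still pending after this round
        tasks.retain_mut(|t| t.as_mut().poll(&mut cx).is_pending());
    }
}
```

With two tasks of two chunks each, the log comes out as a0, b0, a1, b1. Each task runs until it hits a pending await, and then the other one gets its turn, much like fe8, 2c8 and 458 above.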
But if we want even better performance, we probably do want multi-threading.
So we'll have to make our futures Send:
#[async_trait]
trait SimpleRead {
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize>;
}

#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead,
{
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
        // tracing
        let address = &self as *const _;
        println!("{:?} => {:?}", address, std::thread::current().id());

        // reading
        self.project().inner.read(buf).await
    }
}
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error: future cannot be sent between threads safely
   --> src/main.rs:118:85
    |
118 |       async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
    |  _____________________________________________________________________________________^
119 | |         // tracing
120 | |         let address = &self as *const _;
121 | |         println!("{:?} => {:?}", address, std::thread::current().id());
... |
124 | |         self.project().inner.read(buf).await
125 | |     }
    | |_____^ future returned by `__simple_read` is not `Send`
    |
    = help: within `impl core::future::future::Future`, the trait `std::marker::Send` is not implemented for `*const std::pin::Pin<&mut TracingReader<R>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:124:9
    |
120 |         let address = &self as *const _;
    |             ------- has type `*const std::pin::Pin<&mut TracingReader<R>>` which is not `Send`
...
124 |         self.project().inner.read(buf).await
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `address` maybe used later
125 |     }
    |     - `address` is later dropped here
    = note: required for the cast to the object type `dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send`
Okay, back to square one. Let's squint at the compiler messages: we have to constrain R further.
Right now we're only asking of R that it implements AsyncRead. If we also ask that it is Send, then our TracingReader<R> should also automatically implement Send, since the only field of the struct is of type R.
In fact, we only have to add it to our impl SimpleRead block:
#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead + Send,
{
    async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
        // tracing
        let address = &self as *const _;
        println!("{:?} => {:?}", address, std::thread::current().id());

        // reading
        self.project().inner.read(buf).await
    }
}
There! It's done, it's ready for the prime ti..
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error: future cannot be sent between threads safely
   --> src/main.rs:118:85
    |
118 |       async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
    |  _____________________________________________________________________________________^
119 | |         // tracing
120 | |         let address = &self as *const _;
121 | |         println!("{:?} => {:?}", address, std::thread::current().id());
... |
124 | |         self.project().inner.read(buf).await
125 | |     }
    | |_____^ future returned by `__simple_read` is not `Send`
    |
    = help: within `impl core::future::future::Future`, the trait `std::marker::Send` is not implemented for `*const std::pin::Pin<&mut TracingReader<R>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:124:9
    |
120 |         let address = &self as *const _;
    |             ------- has type `*const std::pin::Pin<&mut TracingReader<R>>` which is not `Send`
...
124 |         self.project().inner.read(buf).await
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `address` maybe used later
125 |     }
    |     - `address` is later dropped here
    = note: required for the cast to the object type `dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send`
it's not done. That error is really confusing.
Let's see - address has type *const std::pin::Pin<&mut TracingReader<R>> (not what I was going for, but ah well), and that type is not Send, so we cannot hold on to it across an await.
That part actually makes sense. When we .await a future that isn't ready yet, what actually happens is that the poll method of our Future returns Poll::Pending, and the next time it's polled, it might be polled from a different thread - since we promised our Future was Send (as is async-trait's default, which we just reverted back to).
The part that doesn't make sense is "with `address` maybe used later".
Where?? Where may it be used later? Tell me rustc!
Well, address only gets dropped at the end of the scope - and Drop::drop takes a &mut self. So if it does implement Drop, it'll be used after the await!
Okay - but then why doesn't it work if we just explicitly call drop(address) after the println!?
I uh...
Uh-huh. I'm fairly sure it's a case of "the compiler is not sufficiently smart to figure this out yet" - Rust is a research project gone terribly right, after all - at any rate, we can solve our problem using a scope:
async fn simple_read(self: Pin<&mut Self>, buf: &mut [u8]) -> io::Result<usize> {
    // tracing
    {
        let address = &self as *const _;
        println!("{:?} => {:?}", address, std::thread::current().id());
    }

    // reading
    self.project().inner.read(buf).await
}
And with that change, everything compiles again.
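The same rule can be reproduced without async-trait or async-std. In this std-only sketch, traced_read, assert_send and poll_once are hypothetical names. assert_send only accepts values whose type is Send, so it acts as a compile-time check on the future. The raw pointer lives in an inner block that ends before the .await, so it never becomes part of the future's state. If the pointer were still in use after the .await, it would have to be stored in the future, and the assert_send call would stop compiling.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Compile-time check: only accepts values whose type is `Send`.
fn assert_send<T: Send>(value: T) -> T {
    value
}

/// Same shape as `simple_read`: trace an address, then await a "read".
async fn traced_read(counter: &mut u64, n: usize) -> usize {
    // tracing: the raw pointer is not `Send`, but it only lives in this block...
    {
        let address = &*counter as *const u64;
        println!("{:?} => {:?}", address, std::thread::current().id());
    }
    *counter += 1;
    // ...so nothing `!Send` is held across this await point.
    std::future::ready(n).await
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once; enough here because `ready` completes immediately.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}
```

Calling assert_send(traced_read(&mut counter, 42)) compiles. Polling the result once gives Poll::Ready(42).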
But I'm not really satisfied yet. Having to take Pin<&mut Self>, and having to pin manually when calling simple_read, seems like a major usability issue.
Can't we just let the compiler, uh, hand waves handle it?
Let's see. The last time we tried to take &mut self, the compiler complained that R wasn't Unpin. So, what if we just require R to be Unpin?
#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead + Send + Unpin,
{
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // tracing
        {
            let address = &self as *const _;
            println!("{:?} => {:?}", address, std::thread::current().id());
        }

        // reading
        self.inner.read(buf).await
    }
}
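Why does the Unpin bound help? For a type that is Unpin, Pin::new can safely turn a plain &mut into a Pin<&mut _>, so there is no projection to do. Here's a std-only sketch of that idea with a hypothetical Traced wrapper, which forwards Future::poll to an Unpin inner future. It uses std::future::ready as the inner future because that type is Unpin.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical wrapper: counts how many times the inner future is polled.
struct Traced<F> {
    inner: F,
    polls: usize,
}

impl<F> Future for Traced<F>
where
    F: Future + Unpin,
{
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        // `Traced<F>` is Unpin because `F` is, so we can get `&mut Self` back...
        let this = &mut *self;
        this.polls += 1;
        // ...and re-pin the field with the safe `Pin::new`, no projection needed.
        Pin::new(&mut this.inner).poll(cx)
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls an Unpin future in place, once.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

Without the F: Unpin bound, the &mut *self line is rejected. We would need unsafe code or pin-project to reach the field.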
And now we can once again use it without pinning ourselves:
// in `fn hash_file`
let n = file.simple_read(&mut buf[..]).await?;
That works just fine! But we're still using async_std::task::spawn_local.
Can we use it with async_std::task::spawn?
Yeah! And if we do, it's definitely faster than hashing each file in sequence (roughly 0.25s when adding up the sequential runs, versus about 0.09s):
$ for i in wine-*; do time ./target/release/surviving $i > /dev/null; done
./target/release/surviving $i > /dev/null  0.06s user 0.02s system 101% cpu 0.079 total
./target/release/surviving $i > /dev/null  0.09s user 0.00s system 101% cpu 0.085 total
./target/release/surviving $i > /dev/null  0.08s user 0.00s system 101% cpu 0.082 total
$ cargo build --release -q && time ./target/release/surviving wine-* > /dev/null
./target/release/surviving wine-* > /dev/null  0.24s user 0.01s system 279% cpu 0.089 total
Okay, very well. But what's the point of using the async machinery then?
It seems to me like, either you use spawn_local, and then everything is on the same thread and only I/O is non-blocking, or you use spawn and it ends up on a bunch of threads - but then why not just use std::thread::spawn yourself?
Well! Spawning one thread per task would eventually become expensive. Creating a thread is fast compared to making a network request, but it's slow compared to just polling a task for completion. A threaded executor will usually have a maximum number of threads, and it'll execute N tasks (where N could be very large) on M threads (where M remains reasonably low).
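To make the N-tasks-on-M-threads idea concrete, here's a std-only sketch. It is a plain thread pool running closures, not an async executor polling futures, but the scheduling idea is the same. A fixed number of worker threads pull jobs from a shared channel until the channel is closed. run_on_pool and Job are hypothetical names.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread::{self, ThreadId};

type Job = Box<dyn FnOnce() -> u64 + Send>;

/// Runs `jobs` on `workers` threads; returns each result along with
/// the id of the thread that ran it.
fn run_on_pool(jobs: Vec<Job>, workers: usize) -> Vec<(u64, ThreadId)> {
    let (job_tx, job_rx) = mpsc::channel::<Job>();
    let job_rx = Arc::new(Mutex::new(job_rx));
    let (res_tx, res_rx) = mpsc::channel();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let res_tx = res_tx.clone();
            thread::spawn(move || loop {
                // The lock guard is dropped at the end of this statement,
                // so other workers can grab jobs while this one runs.
                let next = job_rx.lock().unwrap().recv();
                match next {
                    Ok(job) => res_tx.send((job(), thread::current().id())).unwrap(),
                    Err(_) => break, // channel closed: no more jobs
                }
            })
        })
        .collect();

    for job in jobs {
        job_tx.send(job).unwrap();
    }
    drop(job_tx); // close the job channel so idle workers exit
    drop(res_tx); // only the workers' clones keep the result channel open

    for handle in handles {
        handle.join().unwrap();
    }
    res_rx.iter().collect()
}
```

Running 100 jobs with 4 workers gets every job done, but the results report at most 4 distinct thread ids. The Mutex is held only during recv, not while a job runs, so the workers really do run in parallel.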
Also, keep in mind that here, we're reading from a fast I/O device: a local SSD. But what if we were reading from the network instead? Or god forbid... tape storage?
Then we'd spend a lot of time waiting for I/O - and an executor (threaded or not) would be able to let each task advance as soon as its I/O request is ready.
So async is not just a "make things faster" button?
No, it's not. In fact, it's more of a "make things slower" button - because there is more bookkeeping to do. But it also allows "things" to scale better. Note that futures are not the only way to do asynchronous I/O in Rust - you can definitely poll file descriptors yourself, have your own task system, etc.
Futures are just what ended up becoming standard, and a whole ecosystem is growing around them.
Speaking of standards - what if you have a type that implements a trait with async methods, like SimpleRead, and you want to implement AsyncRead on top of it?
So that it can be passed to async-std's copy function, for example?
Ah, that part is not as easy. But it's not impossible either!
Let's remove our AsyncRead implementation for TracingReader.
We can also remove pin-project from it, leaving us with:
struct TracingReader<R>
where
    R: AsyncRead,
{
    inner: R,
}
...and our impl SimpleRead for TracingReader block.
Now, can we make a type that forwards AsyncRead to another type that implements SimpleRead?
Let's try:
use pin_project::pin_project;

#[pin_project]
struct SimpleAsyncReader<R>
where
    R: SimpleRead,
{
    // we're going to need a `&mut R` out of a `Pin<&mut Self>`,
    // to call `SimpleRead::simple_read`.
    inner: R,
}

impl<R> AsyncRead for SimpleAsyncReader<R>
where
    R: SimpleRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // step 1: project
        let inner = self.project().inner;
        // step 2: obtain future
        let mut fut = inner.simple_read(buf);
        // step 3: pin and poll future
        Pin::new(&mut fut).poll(cx)
    }
}
(I've commented out the tracing part so the output is less verbose).
Now we have to actually use it:
async fn hash_file(path: &Path) -> Result<(), eyre::Error> {
    let file = File::open(path).await?;
    let file = TracingReader { inner: file };
    // new!
    let mut file = SimpleAsyncReader { inner: file };

    let mut hasher = sha3::Sha3_256::new();
    let mut buf = vec![0u8; 256 * 1024];

    loop {
        // back to `read` (used to be `simple_read`)
        let n = file.read(&mut buf[..]).await?;
        match n {
            0 => break,
            n => hasher.update(&buf[..n]),
        }
    }

    let hash = hasher.finalize();
    print!("{} ", path.display());
    for x in hash {
        print!("{:02x}", x);
    }
    println!();

    Ok(())
}
So, did we nail it?
$ cargo build --release -q && time ./target/release/surviving wine-*
wine-4.0.4.tar.xz d53a567e7a14a2b8dbd669afece1603daa769ada93522ec1f26fe69674d7c433
wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
wine-5.0.1.tar.xz 4af0d295e56db9f723daa4822ee4b4416ac8840278ccd7df73ef357027e5b663
./target/release/surviving wine-*  0.24s user 0.02s system 276% cpu 0.093 total
Mhhh it appears to work... suspicious.
Let's try changing up our TracingReader a bit. Instead of making it trace, we'll make it artificially slow down reads.
$ cargo add futures-timer
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding futures-timer v3.0.2 to dependencies
#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead + Send + Unpin,
{
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        use futures_timer::Delay;
        use std::time::Duration;

        // artificial slowdown
        Delay::new(Duration::from_millis(50)).await;

        // reading
        self.inner.read(buf).await
    }
}
Now, if we run the program, it gets stuck forever.
It's time... to do some tracing.
$ cargo add tracing tracing-futures tracing-tree tracing-subscriber
    Updating 'https://github.com/rust-lang/crates.io-index' index
      Adding tracing v0.1.18 to dependencies
      Adding tracing-futures v0.2.4 to dependencies
      Adding tracing-tree v0.1.4 to dependencies
      Adding tracing-subscriber v0.2.10 to dependencies
use tracing_subscriber::{prelude::*, Registry};
use tracing_tree::HierarchicalLayer;

#[async_std::main]
#[tracing::instrument]
async fn main() -> Result<(), eyre::Error> {
    let subscriber = Registry::default().with(HierarchicalLayer::new(2));
    tracing::subscriber::set_global_default(subscriber).unwrap();

    // etc.
}

#[async_trait]
impl<R> SimpleRead for TracingReader<R>
where
    R: AsyncRead + Send + Unpin,
{
    #[tracing::instrument(skip(self, buf))]
    async fn simple_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        use futures_timer::Delay;
        use std::time::Duration;

        // artificial slowdown
        tracing::debug!("doing delay...");
        Delay::new(Duration::from_millis(50)).await;
        tracing::debug!("doing delay...done!");

        // reading
        tracing::debug!("doing read...");
        let res = self.inner.read(buf).await;
        tracing::debug!("doing read...done!");
        res
    }
}

impl<R> AsyncRead for SimpleAsyncReader<R>
where
    R: SimpleRead,
{
    #[tracing::instrument(skip(self, buf))]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.project().inner;
        let mut fut = inner.simple_read(buf);

        tracing::debug!("polling future...");
        let res = Pin::new(&mut fut).poll(cx);
        match &res {
            Poll::Ready(_) => tracing::debug!("future was ready!"),
            Poll::Pending => tracing::debug!("future was pending!"),
        }
        res
    }
}
Let's give it a shot:
$ cargo build -q && RUST_LOG=debug ./target/debug/surviving ./wine-4.0.4.tar.xz
poll_read{cx=Context { waker: Waker { data: 0x55a63274c4d0, vtable: 0x55a63056d730 } }}
0ms DEBUG polling future...
simple_read{}
0ms DEBUG doing delay...
0ms DEBUG future is pending!
Mhh. Well, we're probably doing something wrong, but I can
Hang on, hang on, I think I've got it.
Uhh, go ahead, cool bear, I'm listening.
Well, when you call .simple_read(), you're creating a Future, right?
That's right, or at least, simple_read() returns a Future.
And then you poll it immediately, and then return from poll_read with whatever std::task::Poll variant that future returned.
Yes, but I don't see where you're going wit..
What happens to the future after it's polled?
I'm not sure, what does happen to the future?
Well, you're not storing it anywhere, are you?
No, I'm not. I guess it's dropped at the end of poll_read?
It is! So you're only polling it once at most.
And what happens when you drop a future? It gets cancelled!
AhAH! I think I get it. But then... why did it work before we added the delay?
Before you added the delay. I'm reading up on sponge functions.
I guess it worked because... all futures that return Poll::Pending need to let the executor know (via the Context) when to wake them up. And I guess in the case of an async_std::fs::File, it subscribed to "the next read is ready", and even though the future was dropped, poll_read still got called again.
But when you - yes, you - added a delay, it probably subscribed to a timer event, and when the future was dropped, it immediately unsubscribed, so poll_read (called from the outer Future - the task itself) never got called again.
So, if I'm hearing you right... once we get a future, we need to store it, so that we can poll it until it's ready.
But Future is a trait! So it probably needs to be boxed. And .poll takes a Pin<&mut Self>, so it probably needs to be pinned, too.
Okay, let's give it a shot:
#[pin_project]
struct SimpleAsyncReader<R>
where
    R: SimpleRead,
{
    inner: R,
    state: State,
}

enum State {
    Idle,
    Pending(Pin<Box<dyn Future<Output = io::Result<usize>> + Send>>),
}

impl<R> AsyncRead for SimpleAsyncReader<R>
where
    R: SimpleRead,
{
    #[tracing::instrument(skip(self, buf))]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let res = match self.state {
            State::Idle => {
                tracing::debug!("getting new future...");
                let inner = self.project().inner;
                let mut fut = Box::pin(inner.simple_read(buf));
                let res = fut.as_mut().poll(cx);
                self.state = State::Pending(fut);
                res
            }
            State::Pending(ref mut fut) => {
                tracing::debug!("polling existing future...");
                fut.as_mut().poll(cx)
            }
        };

        match &res {
            Poll::Ready(_) => tracing::debug!("future was ready!"),
            Poll::Pending => tracing::debug!("future was pending!"),
        }
        res
    }
}
Okay, this looks promising. Our SimpleAsyncReader has a State, which can be either Idle (no future yet), or Pending (existing future to poll), and depending on which state it is, it does the right thing.
It looks good, except for one thing:
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error[E0495]: cannot infer an appropriate lifetime for lifetime parameter 'pin in function call due to conflicting requirements
   --> src/main.rs:152:34
    |
152 |                 let inner = self.project().inner;
    |                                  ^^^^^^^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 143:5...
   --> src/main.rs:143:5
    |
143 |     #[tracing::instrument(skip(self, buf))]
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: ...so that the types are compatible
   --> src/main.rs:152:34
    |
152 |                 let inner = self.project().inner;
    |                                  ^^^^^^^
    = note: expected `std::pin::Pin<&mut SimpleAsyncReader<R>>`
               found `std::pin::Pin<&mut SimpleAsyncReader<R>>`
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the expression is assignable
   --> src/main.rs:155:45
    |
155 |                 self.state = State::Pending(fut);
    |                                             ^^^
    = note: expected `std::pin::Pin<std::boxed::Box<(dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send + 'static)>>`
               found `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<usize, std::io::Error>> + std::marker::Send>>`
    = note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)
It doesn't compile.
Why doesn't it compile? Well, when we use self.project(), we get a &mut R, but it's only valid for as long as the Pin<&mut Self> borrow. But now we're storing a future that holds on to it in self.state, which means it must outlive the current function call!
How can we work around that?
I have an idea: let's try letting the Future take ownership of R. That way, R will definitely live for as long as the Future lives.
And since we'll need R back for further poll_read calls, we'll have the future return (literally, give back) R along with the io::Result<usize>.
Let's even use a type alias so the code is more readable:
#[pin_project]
struct SimpleAsyncReader<R>
where
    R: SimpleRead,
{
    state: State<R>,
}

type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;

enum State<R> {
    Idle(R),
    Pending(BoxFut<(R, io::Result<usize>)>),
    Transitional,
}

impl<R> AsyncRead for SimpleAsyncReader<R>
where
    R: SimpleRead + Send,
{
    #[tracing::instrument(skip(self, buf))]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let proj = self.project();
        let mut state = State::Transitional;
        std::mem::swap(proj.state, &mut state);

        let mut fut = match state {
            State::Idle(mut inner) => {
                tracing::debug!("getting new future...");
                Box::pin(async move {
                    let res = inner.simple_read(buf).await;
                    (inner, res)
                })
            }
            State::Pending(fut) => {
                tracing::debug!("polling existing future...");
                fut
            }
            State::Transitional => unreachable!(),
        };

        match fut.as_mut().poll(cx) {
            Poll::Ready((inner, result)) => {
                tracing::debug!("future was ready!");
                *proj.state = State::Idle(inner);
                Poll::Ready(result)
            }
            Poll::Pending => {
                tracing::debug!("future was pending!");
                *proj.state = State::Pending(fut);
                Poll::Pending
            }
        }
    }
}
There. I'm sure now we've added a sufficient amount of complications to resolve our original problem, and the compiler will definitely not get in the way of anyth...
$ cargo check
    Checking surviving v0.1.0 (/home/amos/ftl/surviving)
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
   --> src/main.rs:157:37
    |
157 |                   Box::pin(async move {
    |  _____________________________________^
158 | |                     let res = inner.simple_read(buf).await;
159 | |                     (inner, res)
160 | |                 })
    | |_________________^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #4 defined on the method body at 144:5...
   --> src/main.rs:144:5
    |
144 |     #[tracing::instrument(skip(self, buf))]
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: ...so that the types are compatible
   --> src/main.rs:157:37
    |
157 |                   Box::pin(async move {
    |  _____________________________________^
158 | |                     let res = inner.simple_read(buf).await;
159 | |                     (inner, res)
160 | |                 })
    | |_________________^
    = note: expected `&mut [u8]`
               found `&mut [u8]`
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the expression is assignable
   --> src/main.rs:177:46
    |
177 |                 *proj.state = State::Pending(fut);
    |                                              ^^^
    = note: expected `std::pin::Pin<std::boxed::Box<(dyn core::future::future::Future<Output = (R, std::result::Result<usize, std::io::Error>)> + std::marker::Send + 'static)>>`
               found `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = (R, std::result::Result<usize, std::io::Error>)> + std::marker::Send>>`
    = note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)
Oh.
That's unfortunate.
But the compiler diagnostics are pretty good this time. The Box<dyn Future> wants a Future that's 'static. A Box<dyn Trait> written without an explicit lifetime, like the one in our BoxFut alias, means Box<dyn Trait + 'static>. But we have a &mut [u8] only for a much, much shorter time than that. We don't even have it for the full lifetime of SimpleAsyncReader.
We have it for just the duration of the poll_read call. And now that we're storing futures across poll_read calls, well... we can't hold on to that &mut [u8] at all.
So here's one idea: we could give our SimpleAsyncReader its own buffer. It would... reserve enough bytes for a read of length buf.len(), transfer ownership to the Future, and then get it back.
Let's see how this would look in practice:
First, we're going to need to keep track of our buffer in both states:
enum State<R> {
    Idle(R, Vec<u8>),
    Pending(BoxFut<(R, Vec<u8>, io::Result<usize>)>),
    Transitional,
}
And next, well... we're going to need to capture our Vec<u8> (named internal_buf in the code that follows) within the Future, and get it back.
And when we get it back and a read was successful, then we shall copy from our internal buffer to the output buffer, buf.
impl<R> AsyncRead for SimpleAsyncReader<R>
where
    // new: R must now be `'static`, since it's captured
    // by the future which is, itself, `'static`.
    R: SimpleRead + Send + 'static,
{
    #[tracing::instrument(skip(self, buf))]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let proj = self.project();
        let mut state = State::Transitional;
        std::mem::swap(proj.state, &mut state);

        let mut fut = match state {
            State::Idle(mut inner, mut internal_buf) => {
                tracing::debug!("getting new future...");
                // give the future its own zero-filled buffer, as long as `buf`
                // (zero-filling avoids handing out uninitialized memory)
                internal_buf.clear();
                internal_buf.resize(buf.len(), 0);

                Box::pin(async move {
                    let res = inner.simple_read(&mut internal_buf[..]).await;
                    (inner, internal_buf, res)
                })
            }
            State::Pending(fut) => {
                tracing::debug!("polling existing future...");
                fut
            }
            State::Transitional => unreachable!(),
        };

        match fut.as_mut().poll(cx) {
            Poll::Ready((inner, internal_buf, result)) => {
                tracing::debug!("future was ready!");
                if let Ok(n) = &result {
                    // copy what was read into the caller's buffer
                    // (assumes `buf` is at least as long as when the read started)
                    let n = *n;
                    buf[..n].copy_from_slice(&internal_buf[..n]);
                }
                *proj.state = State::Idle(inner, internal_buf);
                Poll::Ready(result)
            }
            Poll::Pending => {
                tracing::debug!("future was pending!");
                *proj.state = State::Pending(fut);
                Poll::Pending
            }
        }
    }
}
And finally, we need to adjust how we build our SimpleAsyncReader:
async fn hash_file(path: &Path) -> Result<(), eyre::Error> {
    let file = File::open(path).await?;
    let file = TracingReader { inner: file };
    let mut file = SimpleAsyncReader {
        state: State::Idle(file, Default::default()),
    };

    // etc.
}
And with that, our little state machine is complete.
Wind it up and watch it go:
$ cargo build -q && RUST_LOG=debug ./target/debug/surviving ./wine-4.0.4.tar.xz
poll_read{cx=Context { waker: Waker { data: 0x55f4b52ca240, vtable: 0x55f4b3e926f0 } }}
0ms DEBUG getting new future...
simple_read{}
0ms DEBUG doing delay...
0ms DEBUG future was pending!
poll_read{cx=Context { waker: Waker { data: 0x55f4b52ca240, vtable: 0x55f4b3e926f0 } }}
0ms DEBUG polling existing future...
simple_read{}
50ms DEBUG doing delay...done!
50ms DEBUG doing read...
0ms DEBUG future was pending!
poll_read{cx=Context { waker: Waker { data: 0x55f4b52ca240, vtable: 0x55f4b3e926f0 } }}
0ms DEBUG polling existing future...
simple_read{}
51ms DEBUG doing read...done!
0ms DEBUG future was ready!
poll_read{cx=Context { waker: Waker { data: 0x55f4b52ca240, vtable: 0x55f4b3e926f0 } }}
0ms DEBUG getting new future...
simple_read{}
0ms DEBUG doing delay...
0ms DEBUG future was pending!
poll_read{cx=Context { waker: Waker { data: 0x55f4b52ca240, vtable: 0x55f4b3e926f0 } }}
0ms DEBUG polling existing future...
simple_read{}
50ms DEBUG doing delay...done!
50ms DEBUG doing read...
0ms DEBUG future was pending!
Wonderful! Let's give it a shot in release mode and with less tracing:
$ cargo build -q --release && ./target/release/surviving ./wine-*
./wine-4.0.4.tar.xz d53a567e7a14a2b8dbd669afece1603daa769ada93522ec1f26fe69674d7c433
./wine-5.0.1.tar.xz 4af0d295e56db9f723daa4822ee4b4416ac8840278ccd7df73ef357027e5b663
./wine-5.0.2.tar.xz 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
$ openssl dgst -sha3-256 wine-4.0.4.tar.xz
SHA3-256(wine-4.0.4.tar.xz)= d53a567e7a14a2b8dbd669afece1603daa769ada93522ec1f26fe69674d7c433
$ openssl dgst -sha3-256 wine-5.0.1.tar.xz
SHA3-256(wine-5.0.1.tar.xz)= 4af0d295e56db9f723daa4822ee4b4416ac8840278ccd7df73ef357027e5b663
$ openssl dgst -sha3-256 wine-5.0.2.tar.xz
SHA3-256(wine-5.0.2.tar.xz)= 2a48ac5363318b2b8dd222933002bac9fa0e1cc051605c17ebdae9f78ff03892
And just like that, everything works.
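If you want to tinker with it, here's a std-only model of the same Idle/Pending/Transitional state machine, without async-std, async-trait or pin-project. It's a sketch under a few assumptions:

- SliceReader is a hypothetical stand-in for TracingReader. It reads from memory and returns Pending once per read, playing the part of the Delay.
- poll_read is an inherent method taking &mut self rather than an AsyncRead impl.
- Read errors are left out.
- The internal buffer is zero-filled with Vec::resize, so no unsafe is needed.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for the artificial delay: Pending once, then Ready.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical stand-in for `TracingReader`: reads from memory.
struct SliceReader {
    data: Vec<u8>,
    pos: usize,
}

impl SliceReader {
    async fn simple_read(&mut self, buf: &mut [u8]) -> usize {
        YieldOnce(false).await; // a "slow" read, like the Delay above
        let n = buf.len().min(self.data.len() - self.pos);
        buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
        self.pos += n;
        n
    }
}

enum State {
    Idle(SliceReader, Vec<u8>),
    Pending(BoxFut<(SliceReader, Vec<u8>, usize)>),
    Transitional,
}

struct SimpleAsyncReader {
    state: State,
}

impl SimpleAsyncReader {
    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<usize> {
        // Take the current state out, leaving a placeholder behind.
        let mut fut: BoxFut<(SliceReader, Vec<u8>, usize)> =
            match std::mem::replace(&mut self.state, State::Transitional) {
                State::Idle(mut inner, mut internal_buf) => {
                    // A zero-filled buffer the future can own.
                    internal_buf.clear();
                    internal_buf.resize(buf.len(), 0);
                    Box::pin(async move {
                        let n = inner.simple_read(&mut internal_buf[..]).await;
                        (inner, internal_buf, n)
                    })
                }
                State::Pending(fut) => fut,
                State::Transitional => unreachable!("state left in transition"),
            };
        match fut.as_mut().poll(cx) {
            Poll::Ready((inner, internal_buf, n)) => {
                // Assumes `buf` is at least as long as when this read started.
                buf[..n].copy_from_slice(&internal_buf[..n]);
                self.state = State::Idle(inner, internal_buf);
                Poll::Ready(n)
            }
            Poll::Pending => {
                self.state = State::Pending(fut);
                Poll::Pending
            }
        }
    }
}
```

Reading 10 bytes through a 4-byte buffer takes four rounds (4, 4, 2, then 0 bytes). Each round returns Poll::Pending once before completing, the same "getting new future" / "polling existing future" rhythm as in the trace above. std::mem::replace does the swap-with-Transitional dance in a single call.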