Understanding Rust futures by going way too deep
So! Rust futures! Easy peasy lemon squeezy. Until it's not. So let's do the easy thing, and then instead of waiting for the hard thing to sneak up on us, we'll go for it intentionally.
Cool bear's hot tip
That's all-around solid life advice.
Choo choo here comes the easy part 🚂💨
We make a new project:
$ cargo new waytoodeep
Created binary (application) `waytoodeep` package
We install cargo-edit in case we don't have it yet, so we can just cargo add later:
$ cargo install cargo-edit
Updating crates.io index
Downloaded cargo-edit v0.7.0
Downloaded 1 crate (57.6 KB) in 0.47s
Ignored package `cargo-edit v0.7.0` is already installed, use --force to override
...but we already have it.
Yeah, because it's really convenient. Readers just get confused because subcommands like cargo new, cargo build, cargo test, cargo run etc. are built into cargo, but cargo add isn't.
Ah right! In fact I see there's a bunch of these, like cargo-hack, cargo-udeps, cargo-expand... the list goes on.
Then we pick an async runtime, because those futures won't poll themselves... and we'll pick tokio for no reason other than: that's what I've been using a bunch these past few months.
$ cargo add tokio@1.9.0 --features full
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding tokio v1.9.0 to dependencies with features: ["full"]
Then we change up our main so it uses a default tokio executor (cargo new generated one for us, but it's not adequate here):
// in `src/main.rs`
#[tokio::main]
async fn main() {
println!("Hello from a (so far completely unnecessary) async runtime");
}
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.01s
Running `target/debug/waytoodeep`
Hello from a (so far completely unnecessary) async runtime
Cool!
But let's add some other nice things I just like to have in my projects.
First, for error handling - we're writing an app, we're going to get a bunch of different types from different libraries, it'd be neat if we could have one type to unify them all.
eyre gives us that (just like anyhow)! And since I like pretty colors I'll use color-eyre:
$ cargo add color-eyre@0.5.11
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding color-eyre v0.5.11 to dependencies
Now we need to install color-eyre as the default panic handler, and I snuck in some environment variable modification so we get backtraces by default.
use color_eyre::Report;
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
println!("Hello from a (so far completely unnecessary) async runtime");
Ok(())
}
fn setup() -> Result<(), Report> {
if std::env::var("RUST_LIB_BACKTRACE").is_err() {
std::env::set_var("RUST_LIB_BACKTRACE", "1")
}
color_eyre::install()?;
Ok(())
}
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.02s
Running `target/debug/waytoodeep`
Hello from a (so far completely unnecessary) async runtime
Okay good! Now if we have an error from somewhere, we'll see the full stack trace.
And finally, because I like my logs to be structured, let's add tracing; and to print them with nice colors in the terminal, let's add tracing-subscriber.
$ cargo add tracing@0.1.26 tracing-subscriber@0.2.19
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding tracing v0.1.26 to dependencies
Adding tracing-subscriber v0.2.19 to dependencies
We already have a setup function so we'll just install tracing-subscriber in there... and we'll change that println! to an info!. Also, again, some environment variable manipulation so that if nothing is set, we default to the info log level for all crates.
use color_eyre::Report;
use tracing::info;
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
info!("Hello from a comfy nest we've made for ourselves");
Ok(())
}
fn setup() -> Result<(), Report> {
if std::env::var("RUST_LIB_BACKTRACE").is_err() {
std::env::set_var("RUST_LIB_BACKTRACE", "1")
}
color_eyre::install()?;
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info")
}
tracing_subscriber::fmt::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
Ok(())
}
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.02s
Running `target/debug/waytoodeep`
Jul 25 17:03:46.993 INFO waytoodeep: Hello from a comfy nest we've made for ourselves
Alright, we're ready to do something useful!
Doing something useful
When deciding which article to read during their coffee break, people usually open several websites at the exact same moment, and read whichever article loads first.
And that's a fact. You can quote me on that because, well, who's going to go and verify that? That sounds like a lot of work. Just trust me on this.
So let's write a program that does exactly that.
Oh boy, we're gonna need more crates aren't we.
You guessed it! Let's bring in reqwest - although I don't love its API, it'll work nicely with the rest of our stack here.
Also we'll tell reqwest to use rustls because screw OpenSSL, that's why.
$ cargo add reqwest@0.11.4 --no-default-features --features rustls-tls
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding reqwest v0.11.4 to dependencies with features: ["rustls-tls"]
We're ready to make a request!
use reqwest::Client;
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
info!("Hello from a comfy nest we've made for ourselves");
let client = Client::new();
let url = "https://fasterthanli.me";
// this will turn non-200 HTTP status codes into rust errors,
// so the first `?` propagates "we had a connection problem" and
// the second `?` propagates "we had a chat with the server and they
// were not pleased"
let res = client.get(url).send().await?.error_for_status()?;
info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!");
Ok(())
}
And off we go!
$ cargo run
Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep)
Finished dev [unoptimized + debuginfo] target(s) in 3.05s
Running `target/debug/waytoodeep`
Jul 25 17:12:32.276 INFO waytoodeep: Hello from a comfy nest we've made for ourselves
Jul 25 17:12:32.409 INFO waytoodeep: Got a response! url=https://fasterthanli.me content_type=Some("text/html; charset=utf-8")
And this is what I mean by structured logging. Well, part of it anyway. In that line here:
info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!");
We have a message, Got a response!, then a tag named url whose value is the Display-formatting of the binding named url, and a tag named content_type, whose value is the Debug-formatting of the expression res.headers().get("content-type").
Easy peasy! name = %value for Display, name = ?value for Debug, and if both name and value have the same... name, we can use the short forms %value and ?value.
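If it helps, % and ? map onto the same split as {} and {:?} in plain format!/println! calls: Display versus Debug. Here's a quick std-only illustration (no tracing involved, the values are made up):

```rust
fn main() {
    let url = "https://fasterthanli.me";
    let content_type: Option<&str> = Some("text/html; charset=utf-8");

    // `%url` in a tracing macro means Display formatting, i.e. `{}`:
    println!("url={}", url);

    // `?content_type` means Debug formatting, i.e. `{:?}`:
    println!("content_type={:?}", content_type);
}
```

Same values, two renderings: Display gives you the bare string, Debug gives you the Some("...") wrapper with quoting.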
Of course there's also spans, which are great, and to me the whole point of this is you can then send them to APM platforms like Datadog or Honeycomb or whoever, but this isn't an article about tracing.
Just to illustrate though, if we install a JSON tracing subscriber instead, this is what we get:
$ cargo run
Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep)
Finished dev [unoptimized + debuginfo] target(s) in 3.09s
Running `target/debug/waytoodeep`
{"timestamp":"Jul 25 17:17:21.531","level":"INFO","fields":{"message":"Hello from a comfy nest we've made for ourselves"},"target":"waytoodeep"}
{"timestamp":"Jul 25 17:17:21.709","level":"INFO","fields":{"message":"Got a response!","url":"https://fasterthanli.me","content_type":"Some(\"text/html; charset=utf-8\")"},"target":"waytoodeep"}
Which should be enough to pique your interest.
To peak your interest?
Either. Both. Both is good.
Fetching two things
Okay, now let's fetch two things!
These two things:
const URL_1: &str = "https://fasterthanli.me/articles/whats-in-the-box";
const URL_2: &str = "https://fasterthanli.me/series/advent-of-code-2020/part-13";
...so that it's a fair comparison. Both these articles are hosted on my own website, and it's definitely not a marketing scheme, instead it's so that the fetch time is comparable and there's a chance one will finish fetching before the other (and that will change randomly over time).
Uh-huh, sure. If that's what you need to tell yourself.
We'll make a quick function to fetch a thing:
async fn fetch_thing(client: &Client, url: &str) -> Result<(), Report> {
let res = client.get(url).send().await?.error_for_status()?;
info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!");
Ok(())
}
And use it:
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
info!("Hello from a comfy nest we've made for ourselves");
let client = Client::new();
fetch_thing(&client, URL_1);
fetch_thing(&client, URL_2);
Ok(())
}
And then run it:
$ cargo run
Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep)
warning: unused implementer of `Future` that must be used
--> src/main.rs:15:5
|
15 | fetch_thing(&client, URL_1);
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(unused_must_use)]` on by default
= note: futures do nothing unless you `.await` or poll them
warning: unused implementer of `Future` that must be used
--> src/main.rs:16:5
|
16 | fetch_thing(&client, URL_2);
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: futures do nothing unless you `.await` or poll them
warning: 2 warnings emitted
Finished dev [unoptimized + debuginfo] target(s) in 3.01s
Running `target/debug/waytoodeep`
Jul 25 17:26:31.571 INFO waytoodeep: Hello from a comfy nest we've made for ourselves
Huh, weird, nothing happened.
long sigh amos ffs you ignored the yellow squigglies and the very noisy Rust warnings about those futures not being polled just to prove a point, I get it I get it now go fix it.
Okay yeesh sure I'll fix it.
fetch_thing(&client, URL_1).await?;
fetch_thing(&client, URL_2).await?;
$ cargo run
Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep)
Finished dev [unoptimized + debuginfo] target(s) in 3.17s
Running `target/debug/waytoodeep`
Jul 25 17:27:29.768 INFO waytoodeep: Hello from a comfy nest we've made for ourselves
Jul 25 17:27:29.891 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 17:27:29.974 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")
So, uh yeah, lesson zero: futures don't do anything unless they're polled.
And that's because futures are just state, pretty much! In fact, let's make one:
// in `src/main.rs`
mod dumb;
// in `src/dumb.rs`
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tracing::info;
pub struct DumbFuture {}
impl Future for DumbFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
info!("Hello from a dumb future!");
Poll::Ready(())
}
}
// back in `src/main.rs`
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
let fut = dumb::DumbFuture {};
Ok(())
}
There. That's pretty much what we did when we didn't .await.
Running it does nothing other than print warnings:
$ cargo run
Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep)
warning: unused variable: `fut`
--> src/main.rs:14:9
|
14 | let fut = dumb::DumbFuture {};
| ^^^ help: if this is intentional, prefix it with an underscore: `_fut`
|
= note: `#[warn(unused_variables)]` on by default
warning: 1 warning emitted
Finished dev [unoptimized + debuginfo] target(s) in 2.11s
Running `target/debug/waytoodeep`
Because how could it? We're literally just building a struct. A zero-sized struct at that!
If we .await it though... then we're asking the runtime to run its event loop until such time as the future is polled and it finally returns Poll::Ready, which ours does immediately:
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
info!("Building that dumb future...");
let fut = dumb::DumbFuture {};
info!("Awaiting that dumb future...");
fut.await;
info!("Done awaiting that dumb future");
Ok(())
}
$ cargo run
Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep)
Finished dev [unoptimized + debuginfo] target(s) in 2.34s
Running `target/debug/waytoodeep`
Jul 25 17:37:09.261 INFO waytoodeep: Building that dumb future...
Jul 25 17:37:09.261 INFO waytoodeep: Awaiting that dumb future...
Jul 25 17:37:09.261 INFO waytoodeep::dumb: Hello from a dumb future!
Jul 25 17:37:09.262 INFO waytoodeep: Done awaiting that dumb future
And that's a bit different from, say, ECMAScript promises, which can do some amount of work even if they're not awaited at all.
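We can check that laziness without any runtime at all: hand-roll a no-op waker and call poll ourselves. This is just an illustrative sketch (noop_waker is made up here, and tokio does the same dance with a much smarter waker):

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

static RAN: AtomicBool = AtomicBool::new(false);

// A waker that does nothing: just enough to call `poll` by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let fut = async {
        RAN.store(true, Ordering::SeqCst);
    };
    // Merely building the future runs none of its body:
    assert!(!RAN.load(Ordering::SeqCst));

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    // Polling is what actually runs it:
    assert!(matches!(fut.as_mut().poll(&mut cx), Poll::Ready(())));
    assert!(RAN.load(Ordering::SeqCst));
    println!("the async block only ran when polled");
}
```

The async block's side effect only happens on that poll call, never before.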
But nope, Rust futures are just dumb boring state machines, and you can see the machinery if you cause trouble on purpose:
// in `src/dumb.rs`
impl Future for DumbFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
panic!("Oh heck no");
}
}
$ RUST_BACKTRACE=1 cargo run
Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep)
Finished dev [unoptimized + debuginfo] target(s) in 2.28s
Running `target/debug/waytoodeep`
Jul 25 17:41:18.956 INFO waytoodeep: Building that dumb future...
Jul 25 17:41:18.956 INFO waytoodeep: Awaiting that dumb future...
The application panicked (crashed).
Message: Oh heck no
Location: src/dumb.rs:14
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ BACKTRACE ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
⋮ 6 frames hidden ⋮
7: <waytoodeep::dumb::DumbFuture as core::future::future::Future>::poll::h4a44780628f4c5f0
at /home/amos/ftl/waytoodeep/src/dumb.rs:14
8: waytoodeep::main::{{closure}}::h36de5a1f1f2a5c5b
at /home/amos/ftl/waytoodeep/src/main.rs:17
9: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll::h20a96e082c7a581e
at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/mod.rs:80
10: tokio::park::thread::CachedParkThread::block_on::{{closure}}::hdf98cb3c7fdf3de4
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/park/thread.rs:263
11: tokio::coop::with_budget::{{closure}}::h6a86a24a246e220f
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/coop.rs:106
12: std::thread::local::LocalKey<T>::try_with::h2ce0ac27c85965b6
at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:376
13: std::thread::local::LocalKey<T>::with::hc449f38c9f65fb53
at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:352
14: tokio::coop::with_budget::h5db157bd1e95e0e8
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/coop.rs:99
15: tokio::coop::budget::h7b57383f1255ac24
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/coop.rs:76
16: tokio::park::thread::CachedParkThread::block_on::hece399485213b91c
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/park/thread.rs:263
17: tokio::runtime::enter::Enter::block_on::h89e9882e539e82d3
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/runtime/enter.rs:151
18: tokio::runtime::thread_pool::ThreadPool::block_on::h1a0186470c00ba70
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/runtime/thread_pool/mod.rs:71
19: tokio::runtime::Runtime::block_on::h7c21d6989b86d606
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/runtime/mod.rs:452
20: waytoodeep::main::hb4dd5ffd46a5c032
at /home/amos/ftl/waytoodeep/src/main.rs:20
21: core::ops::function::FnOnce::call_once::hc1fcc87431f77d25
at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ops/function.rs:227
⋮ 11 frames hidden ⋮
Run with COLORBT_SHOW_HIDDEN=1 environment variable to disable frame filtering.
Run with RUST_BACKTRACE=full to include source snippets.
This is much nicer with colors so I hope you're following along at home, but we can see our actual main function at frame 20, then going up, we can see Runtime::block_on, a thread pool thingy, some parked threads, thread-local stuff (the other TLS), a generated future (frames 9 and 8, which is basically what our async fn main ended up being), and finally our DumbFuture's poll method (frame 7).
Frames 6 through 1 are just panic machinery, again wholly out of scope for this article.
But please step up, dear spectator, and move your arms around the contraption to make sure that there's no trickery going on, no hidden wires, no...
What in the world are you going on about
...there's no "special handling" for async stack traces is what I'm saying. Here, sure, we're panicking, that's a Rust-only thing, the OS never even knows we narrowly avoided a catastrophe.
But we can make a much bigger mess, if we're willing to use unsafe:
impl Future for DumbFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe {
*(0xF00D as *mut u64) = 0x0;
}
unreachable!(); // pinky promise
}
}
And then no amount of panic handling will save us:
$ RUST_BACKTRACE=1 cargo run
Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep)
Finished dev [unoptimized + debuginfo] target(s) in 2.18s
Running `target/debug/waytoodeep`
Jul 25 17:46:53.926 INFO waytoodeep: Building that dumb future...
Jul 25 17:46:53.926 INFO waytoodeep: Awaiting that dumb future...
zsh: segmentation fault (core dumped) RUST_BACKTRACE=1 cargo run
However, GDB can!
$ cargo build && gdb --quiet --args ./target/debug/waytoodeep
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
Reading symbols from ./target/debug/waytoodeep...
warning: Missing auto-load script at offset 0 in section .debug_gdb_scripts
of file /home/amos/ftl/waytoodeep/target/debug/waytoodeep.
Use `info auto-load python-scripts [REGEXP]' to list them.
(gdb) r
Starting program: /home/amos/ftl/waytoodeep/target/debug/waytoodeep
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
[New Thread 0x7ffff7c28700 (LWP 129418)]
[New Thread 0x7ffff7a27700 (LWP 129419)]
[New Thread 0x7ffff7826700 (LWP 129420)]
[New Thread 0x7ffff7625700 (LWP 129421)]
[New Thread 0x7ffff7424700 (LWP 129422)]
[New Thread 0x7ffff7223700 (LWP 129423)]
[New Thread 0x7ffff7022700 (LWP 129424)]
[New Thread 0x7ffff6e1e700 (LWP 129425)]
[New Thread 0x7ffff6c1a700 (LWP 129426)]
[New Thread 0x7ffff6a16700 (LWP 129427)]
[New Thread 0x7ffff6812700 (LWP 129428)]
[New Thread 0x7ffff660e700 (LWP 129429)]
[New Thread 0x7ffff640a700 (LWP 129430)]
[New Thread 0x7ffff6206700 (LWP 129431)]
[New Thread 0x7ffff6002700 (LWP 129432)]
Jul 25 17:47:13.278 INFO waytoodeep: Building that dumb future...
Jul 25 17:47:13.279 INFO waytoodeep: Awaiting that dumb future...
Thread 1 "waytoodeep" received signal SIGSEGV, Segmentation fault.
<waytoodeep::dumb::DumbFuture as core::future::future::Future>::poll (self=..., _cx=0x7fffffffd690) at src/dumb.rs:15
15 *(0xF00D as *mut u64) = 0x0;
(gdb) bt
#0 <waytoodeep::dumb::DumbFuture as core::future::future::Future>::poll (self=..., _cx=0x7fffffffd690) at src/dumb.rs:15
#1 0x00005555555ab3a3 in waytoodeep::main::{{closure}} () at src/main.rs:17
#2 0x00005555555adb29 in <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll (self=..., cx=0x7fffffffd690)
at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/mod.rs:80
#3 0x00005555555adaa0 in tokio::park::thread::CachedParkThread::block_on::{{closure}} ()
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/park/thread.rs:263
#4 0x00005555555b1742 in tokio::coop::with_budget::{{closure}} (cell=0x7ffff7c2c412)
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/coop.rs:106
#5 0x00005555555a9f58 in std::thread::local::LocalKey<T>::try_with (self=0x555555925fc0, f=...)
at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:376
#6 0x00005555555a9e3d in std::thread::local::LocalKey<T>::with (self=0x555555925fc0, f=...)
at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:352
#7 0x00005555555ad7c8 in tokio::coop::with_budget (budget=..., f=...)
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/coop.rs:99
#8 tokio::coop::budget (f=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/coop.rs:76
#9 tokio::park::thread::CachedParkThread::block_on (self=0x7fffffffd7a0, f=...)
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/park/thread.rs:263
#10 0x00005555555abcc9 in tokio::runtime::enter::Enter::block_on (self=0x7fffffffd7f0, f=...)
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/runtime/enter.rs:151
#11 0x00005555555acf2e in tokio::runtime::thread_pool::ThreadPool::block_on (self=0x7fffffffd908, future=...)
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/runtime/thread_pool/mod.rs:71
#12 0x00005555555b0dfd in tokio::runtime::Runtime::block_on (self=0x7fffffffd900, future=...)
at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.9.0/src/runtime/mod.rs:452
#13 0x00005555555aa807 in waytoodeep::main () at src/main.rs:20
(gdb)
Again, you're missing out on the pretty colors, which are just wonderful.
But back to actually useful code, let's remove all traces of our dumb future for now (i.e. the mod dumb and the src/dumb.rs file), and do it with a fetch future instead:
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
info!("Building that fetch future...");
let client = Client::new();
let fut = fetch_thing(&client, URL_1);
info!("Awaiting that fetch future...");
fut.await?;
info!("Done awaiting that fetch future");
Ok(())
}
$ RUST_BACKTRACE=1 cargo run
Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep)
Finished dev [unoptimized + debuginfo] target(s) in 2.99s
Running `target/debug/waytoodeep`
Jul 25 17:51:49.281 INFO waytoodeep: Building that fetch future...
Jul 25 17:51:49.282 INFO waytoodeep: Awaiting that fetch future...
Jul 25 17:51:49.437 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 17:51:49.438 INFO waytoodeep: Done awaiting that fetch future
Okay! Same idea.
There's two ways to think about our function here. There's the syntactic-sugar-coated way: that it's an "async fn":
async fn fetch_thing(client: &Client, url: &str) -> Result<(), Report> {
let res = client.get(url).send().await?.error_for_status()?;
info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!");
Ok(())
}
And then there's the crunchy-core way: that it's a regular "fn" that just happens to return a future:
use std::future::Future;
fn fetch_thing<'a>(
client: &'a Client,
url: &'a str,
) -> impl Future<Output = Result<(), Report>> + 'a {
async move {
let res = client.get(url).send().await?.error_for_status()?;
info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!");
Ok(())
}
}
And since it borrows from both client and url, the Future can only live as long as they both do, which is why I've named a lifetime 'a above, and the return type is "something that implements Future (with that Output) and also lives for 'a".
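Incidentally, this is why you'll sometimes see functions take owned values instead: move owned data into the async move block, and the returned future borrows nothing, so it can be + 'static (handy when a future has to outlive the caller's borrows). A std-only sketch, with made-up greet/greet_owned functions standing in for fetch_thing:

```rust
use std::future::Future;

// Borrowing version: the future is tied to `'a`, like `fetch_thing` above.
fn greet<'a>(name: &'a str) -> impl Future<Output = String> + 'a {
    async move { format!("hello, {}", name) }
}

// Owned version: the `String` is moved into the async block,
// so the future holds no borrows and can be `+ 'static`.
fn greet_owned(name: String) -> impl Future<Output = String> + 'static {
    async move { format!("hello, {}", name) }
}

fn main() {
    // Both compile; only the owned one could outlive `name`'s owner.
    let _short_lived = greet("bear");
    let _static_fut = greet_owned(String::from("bear"));
}
```

The trade-off is a copy/move of the data up front, in exchange for a future you can ship anywhere.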
And the whole async move {} block is also just "building state" - it evaluates to a type that implements Future. We just can't name it. We can get a description of it, at best:
fn type_name_of<T>(_: &T) -> &'static str {
std::any::type_name::<T>()
}
// in main
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
info!("Building that fetch future...");
let client = Client::new();
let fut = fetch_thing(&client, URL_1);
info!(
type_name = type_name_of(&fut),
"That fetch future has a type.."
);
info!("Awaiting that fetch future...");
fut.await?;
info!("Done awaiting that fetch future");
Ok(())
}
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.05s
Running `target/debug/waytoodeep`
Jul 25 18:00:39.774 INFO waytoodeep: Building that fetch future...
Jul 25 18:00:39.775 INFO waytoodeep: That fetch future has a type.. type_name="core::future::from_generator::GenFuture<waytoodeep::fetch_thing::{{closure}}>"
Jul 25 18:00:39.775 INFO waytoodeep: Awaiting that fetch future...
Jul 25 18:00:39.882 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 18:00:39.882 INFO waytoodeep: Done awaiting that fetch future
...but yeah, it's a type generated by the compiler because we're using the async syntax. We can't name it in the sense that we can't declare a binding of that type, or write a function that accepts only that type.
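What we can do is be generic over it: any type parameter bounded by Future will happily accept the compiler-generated type. A small std-only sketch (describe and fetch_pretend are made up here):

```rust
use std::future::Future;

// We can't write `fn takes(f: ThatGeneratedType)`, but we can be generic:
fn describe<F: Future>(fut: &F) -> (&'static str, usize) {
    // the type has a name internally, we just can't utter it in source code
    (std::any::type_name::<F>(), std::mem::size_of_val(fut))
}

async fn fetch_pretend() -> u32 {
    42
}

fn main() {
    let fut = fetch_pretend();
    let (name, size) = describe(&fut);
    println!("name: {}", name);
    println!("size: {} bytes", size);
}
```

That's also how combinators and executors handle these futures: everything is generic (or boxed behind dyn Future), never spelled out.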
And to really convince ourselves that our future is not doing any work until we actually poll it, well, we can turn on debug logging for reqwest:
$ RUST_LOG=info,reqwest=debug cargo run
Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep)
Finished dev [unoptimized + debuginfo] target(s) in 3.07s
Running `target/debug/waytoodeep`
Jul 25 18:05:07.384 INFO waytoodeep: Building that fetch future...
Jul 25 18:05:07.385 INFO waytoodeep: That fetch future has a type.. type_name="core::future::from_generator::GenFuture<waytoodeep::fetch_thing::{{closure}}>"
Jul 25 18:05:07.385 INFO waytoodeep: Awaiting that fetch future...
Jul 25 18:05:07.385 DEBUG reqwest::connect: starting new connection: https://fasterthanli.me/
Jul 25 18:05:07.503 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/articles/whats-in-the-box
Jul 25 18:05:07.503 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 18:05:07.503 INFO waytoodeep: Done awaiting that fetch future
Or even for every crate, so we can hear from hyper and h2:
$ RUST_LOG=debug cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
Running `target/debug/waytoodeep`
Jul 25 18:05:59.973 INFO waytoodeep: Building that fetch future...
Jul 25 18:05:59.973 INFO waytoodeep: That fetch future has a type.. type_name="core::future::from_generator::GenFuture<waytoodeep::fetch_thing::{{closure}}>"
Jul 25 18:05:59.973 INFO waytoodeep: Awaiting that fetch future...
Jul 25 18:05:59.974 DEBUG reqwest::connect: starting new connection: https://fasterthanli.me/
Jul 25 18:05:59.974 DEBUG hyper::client::connect::dns: resolving host="fasterthanli.me"
Jul 25 18:05:59.989 DEBUG hyper::client::connect::http: connecting to 172.67.196.144:443
Jul 25 18:06:00.000 DEBUG hyper::client::connect::http: connected to 172.67.196.144:443
Jul 25 18:06:00.000 DEBUG rustls::client::hs: No cached session for DNSNameRef("fasterthanli.me")
Jul 25 18:06:00.000 DEBUG rustls::client::hs: Not resuming any session
Jul 25 18:06:00.016 DEBUG rustls::client::hs: Using ciphersuite TLS13_CHACHA20_POLY1305_SHA256
Jul 25 18:06:00.016 DEBUG rustls::client::tls13: Not resuming
Jul 25 18:06:00.017 DEBUG rustls::client::tls13: TLS1.3 encrypted extensions: [ServerNameAck, Protocols([PayloadU8([104, 50])])]
Jul 25 18:06:00.017 DEBUG rustls::client::hs: ALPN protocol is Some(b"h2")
Jul 25 18:06:00.018 DEBUG h2::client: binding client connection
Jul 25 18:06:00.018 DEBUG h2::client: client connection bound
Jul 25 18:06:00.018 DEBUG h2::codec::framed_write: send frame=Settings { flags: (0x0), enable_push: 0, initial_window_size: 2097152, max_frame_size: 16384 }
Jul 25 18:06:00.019 DEBUG Connection{peer=Client}: h2::codec::framed_write: send frame=WindowUpdate { stream_id: StreamId(0), size_increment: 5177345 }
Jul 25 18:06:00.019 DEBUG hyper::client::pool: pooling idle connection for ("https", fasterthanli.me)
Jul 25 18:06:00.020 DEBUG Connection{peer=Client}: h2::codec::framed_write: send frame=Headers { stream_id: StreamId(1), flags: (0x5: END_HEADERS | END_STREAM) }
Jul 25 18:06:00.029 DEBUG Connection{peer=Client}: rustls::client::tls13: Ticket saved
Jul 25 18:06:00.029 DEBUG Connection{peer=Client}: rustls::client::tls13: Ticket saved
Jul 25 18:06:00.029 DEBUG Connection{peer=Client}: h2::codec::framed_read: received frame=Settings { flags: (0x0), max_concurrent_streams: 256, initial_window_size: 65536, max_frame_size: 16777215 }
Jul 25 18:06:00.030 DEBUG Connection{peer=Client}: h2::codec::framed_write: send frame=Settings { flags: (0x1: ACK) }
Jul 25 18:06:00.030 DEBUG Connection{peer=Client}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 2147418112 }
Jul 25 18:06:00.041 DEBUG Connection{peer=Client}: h2::codec::framed_read: received frame=Settings { flags: (0x1: ACK) }
Jul 25 18:06:00.041 DEBUG Connection{peer=Client}: h2::proto::settings: received settings ACK; applying Settings { flags: (0x0), enable_push: 0, initial_window_size: 2097152, max_frame_size: 16384 }
Jul 25 18:06:00.120 DEBUG Connection{peer=Client}: h2::codec::framed_read: received frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
Jul 25 18:06:00.120 DEBUG Connection{peer=Client}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) }
Jul 25 18:06:00.121 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/articles/whats-in-the-box
Jul 25 18:06:00.121 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 18:06:00.121 INFO waytoodeep: Done awaiting that fetch future
Jul 25 18:06:00.121 DEBUG Connection{peer=Client}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) }
Jul 25 18:06:00.122 DEBUG Connection{peer=Client}: h2::codec::framed_write: send frame=Reset { stream_id: StreamId(1), error_code: CANCEL }
Jul 25 18:06:00.122 DEBUG Connection{peer=Client}: h2::codec::framed_write: send frame=GoAway { error_code: NO_ERROR, last_stream_id: StreamId(0) }
Jul 25 18:06:00.122 DEBUG Connection{peer=Client}: h2::proto::connection: Connection::poll; connection error error=NO_ERROR
Jul 25 18:06:00.122 DEBUG Connection{peer=Client}: rustls::session: Sending warning alert CloseNotify
Oh look, we are using rustls! And TLS 1.3!
TLS 1.3? You mean the thing I made a video about?
Oh that's right, I forgot you make videos now.
...let's say "now and then".
So that should be enough to convince you, unless you really only trust what the kernel says (and even then...), so let's ask what strace thinks.
And let's also add a one-second sleep before we await the future, just to be extra sure:
use tokio::time::sleep;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
info!("Building that fetch future...");
let client = Client::new();
let fut = fetch_thing(&client, URL_1);
info!("Sleeping for a bit...");
sleep(Duration::from_secs(1)).await;
info!("Awaiting that fetch future...");
fut.await?;
info!("Done awaiting that fetch future");
Ok(())
}
$ cargo build && strace -e 'connect' ./target/debug/waytoodeep
Compiling waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep)
Finished dev [unoptimized + debuginfo] target(s) in 3.13s
Jul 25 18:09:36.595 INFO waytoodeep: Building that fetch future...
Jul 25 18:09:36.596 INFO waytoodeep: Sleeping for a bit...
Jul 25 18:09:37.599 INFO waytoodeep: Awaiting that fetch future...
connect(9, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("104.21.92.169")}, 16) = -1 EINPROGRESS (Operation now in progress)
Jul 25 18:09:37.720 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 18:09:37.721 INFO waytoodeep: Done awaiting that fetch future
+++ exited with 0 +++
Mh again colors make that a lot more readable, I just love colors as long as I don't have to pick them. Here's how it looks for me:
And because tracing-subscriber's default formatter shows timestamps, you can see it is sleeping for one second (and 3 milliseconds), and only when we await it does it connect to whichever CDN node is serving that article today.
Okay, so! Let's try fetching two things again:
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
let client = Client::new();
let fut1 = fetch_thing(&client, URL_1);
let fut2 = fetch_thing(&client, URL_2);
fut1.await?;
fut2.await?;
Ok(())
}
And still look at some debug logs, but less of them:
$ RUST_LOG=info,reqwest=debug cargo run --quiet
Jul 25 18:31:47.396 DEBUG reqwest::connect: starting new connection: https://fasterthanli.me/
Jul 25 18:31:47.536 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/articles/whats-in-the-box
Jul 25 18:31:47.537 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 18:31:47.627 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/series/advent-of-code-2020/part-13
Jul 25 18:31:47.627 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")
Okay! Interesting. So from what I'm seeing here, reqwest re-uses the same connection for both requests. I say that because I only see one reqwest::connect log line.
Let's do a quick strace check:
$ cargo build --quiet && strace -e 'connect' ./target/debug/waytoodeep > /dev/null
connect(9, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("172.67.196.144")}, 16) = -1 EINPROGRESS (Operation now in progress)
+++ exited with 0 +++
Yup sure enough, only one connection.
But still, we're waiting for the first request to be done before making the second request. The first one took... 536-396 = 140 milliseconds, and the second one took 627-537 = 90 milliseconds!
Roughly.
Ah, but aren't we running a debug build of our application right now?
That is true. I'm fairly sure our problem is IO-bound though, not CPU-bound.
There's definitely some overhead associated with a debug build of that app, but I doubt it has a significant impact on the latency here. Let's check anyway:
(mind the --release)
$ RUST_LOG=info,reqwest=debug cargo run --quiet --release
Jul 25 18:34:59.211 DEBUG reqwest::connect: starting new connection: https://fasterthanli.me/
Jul 25 18:34:59.343 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/articles/whats-in-the-box
Jul 25 18:34:59.343 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 18:34:59.427 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/series/advent-of-code-2020/part-13
Jul 25 18:34:59.427 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")
Okay here we got 343-211 = 132ms, then 427-343 = 84ms.
A difference of a few milliseconds like that, could just as well be caused by the neighbor clicking on another YouTube video, resulting in a BURST of radio waves, which resulted in collisions (there's no air traffic controller for 802.11, it's a free-for-all!) and retransmissions and would explain slight latency changes like that.
Or another million reasons. That's why we don't do those measurements like that.
But back to the important thing.
It's waiting for the first one to finish
It is! It is waiting for the first one. So how do we make our program do both requests at the same time?
Well, there's a bunch of ways!
For example, we could spawn the futures on an executor, and then sleep for a second. That would be enough, right? One second?
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
let client = Client::new();
let fut1 = fetch_thing(&client, URL_1);
tokio::spawn(fut1);
let fut2 = fetch_thing(&client, URL_2);
tokio::spawn(fut2);
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
}
$ RUST_LOG=info,reqwest=debug cargo run --quiet --release
error[E0597]: `client` does not live long enough
--> src/main.rs:17:28
|
17 | let fut1 = fetch_thing(&client, URL_1);
| ------------^^^^^^^--------
| | |
| | borrowed value does not live long enough
| argument requires that `client` is borrowed for `'static`
...
25 | }
| - `client` dropped here while still borrowed
error: aborting due to previous error
For more information about this error, try `rustc --explain E0597`.
error: could not compile `waytoodeep`
To learn more, run the command again with --verbose.
Ah, unless we can't.
And we can't because...
Oooh ooh I got this one! So by "spawning the future on the executor", we hand our future off to the executor, right? We transfer ownership of it and everything?
Correct.
And then even if we don't await it, the future we just spawned is just part of "what the executor needs to do", so it's being polled even if we return from main.
But if we return from main the whole program ex-
Yeah okay sure, here it's main but it could be any function. The point is we could return from our function, from which the future is borrowing some data, and that makes the borrow checker very sad.
Right!
And it makes me very, very happy, because it means we cannot accidentally access some data after it's been deallocated. The old UAF.
But it is getting in the way of this example.
So... we have to find another way. What if the future returned by fetch_thing was 'static? What if it didn't borrow anything?
Currently it looks like this:
use std::future::Future;
fn fetch_thing<'a>(
client: &'a Client,
url: &'a str,
) -> impl Future<Output = Result<(), Report>> + 'a {
async move {
let res = client.get(url).send().await?.error_for_status()?;
info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!");
Ok(())
}
}
Well, it used to look like an async fn, but we just had to go ahead and eschew the nice syntax for the sake of gaining some understanding.
Which is fortunate because really what we want is something like this:
fn fetch_thing<'a>(
client: &'a Client,
url: &'a str,
// 👇
) -> impl Future<Output = Result<(), Report>> + 'static {}
But hm, since we currently borrow from client and url, we have to solve these. url is the easy one, since we're using those consts:
const URL_1: &str = "https://fasterthanli.me/articles/whats-in-the-box";
const URL_2: &str = "https://fasterthanli.me/series/advent-of-code-2020/part-13";
...they're already 'static. So we can just require url to be 'static as well.
fn fetch_thing<'a>(
client: &'a Client,
// 👇
url: &'static str,
) -> impl Future<Output = Result<(), Report>> + 'static {}
Alright! One lifetime down, one to go.
Well, we could require client itself to live for 'static. And since it's a reference to a Client, that means the Client itself must live for 'static.
fn fetch_thing(
// 👇
client: &'static Client,
url: &'static str,
) -> impl Future<Output = Result<(), Report>> + 'static {}
And since it's owned by main, uhh, we can, we can... we can leak it.
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
let client = Client::new();
let leaked_client = Box::leak(Box::new(client));
let fut1 = fetch_thing(leaked_client, URL_1);
let fut2 = fetch_thing(leaked_client, URL_2);
tokio::spawn(fut1);
tokio::spawn(fut2);
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
}
There! No lifetime problems.
Just leak everything. See? You don't need C!
$ RUST_LOG=info,reqwest=debug cargo run --quiet --release
Jul 25 18:54:53.614 DEBUG reqwest::connect: starting new connection: https://fasterthanli.me/
Jul 25 18:54:53.614 DEBUG reqwest::connect: starting new connection: https://fasterthanli.me/
Jul 25 18:54:53.708 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/articles/whats-in-the-box
Jul 25 18:54:53.708 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 18:54:53.733 DEBUG reqwest::async_impl::client: response '200 OK' for https://fasterthanli.me/series/advent-of-code-2020/part-13
Jul 25 18:54:53.733 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")
Iiiiiinteresting.
So our two requests are definitely going out concurrently, we know because we know making a request from my laptop to my website takes between 80ms and 140ms, but in the logs we can see a ~25ms interval between both responses coming in.
We can also see that reqwest, which has a connection pooling mechanism, is immediately creating two connections, probably because by the time we fire off the second request, the first request's connection is not done establishing yet.
And that means strace should see...
$ cargo build --quiet --release && strace -e 'connect' ./target/release/waytoodeep
Jul 25 18:58:16.425 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 18:58:16.443 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")
+++ exited with 0 +++
...two connect calls! I knew it!
Amos I hate to break it to you.
Mh?
There's zero connect calls there.
How efficient. With Rust you don't even need to establish TCP connections to make HTTP/2 requests. Truly revolutionary.
Okay uh that can't be right. Maybe it's doing it in another thread? And maybe strace only traces the main thread by default?
Ah, there, -f should trace all "child processes", and as everyone knows, on Linux threads are just processes in a trenchcoat (or is it the other way around), so, off we go:
$ cargo build --quiet --release && strace -f -e 'connect' ./target/release/waytoodeep
strace: Process 154612 attached
strace: Process 154613 attached
strace: Process 154614 attached
strace: Process 154615 attached
strace: Process 154616 attached
strace: Process 154617 attached
strace: Process 154618 attached
strace: Process 154619 attached
strace: Process 154620 attached
strace: Process 154621 attached
strace: Process 154622 attached
strace: Process 154623 attached
strace: Process 154624 attached
strace: Process 154625 attached
strace: Process 154626 attached
strace: Process 154627 attached
strace: Process 154628 attached
[pid 154627] connect(9, {sa_family=AF_UNIX, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory)
[pid 154628] connect(10, {sa_family=AF_UNIX, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory)
[pid 154627] connect(9, {sa_family=AF_UNIX, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory)
[pid 154628] connect(9, {sa_family=AF_INET, sin_port=htons(53), sin_addr=inet_addr("127.0.0.53")}, 16) = 0
[pid 154627] connect(10, {sa_family=AF_INET, sin_port=htons(53), sin_addr=inet_addr("127.0.0.53")}, 16) = 0
[pid 154627] connect(9, {sa_family=AF_INET6, sin6_port=htons(0), sin6_flowinfo=htonl(0), inet_pton(AF_INET6, "2606:4700:3034::6815:5ca9", &sin6_addr), sin6_scope_id=0}, 28) = -1 ENETUNREACH (Network is unreachable)
[pid 154627] connect(9, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0
[pid 154627] connect(9, {sa_family=AF_INET6, sin6_port=htons(0), sin6_flowinfo=htonl(0), inet_pton(AF_INET6, "2606:4700:3031::ac43:c490", &sin6_addr), sin6_scope_id=0}, 28) = -1 ENETUNREACH (Network is unreachable)
[pid 154627] connect(9, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0
[pid 154627] connect(9, {sa_family=AF_INET, sin_port=htons(0), sin_addr=inet_addr("104.21.92.169")}, 16) = 0
[pid 154627] connect(9, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0
[pid 154627] connect(9, {sa_family=AF_INET, sin_port=htons(0), sin_addr=inet_addr("172.67.196.144")}, 16) = 0
[pid 154628] connect(10, {sa_family=AF_INET6, sin6_port=htons(0), sin6_flowinfo=htonl(0), inet_pton(AF_INET6, "2606:4700:3034::6815:5ca9", &sin6_addr), sin6_scope_id=0}, 28) = -1 ENETUNREACH (Network is unreachable)
[pid 154628] connect(10, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0
[pid 154628] connect(10, {sa_family=AF_INET6, sin6_port=htons(0), sin6_flowinfo=htonl(0), inet_pton(AF_INET6, "2606:4700:3031::ac43:c490", &sin6_addr), sin6_scope_id=0}, 28) = -1 ENETUNREACH (Network is unreachable)
[pid 154628] connect(10, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0
[pid 154628] connect(10, {sa_family=AF_INET, sin_port=htons(0), sin_addr=inet_addr("104.21.92.169")}, 16) = 0
[pid 154628] connect(10, {sa_family=AF_UNSPEC, sa_data="\0\0\0\0\0\0\0\0\0\0\0\0\0\0"}, 16) = 0
[pid 154628] connect(10, {sa_family=AF_INET, sin_port=htons(0), sin_addr=inet_addr("172.67.196.144")}, 16) = 0
[pid 154625] connect(9, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("104.21.92.169")}, 16) = -1 EINPROGRESS (Operation now in progress)
[pid 154626] connect(10, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("104.21.92.169")}, 16) = -1 EINPROGRESS (Operation now in progress)
Jul 25 19:00:53.862 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 19:00:53.880 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")
[pid 154628] +++ exited with 0 +++
[pid 154627] +++ exited with 0 +++
[pid 154618] +++ exited with 0 +++
[pid 154614] +++ exited with 0 +++
[pid 154612] +++ exited with 0 +++
[pid 154619] +++ exited with 0 +++
[pid 154617] +++ exited with 0 +++
[pid 154613] +++ exited with 0 +++
[pid 154615] +++ exited with 0 +++
[pid 154623] +++ exited with 0 +++
[pid 154616] +++ exited with 0 +++
[pid 154624] +++ exited with 0 +++
[pid 154621] +++ exited with 0 +++
[pid 154622] +++ exited with 0 +++
[pid 154626] +++ exited with 0 +++
[pid 154620] +++ exited with 0 +++
[pid 154625] +++ exited with 0 +++
+++ exited with 0 +++
Wowee that's a whole bunch of connects.
So first it tries to connect to nscd because apparently we still live in the 90s:
[pid 154627] connect(9, {sa_family=AF_UNIX, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory)
...thankfully my system doesn't have it, so it moves on to whatever /etc/resolv.conf says to use to make DNS lookups...
[pid 154628] connect(9, {sa_family=AF_INET, sin_port=htons(53), sin_addr=inet_addr("127.0.0.53")}, 16) = 0
And then it finally gets a bunch of results like 172.67.196.144 and 104.21.92.169, which are Cloudflare IP addresses, and also some IPv6 stuff which doesn't work because I forcibly disabled IPv6 in my weird "HyperV VM on Windows 11" setup:
[pid 154627] connect(9, {sa_family=AF_INET6, sin6_port=htons(0), sin6_flowinfo=htonl(0), inet_pton(AF_INET6, "2606:4700:3034::6815:5ca9", &sin6_addr), sin6_scope_id=0}, 28) = -1 ENETUNREACH (Network is unreachable)
And finally it decides to use the IPv4 address 104.21.92.169 for both requests, and we can see here that those are non-blocking connects, because instead of returning 0 they return -1 with errno set to EINPROGRESS, which means "I'm doing it, I'm doing it, check back later".
[pid 154625] connect(9, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("104.21.92.169")}, 16) = -1 EINPROGRESS (Operation now in progress)
[pid 154626] connect(10, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("104.21.92.169")}, 16) = -1 EINPROGRESS (Operation now in progress)
Okay! So we have two connects. If we ignore DNS.
We also have a bunch of threads.
Is this how async Rust works? We just have a bunch of threads? And that's how it can do work "in the background"?
Before we answer that question, let's change our code to actually wait for both futures to be done, instead of waiting for an arbitrary one second.
#[tokio::main]
async fn main() -> Result<(), Report> {
setup()?;
let client = Client::new();
let leaked_client = Box::leak(Box::new(client));
let fut1 = fetch_thing(leaked_client, URL_1);
let fut2 = fetch_thing(leaked_client, URL_2);
let handle1 = tokio::spawn(fut1);
let handle2 = tokio::spawn(fut2);
handle1.await.unwrap()?;
handle2.await.unwrap()?;
Ok(())
}
Wait, aren't we back to the point where we wait for the first request to be complete, and then we fire off the second request?
Well, no! See, if we run it a bunch of times:
$ RUST_LOG=info cargo run --quiet --release
Jul 25 19:11:07.934 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 19:11:07.958 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")
$ RUST_LOG=info cargo run --quiet --release
Jul 25 19:11:08.676 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 19:11:08.680 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")
$ RUST_LOG=info cargo run --quiet --release
Jul 25 19:11:09.325 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 19:11:09.338 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")
$ RUST_LOG=info cargo run --quiet --release
Jul 25 19:11:10.134 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")
Jul 25 19:11:10.144 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
...well, "box" wins most of the time (it does have a headstart), but "advent" comes in first sometimes! Which is exactly what I was looking for.
Right. So it's doing things in parallel. Because it has threads.
No. But don't trust me, let's take a look.
It's not because of threads
Let's run our little program in GDB, mostly because I still haven't acquired LLDB muscle memory, which I'm sure will show up any day now, through no effort of my own.
$ cargo build --quiet && gdb --quiet --args ./target/debug/waytoodeep
Reading symbols from ./target/debug/waytoodeep...
warning: Missing auto-load script at offset 0 in section .debug_gdb_scripts
of file /home/amos/ftl/waytoodeep/target/debug/waytoodeep.
Use `info auto-load python-scripts [REGEXP]' to list them.
(gdb)
There, good.
Now before we launch the program we're gonna set up a breakpoint. Did I say breakpoint? I meant catchpoint. I don't know the name of all the functions that are involved in making an HTTP/2 request, but I know the name of the syscall used to connect somewhere, and so, that's what we're going to break on. Or catch.
(gdb) catch syscall connect
Catchpoint 1 (syscall 'connect' [42])
Now we're good to go!
(gdb) run
Starting program: /home/amos/ftl/waytoodeep/target/debug/waytoodeep
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
[New Thread 0x7ffff7c28700 (LWP 158945)]
[New Thread 0x7ffff7a27700 (LWP 158946)]
[New Thread 0x7fffef826700 (LWP 158947)]
[New Thread 0x7ffff7826700 (LWP 158948)]
[New Thread 0x7ffff7625700 (LWP 158949)]
[New Thread 0x7ffff7424700 (LWP 158950)]
[New Thread 0x7ffff7223700 (LWP 158951)]
[New Thread 0x7ffff701f700 (LWP 158952)]
[New Thread 0x7ffff6e1e700 (LWP 158953)]
[New Thread 0x7ffff6c1a700 (LWP 158954)]
[New Thread 0x7ffff6a16700 (LWP 158955)]
[New Thread 0x7ffff680f700 (LWP 158956)]
[New Thread 0x7ffff660e700 (LWP 158957)]
[New Thread 0x7ffff640a700 (LWP 158958)]
[New Thread 0x7ffff6206700 (LWP 158959)]
[New Thread 0x7ffff5f4b700 (LWP 158960)]
[New Thread 0x7ffff5d4a700 (LWP 158961)]
[Switching to Thread 0x7ffff5f4b700 (LWP 158960)]
Thread 17 "tokio-runtime-w" hit Catchpoint 1 (call to syscall connect), 0x00007ffff7d5033b in __libc_connect (fd=fd@entry=9, addr=..., addr@entry=...,
len=len@entry=110) at ../sysdeps/unix/sysv/linux/connect.c:26
26 ../sysdeps/unix/sysv/linux/connect.c: No such file or directory.
(gdb)
Alright cool, that was fast! So we stopped in "Thread 17", which is named "tokio-runtime-w", because I guess all the other letters were taken already.
...the "w" is for "worker".
I don't know! Why do they truncate stuff like that?
Is this your first day at Unix?
Ok so, Thread 17, what are the other threads doing?
(gdb) info threads
Id Target Id Frame
1 Thread 0x7ffff7c2c6c0 (LWP 158941) "waytoodeep" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
2 Thread 0x7ffff7c28700 (LWP 158945) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
3 Thread 0x7ffff7a27700 (LWP 158946) "tokio-runtime-w" 0x00007ffff7d4f5ce in epoll_wait (epfd=3, events=0x555556338b60, maxevents=1024, timeout=-1)
at ../sysdeps/unix/sysv/linux/epoll_wait.c:30
4 Thread 0x7fffef826700 (LWP 158947) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
5 Thread 0x7ffff7826700 (LWP 158948) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
6 Thread 0x7ffff7625700 (LWP 158949) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
7 Thread 0x7ffff7424700 (LWP 158950) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
8 Thread 0x7ffff7223700 (LWP 158951) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
9 Thread 0x7ffff701f700 (LWP 158952) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
10 Thread 0x7ffff6e1e700 (LWP 158953) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
11 Thread 0x7ffff6c1a700 (LWP 158954) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
12 Thread 0x7ffff6a16700 (LWP 158955) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
13 Thread 0x7ffff680f700 (LWP 158956) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
14 Thread 0x7ffff660e700 (LWP 158957) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
15 Thread 0x7ffff640a700 (LWP 158958) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
16 Thread 0x7ffff6206700 (LWP 158959) "tokio-runtime-w" syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
* 17 Thread 0x7ffff5f4b700 (LWP 158960) "tokio-runtime-w" 0x00007ffff7d5033b in __libc_connect (fd=fd@entry=9, addr=..., addr@entry=..., len=len@entry=110)
at ../sysdeps/unix/sysv/linux/connect.c:26
18 Thread 0x7ffff5d4a700 (LWP 158961) "tokio-runtime-w" 0x00007ffff7d48a46 in __GI___mmap64 (offset=0, fd=-1, flags=16418, prot=0, len=134217728, addr=0x0)
at ../sysdeps/unix/sysv/linux/mmap64.c:59
Ah.
Could we maybe get one more stackframe?
(gdb) thread apply all backtrace 2
Thread 18 (Thread 0x7ffff5d4a700 (LWP 158961)):
#0 0x00007ffff7d48a46 in __GI___mmap64 (offset=0, fd=-1, flags=16418, prot=0, len=134217728, addr=0x0) at ../sysdeps/unix/sysv/linux/mmap64.c:59
#1 __GI___mmap64 (addr=addr@entry=0x0, len=len@entry=134217728, prot=prot@entry=0, flags=flags@entry=16418, fd=fd@entry=-1, offset=offset@entry=0) at ../sysdeps/unix/sysv/linux/mmap64.c:47
(More stack frames follow...)
Thread 17 (Thread 0x7ffff5f4b700 (LWP 158960)):
#0 0x00007ffff7d5033b in __libc_connect (fd=fd@entry=9, addr=..., addr@entry=..., len=len@entry=110) at ../sysdeps/unix/sysv/linux/connect.c:26
#1 0x00007ffff7d8b713 in open_socket (type=type@entry=GETFDHST, key=key@entry=0x7ffff7de5ccb "hosts", keylen=keylen@entry=6) at nscd_helper.c:185
(More stack frames follow...)
Thread 16 (Thread 0x7ffff6206700 (LWP 158959)):
#0 syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1 0x0000555555b9f1d1 in parking_lot_core::thread_parker::imp::ThreadParker::futex_wait (self=0x7ffff6206498, ts=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.8.3/src/thread_parker/linux.rs:112
(More stack frames follow...)
Thread 15 (Thread 0x7ffff640a700 (LWP 158958)):
#0 syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1 0x0000555555b9f1d1 in parking_lot_core::thread_parker::imp::ThreadParker::futex_wait (self=0x7ffff640a498, ts=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.8.3/src/thread_parker/linux.rs:112
(More stack frames follow...)
Thread 14 (Thread 0x7ffff660e700 (LWP 158957)):
#0 syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1 0x0000555555b9f1d1 in parking_lot_core::thread_parker::imp::ThreadParker::futex_wait (self=0x7ffff660e498, ts=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.8.3/src/thread_parker/linux.rs:112
(More stack frames follow...)
Thread 13 (Thread 0x7ffff680f700 (LWP 158956)):
#0 syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
#1 0x0000555555b9f1d1 in parking_lot_core::thread_parker::imp::ThreadParker::futex_wait (self=0x7ffff680f498, ts=...) at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.8.3/src/thread_parker/linux.rs:112
(More stack frames follow...)
Ah. They're mostly just parked. Which means they're idle. More accurately, they're waiting for work to do.
We can also look at all those threads from htop, I mean we already know they're there, but I just think htop is neat. Thanks Hisham!
So uh, I can't help but notice we have a bunch of threads, and we also have a bunch of cores. Maybe it's creating one thread per core?
Ah, yup. And then some blocking threads for good measure, because, as we've seen in the strace output above, it's making some blocking connect calls to do DNS lookups (well, glibc is doing that), so all that runs outside of the worker threads so it doesn't block the other tasks.
Okay yeah!
So it is threads. It can do multiple things at the same time because it has many threads.
Well bear, I'm scrolling the docs here and it says there's a single-threaded executor.
Yeah but if we use that then it'll do one request at a time! I mean it has to, right? Because it's doing all of that thanks to threads?
Here's the thing bear, I'm not so sure... let's try it out.
// 👇
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Report> {
// (same as before)
}
So now if we run it...
$ RUST_LOG=info cargo run --quiet --release
Jul 25 19:50:15.977 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 19:50:15.994 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")
Huh! Those responses are...
17 milliseconds apart.
Yeah that's not enough time to make a full request.
So it does make the requests in parallel?
...concurrently.
So then it must use threads.
Oh what is it with bears and threads. Or am I thinking of cats.
No matter, let's make sure we only have one thread.
AhAH!
Yeah okay but those are the blocking threads. It's all because of DNS. You can see that there's no longer a zillion (15) worker threads:
(The reason there used to be 15 worker threads, by the way, is that I leave one core for the host operating system to do its thing so that even if my Linux VM is firing on all cylinders, my computer doesn't stop responding).
If we took DNS out of the equation then we'd see it actually is using just the one thread, which I guess we're gonna do because I can feel you're still a little skeptical.
Interlude: let's not leak memory
But before we do: it really bothers me that we're leaking the reqwest Client.
Instead of doing that, we can make it atomically-reference-counted, so that it lives as long as either task is still alive.
It's a pretty easy change:
// 👇 Atomically Reference Counted = Arc
use std::sync::Arc;
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Report> {
setup()?;
// 👇 there we go
let client = Arc::new(Client::new());
// 👇
let fut1 = fetch_thing(client.clone(), URL_1);
// (cloning it only increases the reference count)
let fut2 = fetch_thing(client.clone(), URL_2);
let handle1 = tokio::spawn(fut1);
let handle2 = tokio::spawn(fut2);
handle1.await.unwrap()?;
handle2.await.unwrap()?;
Ok(())
}
#[allow(clippy::manual_async_fn)]
fn fetch_thing(
// 👇 now taking this, we have shared ownership of it
client: Arc<Client>,
url: &'static str,
) -> impl Future<Output = Result<(), Report>> + 'static {
async move {
// luckily this 👇 only requires `&self`
let res = client.get(url).send().await?.error_for_status()?;
info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!");
Ok(())
}
}
There. I feel much better. We're no longer leaking dozens of bytes in our program that never runs for more than a couple seconds. All is right with the world.
Amos, you're not gonna believe this. I looked at the definition of reqwest's Client and it's just this:
#[derive(Clone)]
pub struct Client {
inner: Arc<ClientRef>,
}
Ah, well, turns out it's already reference-counted, so we can just take a Client:
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Report> {
setup()?;
// 👇
let client = Client::new();
// 👇
let fut1 = fetch_thing(client.clone(), URL_1);
// no need to clone a second time
let fut2 = fetch_thing(client, URL_2);
let handle1 = tokio::spawn(fut1);
let handle2 = tokio::spawn(fut2);
handle1.await.unwrap()?;
handle2.await.unwrap()?;
Ok(())
}
#[allow(clippy::manual_async_fn)]
fn fetch_thing(
// 👇
client: Client,
url: &'static str,
) -> impl Future<Output = Result<(), Report>> + 'static {
async move {
let res = client.get(url).send().await?.error_for_status()?;
info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!");
Ok(())
}
}
There.
Oh and for reference, the much simpler "async fn" version of that works just as well:
async fn fetch_thing(client: Client, url: &str) -> Result<(), Report> {
let res = client.get(url).send().await?.error_for_status()?;
info!(%url, content_type = ?res.headers().get("content-type"), "Got a response!");
Ok(())
}
We don't even need to specifically request that url is borrowed for 'static. If it happens to be 'static, then the resulting Future will be too. If it's not, then it won't.
So for example, this breaks:
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Report> {
setup()?;
let client = Client::new();
// this is a `String`, owned by main
let url1 = String::from(URL_1);
// we're borrowing from main 👇
let fut1 = fetch_thing(client.clone(), &url1);
let fut2 = fetch_thing(client, URL_2);
let handle1 = tokio::spawn(fut1);
let handle2 = tokio::spawn(fut2);
handle1.await.unwrap()?;
handle2.await.unwrap()?;
Ok(())
}
$ cargo check
Checking waytoodeep v0.1.0 (/home/amos/ftl/waytoodeep)
error[E0597]: `url1` does not live long enough
--> src/main.rs:18:44
|
18 | let fut1 = fetch_thing(client.clone(), &url1);
| ----------------------------^^^^^-
| | |
| | borrowed value does not live long enough
| argument requires that `url1` is borrowed for `'static`
...
28 | }
| - `url1` dropped here while still borrowed
Very cool.
I mean, yes, until you change one bit of code and suddenly the whole Future isn't Send anymore, and you really need it to be Send, which is what that whole ordeal, I mean, article, was about.
Before we go any further, I also wanted to mention that, well, spawning two futures and immediately waiting for both of them to be done is sorta silly to do with tokio::spawn; here we could just as well reach for FuturesUnordered.
$ cargo add futures@0.3.16
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding futures v0.3.16 to dependencies
use futures::{stream::FuturesUnordered, StreamExt};
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Report> {
setup()?;
let client = Client::new();
let mut group = vec![
fetch_thing(client.clone(), URL_1),
fetch_thing(client, URL_2),
]
.into_iter()
.collect::<FuturesUnordered<_>>();
while let Some(item) = group.next().await {
// propagate errors
item?;
}
Ok(())
}
And with that solution, we can await an arbitrary number of futures, and they're still polled concurrently:
$ RUST_LOG=info cargo run --quiet --release
Jul 25 20:12:37.208 INFO waytoodeep: Got a response! url=https://fasterthanli.me/articles/whats-in-the-box content_type=Some("text/html; charset=utf-8")
Jul 25 20:12:37.227 INFO waytoodeep: Got a response! url=https://fasterthanli.me/series/advent-of-code-2020/part-13 content_type=Some("text/html; charset=utf-8")
Just... 19 milliseconds apart - those are concurrent for sure.
Let's get rid of DNS altogether
Okay, so let's forget about reqwest for a moment.
HTTP isn't that hard, we can just speak it ourselves. All we need is TCP.
use std::net::SocketAddr;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
async fn fetch_thing(name: &str) -> Result<(), Report> {
// look mom, no DNS!
let addr: SocketAddr = ([1, 1, 1, 1], 80).into();
let mut socket = TcpStream::connect(addr).await?;
// we're writing straight to the socket, there's no buffering
// so no need to flush
socket.write_all(b"GET / HTTP/1.1\r\n").await?;
socket.write_all(b"Host: 1.1.1.1\r\n").await?;
socket.write_all(b"User-Agent: cool-bear\r\n").await?;
socket.write_all(b"Connection: close\r\n").await?;
socket.write_all(b"\r\n").await?;
let mut response = String::with_capacity(256);
socket.read_to_string(&mut response).await?;
let status = response.lines().next().unwrap_or_default();
info!(%status, %name, "Got response!");
// dropping the socket will close the connection
Ok(())
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Report> {
setup()?;
let mut group = vec![
fetch_thing("first"),
fetch_thing("second"),
]
.into_iter()
.collect::<FuturesUnordered<_>>();
while let Some(item) = group.next().await {
// propagate errors
item?;
}
Ok(())
}
Running it "works":
$ RUST_LOG=info cargo run --quiet --release
Jul 25 20:24:05.158 INFO waytoodeep: Got response! status=HTTP/1.1 301 Moved Permanently name=second
Jul 25 20:24:05.159 INFO waytoodeep: Got response! status=HTTP/1.1 301 Moved Permanently name=first
(Oh look, second won the race!)
And there's no filthy filthy DNS getting in the way anymore.
Of course http://1.1.1.1 redirects us to the HTTPS version of the page, and it's not technically that hard to just use rustls to speak TLS, but the article is getting long and I don't think we should be...
chanting TLS, TLS, TLS!
Ahhhhhh alright.
$ cargo add tokio-rustls@0.22.0
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding tokio-rustls v0.22.0 to dependencies
$ cargo add webpki@0.21.4
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding webpki v0.21.4 to dependencies
$ cargo add webpki-roots@0.21.1
Updating 'https://github.com/rust-lang/crates.io-index' index
Adding webpki-roots v0.21.1 to dependencies
And while we're at it:
$ cargo rm reqwest
Removing reqwest from dependencies
use std::sync::Arc;
use webpki::DNSNameRef;
use tokio_rustls::{rustls::ClientConfig, TlsConnector};
async fn fetch_thing(name: &str) -> Result<(), Report> {
// look out it's port 443 now
let addr: SocketAddr = ([1, 1, 1, 1], 443).into();
let socket = TcpStream::connect(addr).await?;
// establish a TLS session...
let connector: TlsConnector = {
let mut config = ClientConfig::new();
config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
Arc::new(config).into()
};
// we have to use the proper DNS name now 👇
let dnsname = DNSNameRef::try_from_ascii_str("one.one.one.one")?;
let mut socket = connector.connect(dnsname, socket).await?;
// we're writing straight to the socket, there's no buffering
// so no need to flush
socket.write_all(b"GET / HTTP/1.1\r\n").await?;
// 👇
socket.write_all(b"Host: one.one.one.one\r\n").await?;
socket.write_all(b"User-Agent: cool-bear\r\n").await?;
socket.write_all(b"Connection: close\r\n").await?;
socket.write_all(b"\r\n").await?;
let mut response = String::with_capacity(256);
socket.read_to_string(&mut response).await?;
let status = response.lines().next().unwrap_or_default();
info!(%status, %name, "Got response!");
// dropping the socket will close the connection
Ok(())
}
$ RUST_LOG=info cargo run --quiet --release
Jul 25 20:31:32.627 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 25 20:31:32.658 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
There! Now it's a 200. You happy?
Very.
Now can we talk about the thing the article is about?
You mean we haven't talked about it yet?
I mean, the goal was to understand Rust futures and I guess we've done some good progress.
But let's consider the following scenario: we want to perform two requests concurrently, either of which can fail, and we want to stop as soon as either request fails, or when both requests succeed.
tokio's try_join macro
As it turns out, there's a macro for that!
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Report> {
setup()?;
let res = tokio::try_join!(fetch_thing("first"), fetch_thing("second"),)?;
info!(?res, "All done!");
Ok(())
}
Which does exactly what we wanted!
$ RUST_LOG=info cargo run --quiet --release
Jul 25 20:44:52.150 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 20:44:52.165 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 25 20:44:52.165 INFO waytoodeep: All done! res=((), ())
Again, quick check: the responses are coming in 15ms apart - they're definitely being sent concurrently.
try_join! does the awaiting for us, and it's result-aware. If everything goes well, we end up with a tuple of the contents of the Ok variant of each future's result (in order).
So, we can have our futures return something:
// 👇
async fn fetch_thing(name: &str) -> Result<&str, Report> {
// (omitted)
// 👇
Ok(name)
}
To convince ourselves that they're returned in order, no matter who wins the race:
$ RUST_LOG=info cargo run --quiet --release
Jul 25 20:47:56.967 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 25 20:47:56.967 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 20:47:56.967 INFO waytoodeep: All done! res=("first", "second")
$ RUST_LOG=info cargo run --quiet --release
Jul 25 20:47:57.933 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 20:47:57.935 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 25 20:47:57.935 INFO waytoodeep: All done! res=("first", "second")
$ RUST_LOG=info cargo run --quiet --release
Jul 25 20:47:58.942 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 25 20:47:58.946 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 20:47:58.946 INFO waytoodeep: All done! res=("first", "second")
Alright cool. Where do we go from there?
Well, first, now that we're DNS-free, we can put to rest the notion that things are happening "at the same time" thanks to threads.
Because, well, if we run our program under strace, asking it to track children with -f (the f is for "follow children", btw):
$ cargo build --quiet --release && strace -f -e 'connect' ./target/release/waytoodeep
connect(9, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("1.1.1.1")}, 16) = -1 EINPROGRESS (Operation now in progress)
connect(10, {sa_family=AF_INET, sin_port=htons(443), sin_addr=inet_addr("1.1.1.1")}, 16) = -1 EINPROGRESS (Operation now in progress)
Jul 25 20:51:54.004 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 20:51:54.013 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 25 20:51:54.015 INFO waytoodeep: All done! res=("first", "second")
+++ exited with 0 +++
...we see two connect calls as expected, but no threads whatsoever. And in that run, the responses came back 9 milliseconds apart! Which is less than my ping to 1.1.1.1:
$ ping -c 1 1.1.1.1
PING 1.1.1.1 (1.1.1.1) 56(84) bytes of data.
64 bytes from 1.1.1.1: icmp_seq=1 ttl=57 time=13.7 ms
--- 1.1.1.1 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 13.748/13.748/13.748/0.000 ms
Okay, okay, I'm convinced. It's not because of the threads. It's because of some event loop that the executor has. It's making non-blocking syscalls and uhh..
...and it's subscribing to events related to the resources it's managing, so it knows when a socket is ready to read from / write to, for example.
Ah, that makes sense. I mean, in theory. In practice I uhh.. so futures are a bunch of state... and ye shall await them, and uhh where does the subscribing happen?
Well, let's try making our own try_join - as a function, and only for exactly two futures. And we'll see what happens.
Ah, we've made our own future before, how bad could it be?
Pretty bad as it turns out
Let's start simple! So we want a function that takes two futures and returns a single future.
// in `src/main.rs`
mod tj;
// in `src/tj.rs`
use std::future::Future;
pub fn try_join<A, B>(a: A, b: B) -> impl Future<Output = ()>
where
A: Future,
B: Future,
{
todo!("implement me!");
}
Mh. It shouldn't return the empty tuple, it should return a tuple of... the successful results. Or the first error that came in.
So we have to add a few more generic type parameters: one for the error type (we'll assume both futures have the same error type), and one for the type of the Ok variant of each future.
pub fn try_join<A, B, AR, BR, E>(a: A, b: B) -> impl Future<Output = Result<(AR, BR), E>>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
todo!("implement me!");
}
Okay! It's a mouthful, but I think we got everything we need.
Note that we're using "impl Trait" syntax so we don't have to make our "try join future" type public. It really doesn't matter, but it'll save us a few pub keywords, and my fingers are getting tired of typing. Oh so tired.
So, let's make that type!
It's gonna need to hold A and B, and be aware of the AR, BR and E types, so, yeah, hope you're hungry for generic type parameter salad.
struct TryJoin<A, B, AR, BR, E>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
a: A,
b: B,
}
And now we can return that from try_join:
pub fn try_join<A, B, AR, BR, E>(a: A, b: B) -> impl Future<Output = Result<(AR, BR), E>>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
// so simple!
TryJoin { a, b }
}
Which I think illustrates very nicely the fact that creating the future is just building state. It doesn't do any work.
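We can check that claim with nothing but the standard library. In this sketch (the RAN flag and do_thing are invented for the demo), building - and even dropping - a future runs none of its body:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// flips to true only if the async fn's body actually runs
static RAN: AtomicBool = AtomicBool::new(false);

async fn do_thing() {
    RAN.store(true, Ordering::SeqCst);
}

fn main() {
    // this just builds the state machine...
    let fut = do_thing();
    assert!(!RAN.load(Ordering::SeqCst));

    // ...and dropping it still never runs the body: nobody ever polled it
    drop(fut);
    assert!(!RAN.load(Ordering::SeqCst));
    println!("ok");
}
```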
Of course this doesn't compile, because TryJoin doesn't implement Future yet. But not to worry! rust-analyzer can help us generate the missing bits:
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
impl<A, B, AR, BR, E> Future for TryJoin<A, B, AR, BR, E>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
type Output = Result<(AR, BR), E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
todo!()
}
}
And here's how we'd use it, if we'd actually finished it:
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Report> {
setup()?;
let res = tj::try_join(fetch_thing("first"), fetch_thing("second")).await?;
info!(?res, "All done!");
Ok(())
}
But right now it just panics:
$ RUST_LOG=info cargo run --quiet --release
The application panicked (crashed).
Message: not yet implemented
Location: src/tj.rs:32
Backtrace omitted.
Run with RUST_BACKTRACE=1 environment variable to display it.
Run with RUST_BACKTRACE=full to include source snippets.
So I guess we need to implement it!
Well, let's try polling at least one of our futures with it.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let a = self.a.poll(cx);
todo!()
}
$ RUST_LOG=info cargo run --quiet --release
error[E0599]: no method named `poll` found for type parameter `A` in the current scope
--> src/tj.rs:32:24
|
32 | let a = self.a.poll(cx);
| ^^^^ method not found in `A`
|
::: /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:100:8
|
100 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
| ---- the method is available for `Pin<&mut A>` here
|
help: consider wrapping the receiver expression with the appropriate type
|
32 | let a = Pin::new(&mut self.a).poll(cx);
| ^^^^^^^^^^^^^ ^
Ah! Good start, good start.
Well I went into pinning in excruciating detail here, so let's just keep it short.
Methods usually have a receiver like that:
impl MyType {
fn do_thing(&self) {
println!("my value is {}", self.value)
}
}
Which is really just short for:
impl MyType {
fn do_thing(self: &Self) {
println!("my value is {}", self.value)
}
}
Where Self is MyType, because we're in an impl MyType block.
Good? Well, there are several other possible receiver types, and Pin<&mut Self> is one of them:
impl MyType {
fn do_thing(self: Pin<&mut Self>) {
// good luck!1
}
}
And what this means is that MyType must be pinned somewhere - ie. it is guaranteed not to move. Unless it implements Unpin, and then it can be unpinned, moved, and re-pinned again.
For the rest of this article, we won't assume that our futures A and B are Unpin, which means we'll never move them ourselves (only drop them).
You can tell we don't require A and B to be Unpin because we didn't add a specific where clause to require it. If we did, we'd have an additional trait bound, like this:
struct TryJoin<A, B, AR, BR, E>
where
// 👇
A: Future<Output = Result<AR, E>> + Unpin,
B: Future<Output = Result<BR, E>> + Unpin,
{}
But we don't, so we can't assume A or B are Unpin.
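For Unpin types, by contrast, pinning is a formality you can undo with safe code. A quick std-only sketch:

```rust
use std::pin::Pin;

fn main() {
    // i32 is Unpin, so the safe Pin::new constructor is available...
    let mut n = 42;
    let mut pinned: Pin<&mut i32> = Pin::new(&mut n);

    // ...and so is get_mut(), which hands the &mut i32 right back.
    // For a !Unpin type, both of these would require unsafe code.
    *pinned.as_mut().get_mut() += 1;
    assert_eq!(*pinned, 43);
    println!("ok");
}
```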
So! Now our problem really is just pin projection. We're holding a Pin<&mut TryJoin<A, B, ...>> and we want to be holding a Pin<&mut A> (because that's what we need to poll A).
In another situation, I would be reaching for something like the pin-project crate, or perhaps pin-project-lite, but the direction we're going in will make using pin-project really awkward, so today, we unsafe instead.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let a = unsafe { self.map_unchecked_mut(|this| &mut this.a) };
let a = a.poll(cx);
todo!()
}
That compiles. But we're using unsafe, which means the compiler has officially stopped checking our work. We must enforce some invariants ourselves, by being vewy vewy careful and having peers review our work, and still getting it wrong occasionally, but them's the breaks.
So now, we're able to poll a, which is fantastic. It returns either Poll::Ready(Result<AR, E>), if it's done, or Poll::Pending, if it'll be done later.
We can match that:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let a = unsafe { self.map_unchecked_mut(|this| &mut this.a) };
match a.poll(cx) {
Poll::Pending => {
info!("A is pending...");
return Poll::Pending;
}
Poll::Ready(res) => match res {
Ok(_) => info!("A is ready!"),
Err(e) => return Poll::Ready(Err(e)),
},
}
todo!()
}
Here we log "A is pending" until A becomes ready. This might take a few turns: after all, we are doing nontrivial stuff. We're establishing a TCP connection, then a TLS session on top of that, then doing a bunch of separate writes, then finally reading everything until EOF (end of file).
And indeed, if we run it:
$ RUST_LOG=info cargo run --quiet --release
Jul 25 22:54:14.227 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.239 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.239 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.252 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.252 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.478 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.478 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.478 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.478 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.495 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.513 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.513 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.513 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.513 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.513 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.514 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.522 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.522 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.522 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.522 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.522 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.523 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.523 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.530 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.530 INFO waytoodeep::tj: A is pending...
Jul 25 22:54:14.530 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 22:54:14.530 INFO waytoodeep::tj: A is ready!
The application panicked (crashed).
Message: not yet implemented
Location: src/tj.rs:46
Backtrace omitted.
Run with RUST_BACKTRACE=1 environment variable to display it.
Run with RUST_BACKTRACE=full to include source snippets.
We see that it does take a few turns.
Note that that code only returns Poll::Ready if A errors out, because we want to gather the results of both A and B.
So let's do the same with B:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let a = unsafe { self.map_unchecked_mut(|this| &mut this.a) };
match a.poll(cx) {
Poll::Pending => {
info!("A is pending...");
return Poll::Pending;
}
Poll::Ready(res) => match res {
Ok(_) => info!("A is ready!"),
Err(e) => return Poll::Ready(Err(e)),
},
}
let b = unsafe { self.map_unchecked_mut(|this| &mut this.b) };
match b.poll(cx) {
Poll::Pending => {
info!("B is pending...");
return Poll::Pending;
}
Poll::Ready(res) => match res {
Ok(_) => info!("B is ready!"),
Err(e) => return Poll::Ready(Err(e)),
},
}
todo!()
}
And.. whoops:
RUST_LOG=info cargo run --quiet --release
error[E0382]: use of moved value: `self`
--> src/tj.rs:46:26
|
33 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
| ---- move occurs because `self` has type `Pin<&mut TryJoin<A, B, AR, BR, E>>`, which does not implement the `Copy` trait
34 | let a = unsafe { self.map_unchecked_mut(|this| &mut this.a) };
| ------------------------------------- `self` moved due to this method call
...
46 | let b = unsafe { self.map_unchecked_mut(|this| &mut this.b) };
| ^^^^ value used here after move
|
note: this function takes ownership of the receiver `self`, which moves `self`
--> /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/pin.rs:776:43
|
776 | pub unsafe fn map_unchecked_mut<U, F>(self, func: F) -> Pin<&'a mut U>
| ^^^^
Right. map_unchecked_mut consumed self.
No worries though, we can use .as_mut():
// 👇
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 👇
let a = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.a) };
match a.poll(cx) {
Poll::Pending => {
info!("A is pending...");
return Poll::Pending;
}
Poll::Ready(res) => match res {
Ok(_) => info!("A is ready!"),
Err(e) => return Poll::Ready(Err(e)),
},
}
// 👇
let b = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.b) };
match b.poll(cx) {
Poll::Pending => {
info!("B is pending...");
return Poll::Pending;
}
Poll::Ready(res) => match res {
Ok(_) => info!("B is ready!"),
Err(e) => return Poll::Ready(Err(e)),
},
}
todo!()
}
But this still doesn't work:
$ RUST_LOG=info cargo run --quiet --release
(cut)
Jul 25 22:57:07.913 INFO waytoodeep::tj: A is pending...
Jul 25 22:57:07.913 INFO waytoodeep::tj: A is pending...
Jul 25 22:57:07.913 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 22:57:07.913 INFO waytoodeep::tj: A is ready!
The application panicked (crashed).
Message: `async fn` resumed after completion
Location: src/main.rs:24
Backtrace omitted.
Run with RUST_BACKTRACE=1 environment variable to display it.
Run with RUST_BACKTRACE=full to include source snippets.
See, as soon as a Future returns Poll::Ready, we should not poll it anymore. And why would we? It's already given us its output. And if the output is non-Copy, it might only be able to give it to us once.
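Here's that contract in miniature, using only the standard library. Both the TwoStep future and the noop_waker helper are invented for this sketch; the waker does nothing, which is fine here since we poll by hand:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// a future that's Pending on the first poll and Ready on the second
struct TwoStep {
    polled: bool,
}

impl Future for TwoStep {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled {
            Poll::Ready(42)
        } else {
            self.polled = true;
            Poll::Pending
        }
    }
}

// a waker that does nothing: fine here, since we poll manually anyway
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    let mut fut = TwoStep { polled: false };
    // TwoStep is Unpin, so the safe Pin::new is enough here
    let mut fut = Pin::new(&mut fut);

    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Pending);
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(42));
    // polling again after Ready would break the contract: many futures panic
    println!("ok");
}
```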
So, we need to 1) keep track that A is done, and 2) store its output somewhere.
Well, we can just add a couple fields to our struct!
struct TryJoin<A, B, AR, BR, E>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
a: A,
b: B,
// 👇
a_res: Option<AR>,
b_res: Option<BR>,
}
Let's not forget to initialize them:
pub fn try_join<A, B, AR, BR, E>(a: A, b: B) -> impl Future<Output = Result<(AR, BR), E>>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
TryJoin {
a,
b,
// 👇
a_res: None,
b_res: None,
}
}
And now the plan is:
- if a_res is Some, then we don't need to poll a, because it already finished
- same for b_res and b
Alright, let's do it. Also, because we're already using unsafe code, and so we're already in charge of maintaining the invariants, I'm going to make an executive decision and pin-project both a and b in one fell swoop, like so:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
let (a, b) = unsafe {
(
Pin::new_unchecked(&mut this.a),
Pin::new_unchecked(&mut this.b),
)
};
if this.a_res.is_none() {
match a.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(res) => match res {
Ok(x) => this.a_res = Some(x),
Err(e) => return Poll::Ready(Err(e)),
},
}
}
if this.b_res.is_none() {
match b.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(res) => match res {
Ok(x) => this.b_res = Some(x),
Err(e) => return Poll::Ready(Err(e)),
},
}
}
todo!()
}
Alright, this should at least give a and b the opportunity to complete before we panic:
$ RUST_LOG=info cargo run --quiet --release
Jul 25 23:11:03.851 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 23:11:04.380 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
The application panicked (crashed).
Message: not yet implemented
Location: src/tj.rs:69
Backtrace omitted.
Run with RUST_BACKTRACE=1 environment variable to display it.
Run with RUST_BACKTRACE=full to include source snippets.
Splendid! Now all we have to do is extract both results and return those.
// instead of the `todo!()`:
if let (Some(_), Some(_)) = (&this.a_res, &this.b_res) {
let a = this.a_res.take().unwrap();
let b = this.b_res.take().unwrap();
Poll::Ready(Ok((a, b)))
} else {
Poll::Pending
}
And that works:
$ RUST_LOG=info cargo run --quiet --release
Jul 25 23:13:32.497 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 23:13:32.829 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 25 23:13:32.829 INFO waytoodeep: All done! res=("first", "second")
...but it's not a try_join implementation. What we're doing is exactly the same as this:
// (pseudo-code, buncha things are missing)
async fn try_join(a: A, b: B) {
    let a = a.await?;
    let b = b.await?;
    Ok((a, b))
}
ie. it's sequential. Remember, just because tokio's executor might use a bunch of threads doesn't automatically mean things are happening at the same time. Earlier, we had to use tokio::spawn, or FuturesUnordered, or try_join! to get that to happen.
So let's review... what happens when we poll a?
if this.a_res.is_none() {
match a.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(res) => match res {
Ok(x) => this.a_res = Some(x),
Err(e) => return Poll::Ready(Err(e)),
},
}
}
Mhh, if it's pending we return pending, and so... ah, that's it already. We should not return if a is pending. Because what if b is already ready with an error?
What if, for example, we invoke our try_join like this:
use std::time::Duration;
use tokio::time::sleep;
info!("Joining...");
let res = tj::try_join(
async move {
sleep(Duration::from_millis(2000)).await;
Ok(())
},
async move {
sleep(Duration::from_millis(10)).await;
Err::<(), Report>(color_eyre::eyre::eyre!("uh oh"))
},
)
.await;
...then a takes 2 seconds to get ready, whereas b would return an error within 10 milliseconds, if only we polled it!
Alas, we do not:
$ RUST_LOG=info cargo run --quiet --release
Jul 25 23:19:26.972 INFO waytoodeep: Joining...
Jul 25 23:19:28.990 INFO waytoodeep: All done! res=Err(
0: uh oh
Location:
src/main.rs:28
(cut)
(Look at the timestamps)
The whole point of try_join is that it fails early: as soon as either of the futures returns Result::Err.
So we must poll a and b at the same time. Well... not strictly at the same time. We must poll them concurrently, every time our TryJoin future is polled, until they give an output.
That's a relatively easy fix - just don't return early when the first future comes up Poll::Pending; keep going and poll the other one too!
Also, I'm tired of typing Poll::Ready, and Poll<T> implements From<T>, so we can golf our way down using .into():
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
let (a, b) = unsafe {
(
Pin::new_unchecked(&mut this.a),
Pin::new_unchecked(&mut this.b),
)
};
if this.a_res.is_none() {
if let Poll::Ready(res) = a.poll(cx) {
match res {
Ok(x) => this.a_res = Some(x),
Err(e) => return Err(e).into(),
}
}
}
if this.b_res.is_none() {
if let Poll::Ready(res) = b.poll(cx) {
match res {
Ok(x) => this.b_res = Some(x),
Err(e) => return Err(e).into(),
}
}
}
if let (Some(_), Some(_)) = (&this.a_res, &this.b_res) {
let a = this.a_res.take().unwrap();
let b = this.b_res.take().unwrap();
Ok((a, b)).into()
} else {
Poll::Pending
}
}
There! And that...
$ RUST_LOG=info cargo run --quiet --release
Jul 25 23:22:40.238 INFO waytoodeep: Joining...
Jul 25 23:22:40.253 INFO waytoodeep: All done! res=Err(
0: uh oh
Location:
src/main.rs:28
(cut)
...works! By which I mean it fails. As expected. Expected failure is success. Which sounds pessimistic but what fault of mine is it that they're always right eventually.
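By the way, those Err(e).into() calls lean on a tiny std impl: Poll<T> implements From<T> by wrapping the value in Poll::Ready. Here it is in isolation:

```rust
use std::task::Poll;

fn main() {
    // Poll<T> implements From<T>, wrapping the value in Poll::Ready
    let ok: Poll<Result<i32, String>> = Ok(42).into();
    assert_eq!(ok, Poll::Ready(Ok(42)));

    // ...which is all `return Err(e).into()` does in our poll fn
    let err: Poll<Result<i32, String>> = Err(String::from("uh oh")).into();
    assert_eq!(err, Poll::Ready(Err(String::from("uh oh"))));
    println!("ok");
}
```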
And if we go back to the way we used to invoke try_join:
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Report> {
setup()?;
info!("Joining...");
let res = tj::try_join(fetch_thing("first"), fetch_thing("second")).await?;
info!(?res, "All done!");
Ok(())
}
We can see that the race is back on: sometimes first finishes first, sometimes it finishes second:
$ RUST_LOG=info cargo run --quiet --release
Jul 25 23:25:25.925 INFO waytoodeep: Joining...
Jul 25 23:25:26.224 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 23:25:26.236 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 25 23:25:26.236 INFO waytoodeep: All done! res=("first", "second")
$ RUST_LOG=info cargo run --quiet --release
Jul 25 23:25:26.937 INFO waytoodeep: Joining...
Jul 25 23:25:27.237 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 23:25:27.242 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 25 23:25:27.242 INFO waytoodeep: All done! res=("first", "second")
$ RUST_LOG=info cargo run --quiet --release
Jul 25 23:25:27.865 INFO waytoodeep: Joining...
Jul 25 23:25:28.164 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 25 23:25:28.818 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 23:25:28.818 INFO waytoodeep: All done! res=("first", "second")
$ RUST_LOG=info cargo run --quiet --release
Jul 25 23:25:30.153 INFO waytoodeep: Joining...
Jul 25 23:25:31.477 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 25 23:25:31.496 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 23:25:31.496 INFO waytoodeep: All done! res=("first", "second")
...and yet the results are in the correct order.
Alright, we did it!
But!
We can do better
And lucky for us, worse is better.
See, this type bothers me:
struct TryJoin<A, B, AR, BR, E>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
a: A,
b: B,
a_res: Option<AR>,
b_res: Option<BR>,
}
We don't need to have a_res until a has completed. And once it has completed, and a_res contains its result, we don't need a anymore.
In fact, it's stronger than that: we should never touch a anymore.
It almost sounds like we should have either A, or AR, but never both...
Mhhh. Mhhhhhhh.
SUM TYPES!
Mh?
ahem sorry: sum types. This is a job for sum types.
It sure is, but which ones? Hahaha
...Amos
So! Sum types. Rust enums. These are exactly what we want. Let's make a type
called State
, and it'll have two variants: one for when it's still a future,
and another for when it's a result. Easy!
enum State<F, T, E>
where
F: Future<Output = Result<T, E>>,
{
Future(F),
Ok(T),
}
Ah, this is gonna be great.
Let's give our TryJoin struct two of these:
struct TryJoin<A, B, AR, BR, E>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
a: State<A, AR, E>,
b: State<B, BR, E>,
}
The symmetry! Beautiful.
And initialize them properly:
pub fn try_join<A, B, AR, BR, E>(a: A, b: B) -> impl Future<Output = Result<(AR, BR), E>>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
TryJoin {
a: State::Future(a),
b: State::Future(b),
}
}
Cool cool cool. Now we just have to tune our poll method a little, so uh, we have a Pin<&mut Self>, which we violently turn into a &mut Self...
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
...which is fine, it's fine, because we pinky promise to uphold the invariants - in this case, it means we don't move what's inside of a State::Future.
And then, if a is a State::Future, we poll it, and if it's ready, we propagate the error, or store the result for later...
if let State::Future(a) = &mut this.a {
let a = unsafe { Pin::new_unchecked(a) };
if let Poll::Ready(res) = a.poll(cx) {
match res {
Ok(t) => this.a = State::Ok(t),
Err(e) => return Err(e).into(),
}
}
}
And we do the same with b...
// you can figure that one out, I believe in you
And then, if they're both State::Ok, we're done! Otherwise, we return Poll::Pending, so we uh:
match (this.a, this.b) {
(State::Ok(a), State::Ok(b)) => Ok((a, b)).into(),
_ => Poll::Pending,
}
Ah, so nice.
Except it doesn't compile:
$ RUST_LOG=info cargo run --quiet --release
error[E0507]: cannot move out of `this.a` which is behind a mutable reference
--> src/tj.rs:65:16
|
65 | match (this.a, this.b) {
| ^^^^^^ move occurs because `this.a` has type `State<A, AR, E>`, which does not implement the `Copy` trait
error[E0507]: cannot move out of `this.b` which is behind a mutable reference
--> src/tj.rs:65:24
|
65 | match (this.a, this.b) {
| ^^^^^^ move occurs because `this.b` has type `State<B, BR, E>`, which does not implement the `Copy` trait
error: aborting due to 2 previous errors
For more information about this error, try `rustc --explain E0507`.
error: could not compile `waytoodeep`
To learn more, run the command again with --verbose.
Because uhh... all we have is a &mut Self. Not a Self.
We don't own ourselves. We merely borrow ourselves.
That's.. that's deep.
So, yeah, we cannot move things out of our members, because there's really nothing that prevents someone from polling us again. And in that case, we should panic.
We should?
We should panic!.
Ah!
Of course things would be easier if we had a .take() method just like Option<T> has. Where it returns whatever the Option had, replacing it with None.
But we don't really have a None. We have State::Future, and State::Ok, but no "neutral" state.
So let's make one!
enum State<F, T, E>
where
F: Future<Output = Result<T, E>>,
{
Future(F),
Ok(T),
Gone,
}
And now, we can replace both this.a and this.b with State::Gone... and whatever is returned, we own! So we can move out of it.
But also... we need to pattern match again.
Like so:
match (&this.a, &this.b) {
(State::Ok(_), State::Ok(_)) => {
let a = match std::mem::replace(&mut this.a, State::Gone) {
State::Ok(t) => t,
_ => unreachable!(),
};
let b = match std::mem::replace(&mut this.b, State::Gone) {
State::Ok(t) => t,
_ => unreachable!(),
};
Ok((a, b)).into()
}
_ => Poll::Pending,
}
Which honestly... I've seen worse code. It's just not very DRY.
It does work great though!
$ RUST_LOG=info cargo run --quiet --release
Jul 25 23:52:24.097 INFO waytoodeep: Joining...
Jul 25 23:52:25.050 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 25 23:52:25.061 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 25 23:52:25.061 INFO waytoodeep: All done! res=("first", "second")
Look at that, 11 milliseconds apart!
Deeper?
Something again bothers me about this code:
struct TryJoin<A, B, AR, BR, E>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
a: State<A, AR, E>,
b: State<B, BR, E>,
}
because now a and b are tri-state: Future, Ok, or Gone.
What if only one of a or b is Gone? That state makes no sense! If it happens, we would currently just return Poll::Pending forever, which isn't great - that's a deadlock.
Really what we want is... two enums. In fact we want the whole TryJoin type to be an enum.
enum TryJoin<A, B, AR, BR, E>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
Polling {
a: State<A, AR, E>,
b: State<B, BR, E>,
},
Done,
}
There. Initialize it like thaaaat:
pub fn try_join<A, B, AR, BR, E>(a: A, b: B) -> impl Future<Output = Result<(AR, BR), E>>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
TryJoin::Polling {
a: State::Future(a),
b: State::Future(b),
}
}
And then, surprise! Poll<Result<T, E>> implements the Try trait, so we can use ? with it, and our final code is actually pretty short and sweet:
impl<A, B, AR, BR, E> Future for TryJoin<A, B, AR, BR, E>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
type Output = Result<(AR, BR), E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
let (a, b) = match this {
Self::Polling { a, b } => (a, b),
Self::Done => panic!("TryJoin future polled after completion"),
};
if let State::Future(fut) = a {
if let Poll::Ready(res) = unsafe { Pin::new_unchecked(fut) }.poll(cx) {
*a = State::Ok(res?);
}
}
if let State::Future(fut) = b {
if let Poll::Ready(res) = unsafe { Pin::new_unchecked(fut) }.poll(cx) {
*b = State::Ok(res?);
}
}
match (a, b) {
(State::Ok(_), State::Ok(_)) => match std::mem::replace(this, Self::Done) {
Self::Polling {
a: State::Ok(a),
b: State::Ok(b),
} => Ok((a, b)).into(),
_ => unreachable!(),
},
_ => Poll::Pending,
}
}
}
Now I know what you're thinking.
I know what I'm thinking!! Isn't Pin<&mut T> precisely for preventing things like std::mem::swap and std::mem::replace? Those move things around in memory! Which is sehr verboten!
Well, my furry friend. It's only verboten if we promised not to move it. But in this case, we only move self / this after we're done polling both futures. Past that point, we never use those futures again, pinned or unpinned. And we never promised the results themselves were going to be pinned!
Mh. Mhhhhhhhh.
We just have to make up our mind on whether something's going to be "always pinned" or "never pinned", stick to that decision, and then the code we write turns out to be sound.
In our case, TryJoin::Polling(State::Future(_)) is always pinned, and everything else isn't.
Sure, we make a quick trip from Pin<&mut Self> to &mut Self and back to Pin<&mut A>, but as long as we don't move ourselves in-between, it's all good.
If we used std::mem::replace or std::mem::swap while we're still holding the futures, then that would be unsound. But we're not. So we're fine. I think. I'm fairly sure. I'm sure people will write in if it's not.
That's it
Gaze upon our work, and rejoice:
// in `src/tj.rs`
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
pub fn try_join<A, B, AR, BR, E>(a: A, b: B) -> impl Future<Output = Result<(AR, BR), E>>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
TryJoin::Polling {
a: State::Future(a),
b: State::Future(b),
}
}
enum State<F, T, E>
where
F: Future<Output = Result<T, E>>,
{
Future(F),
Ok(T),
}
enum TryJoin<A, B, AR, BR, E>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
Polling {
a: State<A, AR, E>,
b: State<B, BR, E>,
},
Done,
}
impl<A, B, AR, BR, E> Future for TryJoin<A, B, AR, BR, E>
where
A: Future<Output = Result<AR, E>>,
B: Future<Output = Result<BR, E>>,
{
type Output = Result<(AR, BR), E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
let (a, b) = match this {
Self::Polling { a, b } => (a, b),
Self::Done => panic!("TryJoin future polled after completion"),
};
if let State::Future(fut) = a {
if let Poll::Ready(res) = unsafe { Pin::new_unchecked(fut) }.poll(cx) {
*a = State::Ok(res?);
}
}
if let State::Future(fut) = b {
if let Poll::Ready(res) = unsafe { Pin::new_unchecked(fut) }.poll(cx) {
*b = State::Ok(res?);
}
}
match (a, b) {
(State::Ok(_), State::Ok(_)) => match std::mem::replace(this, Self::Done) {
Self::Polling {
a: State::Ok(a),
b: State::Ok(b),
} => Ok((a, b)).into(),
_ => unreachable!(),
},
_ => Poll::Pending,
}
}
}
And our little HTTPS client:
// in `src/main.rs`
use color_eyre::Report;
use std::{net::SocketAddr, sync::Arc};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
use tokio_rustls::{rustls::ClientConfig, TlsConnector};
use tracing::info;
use tracing_subscriber::EnvFilter;
use webpki::DNSNameRef;
mod tj;
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Report> {
setup()?;
info!("Joining...");
let res = tj::try_join(fetch_thing("first"), fetch_thing("second")).await?;
info!(?res, "All done!");
Ok(())
}
#[allow(dead_code)]
async fn fetch_thing(name: &str) -> Result<&str, Report> {
// look out it's port 443 now
let addr: SocketAddr = ([1, 1, 1, 1], 443).into();
let socket = TcpStream::connect(addr).await?;
// establish a TLS session...
let connector: TlsConnector = {
let mut config = ClientConfig::new();
config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
Arc::new(config).into()
};
let dnsname = DNSNameRef::try_from_ascii_str("one.one.one.one")?;
let mut socket = connector.connect(dnsname, socket).await?;
// we're writing straight to the socket, there's no buffering
// so no need to flush
socket.write_all(b"GET / HTTP/1.1\r\n").await?;
socket.write_all(b"Host: one.one.one.one\r\n").await?;
socket.write_all(b"User-Agent: cool-bear\r\n").await?;
socket.write_all(b"Connection: close\r\n").await?;
socket.write_all(b"\r\n").await?;
let mut response = String::with_capacity(256);
socket.read_to_string(&mut response).await?;
let status = response.lines().next().unwrap_or_default();
info!(%status, %name, "Got response!");
// dropping the socket will close the connection
Ok(name)
}
fn setup() -> Result<(), Report> {
if std::env::var("RUST_LIB_BACKTRACE").is_err() {
std::env::set_var("RUST_LIB_BACKTRACE", "1")
}
color_eyre::install()?;
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info")
}
tracing_subscriber::fmt::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
Ok(())
}
And it works.
$ RUST_LOG=info cargo run --quiet --release
Jul 26 00:08:13.399 INFO waytoodeep: Joining...
Jul 26 00:08:13.707 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=first
Jul 26 00:08:13.709 INFO waytoodeep: Got response! status=HTTP/1.1 200 OK name=second
Jul 26 00:08:13.710 INFO waytoodeep: All done! res=("first", "second")
Two milliseconds apart! That has to be a new record.
Well, I hope you enjoyed the article, and that it solidified your understanding of Rust futures. I've tried to maintain a healthy 50/50 mix of "of course you know that" (and if you don't, you can search for it yourself) and "you've heard of that, but let me show you ten different ways you can actually see it for yourself, instead of vaguely being aware that's how things normally go".
So that, y'know, the article eventually ends. And if you're still puzzled by some of these things, you now know the keywords to look for. Knowing the keywords is half the expertise.
Until next time, take care!