Getting in and out of trouble with Rust futures
I started experimenting with asynchronous Rust code back when futures 0.1 was all we had - before async/await. I was a Rust baby then (I'm at least a toddler now), so I quickly drowned in a sea of .and_then, .map_err and Either<A, B>.
But that's all in the past! I guess!
Now everything is fine, and things go smoothly. For the most part. But even with async/await, there are still some cases where the compiler diagnostics are, just, so much.
There have been serious improvements already in terms of diagnostics - the errors aren't as rough as they used to be, but there's still a ways to go. Despite that, it's not impossible to work around them and achieve the result you need.
So let's try to do some HTTP requests, get ourselves in trouble, and instead of just "seeing if a different crate would work", get to the bottom of it, and come out the other side slightly more knowledgeable.
Let's get started!
$ cargo new trouble
     Created binary (application) `trouble` package
Idealized HTTP requests
For this article, we'll use the tokio runtime, with the reqwest HTTP client.
# in `Cargo.toml`

[dependencies]
tokio = { version = "0.2.21", features = ["full"] }
reqwest = "0.10.6"
You can opt in and out of Tokio features, but Amos is just being lazy here and using the full feature to get "the whole package".
Hey, no dissent in the ranks. We have places to go!
So, we'll make a request to the IANA-managed reserved domain example.org:
// in `src/main.rs`

use std::{cmp::min, error::Error};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let text = reqwest::get("http://example.org").await?.text().await?;
    println!("response = {}", &text[..min(text.len(), 40)]);
    Ok(())
}
Mhh. Seems a little too simple to be a fully-working example, but:
$ cargo run -q response = <!doctype html> <html> <head> <title
It works!
Ah, I wish I could end the article here. Look how pretty it is. Even though we're in an async function, we can return a boxed error (Box<dyn Error>), we can use the ? sigil to bubble up errors, we can chain awaits. First to send the request, and then to retrieve the body of the response.
It's beautiful.
Alright champ, settle down.
But I'm not really satisfied with it - there's so many of you following at home, coding as you go along - I wouldn't want y'all to DDoS the IANA servers. That would be uncouth.
So let's spin up a test server of our own. We'll grab a crate at random closes eyes, spins lib.rs uhhhhhh that one.
$ cargo add tiny_http Updating 'https://github.com/rust-lang/crates.io-index' index Adding tiny_http v0.7.0 to dependencies
We'll also do proper logging:
$ cargo add log pretty_env_logger Updating 'https://github.com/rust-lang/crates.io-index' index Adding log v0.4.8 to dependencies Adding pretty_env_logger v0.4.0 to dependencies
// in `src/main.rs`

use std::error::Error;

mod server;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    pretty_env_logger::formatted_timed_builder()
        .filter_level(log::LevelFilter::Info)
        .init();

    server::start();

    let text = reqwest::get("http://localhost:1729").await?.text().await?;
    log::info!("Request successful: {}", &text[..]);

    Ok(())
}
1729 is Ramanujan's number - it is the smallest number expressible as the sum of two positive cubes in two different ways (1³ + 12³ and 9³ + 10³).
// in `src/server.rs`

use std::thread::spawn;
use tiny_http::{Response, Server};

pub fn start() {
    spawn(|| run());
}

fn run() {
    let server = Server::http("127.0.0.1:1729").unwrap();
    for req in server.incoming_requests() {
        log::info!("Serving {:?}", req.url());
        let res = Response::from_string("Hello from test server!");
        req.respond(res).unwrap();
    }
}
$ cargo run -q 2020-07-07T15:00:28.055Z INFO trouble::server > Serving "/" 2020-07-07T15:00:28.056Z INFO trouble > Request successful: Hello from test server!
Wonderful.
Real-world HTTP requests (in the Paleoproterozoic Era)
The approach we used above is pretty naive though. Real-world HTTP requests don't always succeed. Sometimes they fail, and sometimes, it's not even your fault!
It's a story as old as time.
The year is 1800 million years ago.
Okay, gonna sit this one out, good luck readers.
You are a string of beads connected by a very fine thread, and you're trying to break into the service industry. You hear about this internet thing, investigate, and before you know it, you've signed a contract with a CDN company with a minimum 10TB monthly commitment.
Things appear to be running smoothly, until you notice a series of failed requests in the log. Turns out the CDN you picked has a POP in Laurentia, but not in Baltica!
Your Baltic customers are seeing request errors left and right, it's a disaster. But you can't afford to break off your six-month engagement with your CDN, so you decide to solve the problem client-side.
Meanwhile, at your CDN's offices, they've just deployed their state of the art DDoS protection:
// in `src/server.rs`

use std::collections::HashMap;

fn run() {
    let server = Server::http("127.0.0.1:1729").unwrap();
    // Per-IP counter: how many earlier requests we've seen from each address.
    let mut map: HashMap<_, usize> = HashMap::new();

    for req in server.incoming_requests() {
        let count = *map
            .entry(req.remote_addr().ip())
            .and_modify(|c| *c += 1)
            .or_default();

        log::info!("Serving {:?}", req.url());
        let res = if count < 2 {
            Response::from_string("The mainframe is warming up...").with_status_code(503)
        } else {
            Response::from_string("Hello from test server!")
        };
        req.respond(res).unwrap();
    }
}
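The counting logic is easy to misread, so here is a minimal, std-only sketch of just that part. The helper name `previous_hits` is made up for this illustration; it is not part of the server code. It suggests that the first request from an address sees a count of 0, which is why `count < 2` rejects exactly the first two requests.

```rust
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr};

/// Returns the number of earlier requests seen from `ip`.
/// A vacant entry is initialized to 0; an occupied one is incremented first.
fn previous_hits(map: &mut HashMap<IpAddr, usize>, ip: IpAddr) -> usize {
    *map.entry(ip).and_modify(|c| *c += 1).or_default()
}

fn main() {
    let mut map = HashMap::new();
    let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);

    for _ in 0..3 {
        let count = previous_hits(&mut map, ip);
        // Same threshold as the test server: the first two requests get a 503.
        let status = if count < 2 { 503 } else { 200 };
        println!("count = {count}, status = {status}");
    }
}
```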
Your client is unprepared for such measures, as it doesn't even return an error:
$ cargo run -q 2020-07-07T15:27:46.388Z INFO trouble::server > Serving "/" 2020-07-07T15:27:46.389Z INFO trouble > Request successful: The mainframe is warming up...
Luckily, it's pretty easy to address that:
// in `src/main.rs`
// in `async fn main()`

let text = reqwest::get("http://localhost:1729")
    .await?
    .error_for_status()?
    .text()
    .await?;
$ cargo run -q 2020-07-07T15:29:24.127Z INFO trouble::server > Serving "/" Error: reqwest::Error { kind: Status(503), url: "http://localhost:1729/" }
..but that doesn't really solve your problem, now does it?
You decide to simply retry a request if it fails:
// in `src/main.rs` #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { // omitted: server & logging setup async fn do_req() -> Result<(), Box<dyn Error>> { let text = reqwest::get("http://localhost:1729") .await? .error_for_status()? .text() .await?; log::info!("Request successful: {}", &text[..]); Ok(()) } for _ in 0..5 { if let Err(e) = do_req().await { log::error!("{}", e); } else { break; } } Ok(()) }
$ cargo run -q 2020-07-07T15:51:54.177Z INFO trouble::server > Serving "/" 2020-07-07T15:51:54.178Z ERROR trouble > HTTP status server error (503 Service Unavailable) for url (http://localhost:1729/) 2020-07-07T15:51:54.183Z INFO trouble::server > Serving "/" 2020-07-07T15:51:54.184Z ERROR trouble > HTTP status server error (503 Service Unavailable) for url (http://localhost:1729/) 2020-07-07T15:51:54.190Z INFO trouble::server > Serving "/" 2020-07-07T15:51:54.190Z INFO trouble > Request successful: Hello from test server!
Better! For you.
But not for your CDN.
If at first you don't succeed, back off
Shortly after you roll out your client update, your Paleophone rings.
"Hello? Yes, this is Horodyskia..."
The conversation is short and to the point.
"High request volume, you say? I'll be right on it."
Your CDN rep is not happy. Not happy at all. In a bout of anger, they scolded you: "it's not the Archean anymore - get with the program!"
So, you look up "http request retry best practices", and quickly realize you're supposed to wait before retrying, to give the backend some time to recover.
// in `src/main.rs`

use std::time::Duration;

// in `async fn main()`

for _ in 0..5 {
    if let Err(e) = do_req().await {
        log::error!("{}", e);
        tokio::time::delay_for(Duration::from_secs(1)).await;
    } else {
        break;
    }
}
Things seem a little better:
$ cargo run -q 2020-07-07T16:01:30.501Z INFO trouble::server > Serving "/" 2020-07-07T16:01:30.502Z ERROR trouble > HTTP status server error (503 Service Unavailable) for url (http://localhost:1729/) 2020-07-07T16:01:31.509Z INFO trouble::server > Serving "/" 2020-07-07T16:01:31.510Z ERROR trouble > HTTP status server error (503 Service Unavailable) for url (http://localhost:1729/) 2020-07-07T16:01:32.517Z INFO trouble::server > Serving "/" 2020-07-07T16:01:32.518Z INFO trouble > Request successful: Hello from test server!
But you're not about to stop there. The CDN rep's diatribe has left you wounded. Your code? Bad? That's impossible. But just in case, you start a full code review.
And you realize there's a lot of code duplication. For every request you do, you have to use a for loop to retry.
You decide to make a re-usable facility to retry HTTP requests.
// in `src/client.rs`

use reqwest::{Error, IntoUrl, Method, Request, RequestBuilder, Response};
use std::time::Duration;

pub struct Client {
    inner: reqwest::Client,
}

impl Client {
    pub fn new() -> Result<Self, Error> {
        let inner = reqwest::Client::builder()
            .user_agent("horo bot/1.0")
            .build()?;
        Ok(Self { inner })
    }

    pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
        self.inner.request(method, url)
    }

    pub async fn execute(&self, req: Request) -> Result<Response, Error> {
        let mut tries: usize = 5;

        loop {
            let res = self
                .inner
                .execute(req.try_clone().unwrap())
                .await
                .and_then(|r| r.error_for_status());

            match res {
                Err(e) if tries > 1 => {
                    tries -= 1;
                    log::error!("{}", e);
                    tokio::time::delay_for(Duration::from_secs(1)).await;
                }
                res => return res,
            }
        }
    }
}
You're pleased with how easy it is to use:
// in `src/main.rs` mod client; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { pretty_env_logger::formatted_timed_builder() .filter_level(log::LevelFilter::Info) .init(); server::start(); let client = client::Client::new()?; let req = client .request(reqwest::Method::GET, "http://localhost:1729") .build()?; let text = client.execute(req).await?.text().await?; log::info!("Request successful: {}", &text[..]); Ok(()) }
Sure, it panics if the request has a body. But bodies haven't been invented yet, so all is well in Proto-Laurasia.
As you re-read "Retrying HTTP requests - 10 best practices", you notice number 7 says to wait longer and longer between requests - and to add a random amount of time ("jitter", they call it), to avoid having many instances of your client synchronize their retry cycles and hammer the backend mercilessly.
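To get a feel for what "wait longer and longer, plus jitter" means, here is a rough std-only sketch. Everything in it is an assumption made for illustration: the names `backoff_delay` and `XorShift`, the doubling factor, and the toy random number generator. It is not how any particular crate computes its delays.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): `base * 2^attempt`, capped at `max`,
/// then scaled by a jitter factor in `[1 - spread, 1 + spread)`.
/// `unit` is a caller-supplied random number in `[0, 1)`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration, spread: f64, unit: f64) -> Duration {
    let exp = base.saturating_mul(2u32.saturating_pow(attempt)).min(max);
    let factor = 1.0 - spread + 2.0 * spread * unit;
    exp.mul_f64(factor)
}

/// Tiny xorshift64 generator so the sketch needs no external crate.
/// The seed must be non-zero. Not suitable for anything security-related.
struct XorShift(u64);

impl XorShift {
    /// Returns a pseudo-random number in `[0, 1)`.
    fn next_unit(&mut self) -> f64 {
        self.0 ^= self.0 << 13;
        self.0 ^= self.0 >> 7;
        self.0 ^= self.0 << 17;
        (self.0 >> 11) as f64 / (1u64 << 53) as f64
    }
}

fn main() {
    let mut rng = XorShift(0x9E37_79B9_7F4A_7C15);
    let base = Duration::from_millis(500);
    let max = Duration::from_secs(10);

    for attempt in 0..6 {
        let delay = backoff_delay(attempt, base, max, 0.5, rng.next_unit());
        println!("attempt {attempt}: wait {delay:?}");
    }
}
```

Because each client draws its own random factor, two clients that fail at the same instant are unlikely to retry at the same instant.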
"Well", you think to yourself (you don't possess any mouth-like appendages), that seems like something another organism has done before.
And sure enough, crates.io (which has always existed) has your back.
$ cargo add backoff backoff-futures Updating 'https://github.com/rust-lang/crates.io-index' index Adding backoff v0.1.6 to dependencies Adding backoff-futures v0.3.0 to dependencies
It's great timing, too, as your CDN has decided to increase the number of "warm-up rounds" to 5.
// in `src/server.rs`
// in `fn run()`
// in `for req in server.incoming_requests()`

let res = if count < 5 {
You find that the backoff crates are rather easy to use, and you're happy:
// in `src/client.rs`

impl Client {
    // omitted: other methods

    pub async fn execute(&self, req: Request) -> Result<Response, Error> {
        let exec = || async {
            self.inner
                .execute(req.try_clone().unwrap())
                .await
                .and_then(|r| r.error_for_status())
                .map_err(backoff::Error::Transient)
        };

        let mut backoff = backoff::ExponentialBackoff::default();
        use backoff_futures::BackoffExt;
        exec.with_backoff(&mut backoff).await.map_err(|e| match e {
            backoff::Error::Permanent(e) | backoff::Error::Transient(e) => e,
        })
    }
}
It even seems to work!
$ cargo run -q 2020-07-07T16:58:18.982Z INFO trouble::server > Serving "/" 2020-07-07T16:58:19.356Z INFO trouble::server > Serving "/" 2020-07-07T16:58:19.767Z INFO trouble::server > Serving "/" 2020-07-07T16:58:21.425Z INFO trouble::server > Serving "/" 2020-07-07T16:58:22.338Z INFO trouble::server > Serving "/" 2020-07-07T16:58:25.834Z INFO trouble::server > Serving "/" 2020-07-07T16:58:25.835Z INFO trouble > Request successful: Hello from test server!
15 minutes in and not a single Futures-related problem.
Nicely done.
A Futures-related problem
Time has passed - it's time... for the Mesoproterozoic era.
It comes with its share of code churn. What used to be a standalone executable must now become a shared library, so it can be loaded as a "native add-on" for an Electron app.
The #[tokio::main] attribute can no longer be used - since there is no longer a main function. You have to manage the Runtime manually.
And all the library calls are synchronous, so you have to spawn tasks onto the runtime and deal with the outcome later.
// in `src/main.rs`

use std::error::Error;
use tokio::runtime::Runtime;

mod client;
mod server;

fn main() -> Result<(), Box<dyn Error>> {
    pretty_env_logger::formatted_timed_builder()
        .filter_level(log::LevelFilter::Info)
        .init();

    server::start();

    let mut runtime = Runtime::new().unwrap();

    let task = || async {
        let client = client::Client::new()?;

        let req = client
            .request(reqwest::Method::GET, "http://localhost:1729")
            .build()?;

        let text = client.execute(req).await?.text().await?;
        log::info!("Request successful: {}", &text[..]);

        let res: Result<_, Box<dyn Error>> = Ok(());
        res
    };

    let join_handle = runtime.spawn((|| async move {
        match task().await {
            Ok(_) => {}
            Err(e) => {
                log::error!("Something went wrong: {}", e);
            }
        }
    })());

    runtime.block_on(join_handle).unwrap();

    Ok(())
}
There's uhhh.. there's still a main function.
Yeah cool bear, I know. There's still a main function. But do you really want me to go into details of exposing our functionality as a native node.js addon now?
Ah. Maybe later.
Exactly.
The important thing is that it compiles and runs just fine, and as you can see, it..
$ cargo run -q error: future cannot be sent between threads safely --> src/main.rs:29:31 | 29 | let join_handle = runtime.spawn((|| async { | ^^^^^ future created by async block is not `Send` | = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = std::result::Result<reqwest::async_impl::response::Response, backoff::error::Error<reqwest::error::Error>>>` note: future is not `Send` as it awaits another future which is not `Send` --> src/client.rs:33:9 | 33 | exec.with_backoff(&mut backoff).await.map_err(|e| match e { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here on type `std::pin::Pin<std::boxed::Box<dyn std::future::Future<Output = std::result::Result<reqwest::async_impl::response::Response, backoff::error::Error<reqwest::error::Error>>>>>`, which is not `Send`
...doesn't?
But it just worked. And you haven't touched that part of the code.
What happened?
rustc says, "Send is not implemented for this Future". And that's because this Future awaits another future, which itself is not Send.
What's the std::marker::Send trait for again?
Types that can be transferred across thread boundaries.
This trait is automatically implemented when the compiler determines it's appropriate.
An example of a non-Send type is the reference-counting pointer rc::Rc. If two threads attempt to clone Rcs that point to the same reference-counted value, they might try to update the reference count at the same time, which is undefined behavior because Rc doesn't use atomic operations. Its cousin sync::Arc does use atomic operations (incurring some overhead) and thus is Send.
See the Nomicon for more details.
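To make that concrete, here is a minimal std-only sketch (the function name `count_across_threads` is made up for this illustration). It shares a counter between threads through an Arc, which is allowed because Arc is Send.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawns `n` threads that each bump a shared counter, and returns the final count.
fn count_across_threads(n: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..n)
        .map(|_| {
            let counter = Arc::clone(&counter);
            // `thread::spawn` requires its closure to be `Send`;
            // a closure that owns an `Arc<AtomicUsize>` qualifies.
            thread::spawn(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("final count = {}", count_across_threads(4));
    // Swapping `Arc` for `std::rc::Rc` above is rejected at compile time,
    // because `Rc` is not `Send` and so neither is the closure that owns it.
}
```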
Ah, that makes sense. I think.
We're using Runtime::spawn to "poll this future in the background", it would make sense that, since we've enabled the threaded executor (via the full feature), it would be sent from the main thread to another thread.
Even though you end up using Runtime::block_on to wait on it from the same thread?
Yeah! Even then, it might run in a different thread. We need to know that we can send it to any thread we want, and that's what Send is about.
But what about our Future isn't Send?
Let's try to see what the concrete type of our Future is. A cool trick to find the type of something is to try to assign it to a variable of type ():
// in `src/main.rs`

let f: () = task();
$ cargo check -q error[E0308]: mismatched types --> src/main.rs:29:17 | 15 | let task = || async { | _________________________- 16 | | let client = client::Client::new()?; 17 | | 18 | | let req = client ... | 26 | | res 27 | | }; | |_____- the found generator 28 | 29 | let f: () = task(); | -- ^^^^^^ expected `()`, found opaque type | | | expected due to this | ::: /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/src/libcore/future/mod.rs:48:43 | 48 | pub const fn from_generator<T>(gen: T) -> impl Future<Output = T::Return> | ------------------------------- the found opaque type | = note: expected unit type `()` found opaque type `impl std::future::Future`
Ah. An opaque type. Well that trick doesn't work here, but it's a cool one anyway. Keep it in mind for synchronous code.
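A related trick, this time at runtime, is std::any::type_name. The sketch below uses a made-up helper called `type_of`; the exact strings it returns are not guaranteed by the standard library, and for an async block the name is likely just as anonymous as "opaque type", so treat it as a debugging aid only.

```rust
use std::any::type_name;

/// Returns the compiler's name for the type of `_value`.
/// For debugging only: the exact string is not guaranteed to be stable.
fn type_of<T>(_value: &T) -> &'static str {
    type_name::<T>()
}

fn main() {
    let number = 1u8;
    let fut = async { 42u8 };

    println!("number: {}", type_of(&number));
    // For an async block, expect an anonymous, compiler-generated name.
    println!("future: {}", type_of(&fut));
}
```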
Let's try another trick:
fn take_f<F, O>(f: F)
where
    F: std::future::Future<Output = O> + Send,
{
}

take_f(task());
$ cargo check -q error: future cannot be sent between threads safely --> src/main.rs:35:5 | 29 | fn take_f<F, O>(f: F) | ------ required by a bound in this 30 | where 31 | F: std::future::Future<Output = O> + Send, | ---- required by this bound in `main::take_f` ... 35 | take_f(task()); | ^^^^^^ future created by async block is not `Send` | = help: the trait `std::marker::Send` is not implemented for `dyn std::future::Future<Output = std::result::Result<reqwest::async_ impl::response::Response, backoff::error::Error<reqwest::error::Error>>>` note: future is not `Send` as it awaits another future which is not `Send` --> src/client.rs:33:9 | 33 | exec.with_backoff(&mut backoff).await.map_err(|e| match e { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here on type `std::pin::Pin<std::boxed::Box<dyn std::future::Future<Output = std::result::Result<reqwest::async_impl::response::Response, backoff::error::Error<reqwest::error::Error>>>>>`, which is not `Send`
Variations on a theme, huh. But at least we can now climb up the chain, and see where the lack of Send really comes from.
For example, we could go in client.rs and mess with execute:
// in `src/client.rs` impl Client { pub async fn execute(&self, req: Request) -> Result<Response, Error> { let exec = || async { self.inner .execute(req.try_clone().unwrap()) .await .and_then(|r| r.error_for_status()) .map_err(backoff::Error::Transient) }; // new: fn take_f<F, O>(f: F) where F: std::future::Future<Output = O> + Send, { } take_f(exec()); let mut backoff = backoff::ExponentialBackoff::default(); use backoff_futures::BackoffExt; exec.with_backoff(&mut backoff).await.map_err(|e| match e { backoff::Error::Permanent(e) | backoff::Error::Transient(e) => e, }) } }
This compiles just fine. So the future returned by exec() is Send. Which means Result<_, _> is Send, which means reqwest::Response and reqwest::Error are both Send.
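To see the same kind of check both pass and fail outside of this codebase, here is a self-contained, std-only sketch. All the names in it (`check_send`, `tick`, `with_arc`, `with_rc`, `block_on`) are made up for illustration, and the tiny `block_on` is a toy executor, not a replacement for tokio. The point it tries to show: whatever a future keeps alive across an .await becomes part of the future, and decides whether the future is Send.

```rust
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Compiles only if `f` is `Send`; hands the future back unchanged.
fn check_send<F: Future + Send>(f: F) -> F {
    f
}

async fn tick() {}

/// Holds an `Arc` across an `.await`, so the resulting future stays `Send`.
async fn with_arc() -> usize {
    let shared = Arc::new(5usize);
    tick().await;
    *shared
}

/// Holds an `Rc` across an `.await`, so the resulting future is not `Send`.
async fn with_rc() -> usize {
    let shared = Rc::new(5usize);
    tick().await;
    *shared
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy single-threaded executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let a = block_on(check_send(with_arc()));
    // `check_send(with_rc())` would be rejected by the compiler,
    // because the `Rc` is still alive at the `.await` point.
    let b = block_on(with_rc());
    println!("{a} {b}");
}
```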
Let's move down the chain a bit. We'll improve our checking function so that it returns the Future as-is:
// in `src/client.rs` impl Client { pub async fn execute(&self, req: Request) -> Result<Response, Error> { let exec = || async { self.inner .execute(req.try_clone().unwrap()) .await .and_then(|r| r.error_for_status()) .map_err(backoff::Error::Transient) }; let mut backoff = backoff::ExponentialBackoff::default(); use backoff_futures::BackoffExt; let f = async { exec.with_backoff(&mut backoff).await.map_err(|e| match e { backoff::Error::Permanent(e) | backoff::Error::Transient(e) => e, }) }; fn check<F, O>(f: F) -> F where F: std::future::Future<Output = O> + Send, { f } check(f).await } }
$ cargo check -q error: future cannot be sent between threads safely --> src/client.rs:45:9 | 39 | fn check<F, O>(f: F) -> F | ----- required by a bound in this 40 | where 41 | F: std::future::Future<Output = O> + Send, | ---- required by this bound in `client::Client::execute::{{closure}}#0::check` ... 45 | check(f).await | ^^^^^ future created by async block is not `Send`
AhAH! So backoff-futures is the problem.
$ mkdir vendor
$ git clone https://github.com/jakubadamw/backoff-futures vendor/backoff-futures
Cloning into 'vendor/backoff-futures'...
(etc.)
$ cd vendor/backoff-futures
$ git checkout 0.3.0
Note: switching to '0.3.0'.

You are in 'detached HEAD' state. You can look around, make experimental
(etc.)
Take a look around, git says. Alright, let's use ripgrep for that:
$ rg -A 1 'Send' src/lib.rs 91:#[async_trait::async_trait(?Send)] 92-pub trait BackoffExt<T, E, Fut, F> { -- 123:#[async_trait::async_trait(?Send)] 124-impl<T, E, Fut, F> BackoffExt<T, E, Fut, F> for F
Those are the only two mentions of Send in the entire codebase.
You're starting to doubt that codebase ever worked, so, you decide to run the tests:
$ cargo t -q running 5 tests ..... test result: ok. 5 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out running 1 test test src/lib.rs - (line 13) ... ok test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
Nope, everything's fine.
Alright, let's look at that async_trait then:
// in `vendor/backoff-futures/src/lib.rs` #[async_trait::async_trait(?Send)] pub trait BackoffExt<T, E, Fut, F> { /// Returns a future that, when polled, will first ask `self` for a new future (with an output /// type `Result<T, backoff::Error<_>>` to produce the expected result. /// /// If the underlying future is ready with an `Err` value, the nature of the error /// (permanent/transient) will determine whether polling the future will employ the provided /// `backoff` strategy and will result in the work being retried. /// /// Specifically, [`backoff::Error::Permanent`] errors will be returned immediately. /// [`backoff::Error::Transient`] errors will, depending on the particular [`backoff::backoff::Backoff`], /// result in a retry attempt, most likely with a delay. /// /// If the underlying future is ready with an [`std::result::Result::Ok`] value, it will be returned immediately. async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>> where B: Backoff, T: 'async_trait, E: 'async_trait, Fut: 'async_trait; /// Same as [`BackoffExt::with_backoff`] but takes an extra `notify` closure that will be called every time /// a new backoff is employed on transient errors. The closure takes the new delay duration as an argument. async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>> where B: Backoff, N: FnMut(&Error<E>, Duration), T: 'async_trait, E: 'async_trait, Fut: 'async_trait; }
Wait. An async fn in a trait? I thought that wasn't stable yet?
Well... there's a crate for that.
And in that crate's documentation in the Non-threadsafe futures section, it says:
Not all async traits need futures that are dyn Future + Send. To avoid having Send and Sync bounds placed on the async trait methods, invoke the async trait macro as #[async_trait(?Send)] on both the trait and the impl blocks.
AhAH! So async-trait transforms async fns into regular fns that return boxed futures. By default, those boxed futures are declared Send, but if you specify ?Send, they're not.
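Here is a hand-written approximation of the two shapes, using only std. It is a sketch of the idea, not the macro's actual output, and the names `Fetch`, `FetchLocal`, `Answer` and `is_send` are invented for this illustration. It also hints at why the compiler complained about a `dyn Future`: once a future is boxed as a trait object, only the bounds written on the trait object survive.

```rust
use std::future::Future;
use std::pin::Pin;

/// Roughly the default shape: a boxed future with a `Send` bound.
trait Fetch {
    fn fetch<'a>(&'a self) -> Pin<Box<dyn Future<Output = u32> + Send + 'a>>;
}

/// Roughly the `?Send` shape: the same thing, minus the `Send` bound.
trait FetchLocal {
    fn fetch<'a>(&'a self) -> Pin<Box<dyn Future<Output = u32> + 'a>>;
}

struct Answer(u32);

impl Fetch for Answer {
    fn fetch<'a>(&'a self) -> Pin<Box<dyn Future<Output = u32> + Send + 'a>> {
        Box::pin(async move { self.0 })
    }
}

impl FetchLocal for Answer {
    fn fetch<'a>(&'a self) -> Pin<Box<dyn Future<Output = u32> + 'a>> {
        Box::pin(async move { self.0 })
    }
}

/// Compile-time check: only accepts references to `Send` values.
fn is_send<T: Send>(_: &T) -> bool {
    true
}

fn main() {
    let answer = Answer(42);

    let sendable = Fetch::fetch(&answer);
    let local = FetchLocal::fetch(&answer);

    println!("boxed future is Send: {}", is_send(&sendable));
    // `is_send(&local)` would not compile: the trait object carries no `Send`
    // bound, even though the future inside happens to be thread-safe.
    drop(local);
}
```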
You take a moment to collect your thoughts. Things escalated quickly, you weren't prepared for this. But heroes rise to the occasion - even if, in that scenario, "rising" means "maintaining your own fork".
So you decide to get your beads dirty, and fix it:
// in `vendor/backoff-futures/src/lib.rs` #[async_trait::async_trait] pub trait BackoffExt<T, E, Fut, F> { // etc. } #[async_trait::async_trait] impl<T, E, Fut, F> BackoffExt<T, E, Fut, F> for F where F: FnMut() -> Fut, Fut: Future<Output = Result<T, backoff::Error<E>>> { // etc. }
$ # in vendor/backoff-futures $ cargo check -q cargo check -q error[E0277]: `F` cannot be sent between threads safely --> src/lib.rs:135:5 | 129 | async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>> | ------------------- within this `impl std::future::Future` ... 135 | / { 136 | | let backoff_struct = BackoffFutureBuilder { backoff, f: self }; 137 | | backoff_struct.fut(|_, _| {}).await 138 | | } | |_____^ `F` cannot be sent between threads safely | = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `F` = note: required because it appears within the type `[static generator@src/lib.rs:135:5: 138:6 _self:F, backoff:&mut B for<'r , 's, 't0, 't1, 't2, 't3> {std::future::ResumeTy, F, &'r mut B, BackoffFutureBuilder<'s, B, F, Fut, T, E>, [closure@src/lib.rs:13 7:28: 137:37], impl std::future::Future, ()}]` = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@src/lib.rs:135: 5: 138:6 _self:F, backoff:&mut B for<'r, 's, 't0, 't1, 't2, 't3> {std::future::ResumeTy, F, &'r mut B, BackoffFutureBuilder<'s, B , F, Fut, T, E>, [closure@src/lib.rs:137:28: 137:37], impl std::future::Future, ()}]>` = note: required because it appears within the type `impl std::future::Future` = note: required because it appears within the type `impl std::future::Future` = note: required for the cast to the object type `dyn std::future::Future<Output = std::result::Result<T, backoff::error::Err or<E>>> + std::marker::Send` help: consider further restricting this bound | 126 | F: FnMut() -> Fut + std::marker::Send, | ^^^^^^^^^^^^^^^^^^^ (omitted: 10 more errors)
Okay, well, this should be easy. You just need to add a bunch of Send bounds, right?
// in `vendor/backoff-futures/src/lib.rs` #[async_trait::async_trait] pub trait BackoffExt<T, E, Fut, F> { async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>> where B: Backoff + Send, // new! T: 'async_trait, E: 'async_trait, Fut: 'async_trait; /// Same as [`BackoffExt::with_backoff`] but takes an extra `notify` closure that will be called every time /// a new backoff is employed on transient errors. The closure takes the new delay duration as an argument. async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>> where B: Backoff + Send, // new! N: FnMut(&Error<E>, Duration) + Send, // new! T: 'async_trait, E: 'async_trait, Fut: 'async_trait; } #[async_trait::async_trait] impl<T, E, Fut, F> BackoffExt<T, E, Fut, F> for F where F: (FnMut() -> Fut) + Send, // new T: Send, // new E: Send, // new Fut: Future<Output = Result<T, backoff::Error<E>>> + Send, // new { async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>> where B: Backoff + Send, // new T: 'async_trait, E: 'async_trait, Fut: 'async_trait, { let backoff_struct = BackoffFutureBuilder { backoff, f: self }; backoff_struct.fut(|_, _| {}).await } async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>> where B: Backoff + Send, // new N: FnMut(&Error<E>, Duration) + Send, // new T: 'async_trait, E: 'async_trait, Fut: 'async_trait, { let backoff_struct = BackoffFutureBuilder { backoff, f: self }; backoff_struct.fut(notify).await } }
One, two, three... ten! Ten Send bounds.
Yeah, that should do it:
$ # still in vendor/backoff-futures $ cargo t -q running 5 tests ..... test result: ok. 5 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out running 1 test test src/lib.rs - (line 13) ... ok test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
Great! Before publishing your fork on crates.io, or thinking about upstreaming it, you decide to check that those changes actually solve your problem, though.
# in `Cargo.toml` (the root one)

[patch.crates-io]
backoff-futures = { path = "vendor/backoff-futures" }
$ cargo run -q 2020-07-07T17:57:36.378Z INFO trouble::server > Serving "/" 2020-07-07T17:57:37.080Z INFO trouble::server > Serving "/" 2020-07-07T17:57:38.123Z INFO trouble::server > Serving "/" 2020-07-07T17:57:39.386Z INFO trouble::server > Serving "/" 2020-07-07T17:57:40.695Z INFO trouble::server > Serving "/" 2020-07-07T17:57:43.523Z INFO trouble::server > Serving "/" 2020-07-07T17:57:43.524Z INFO trouble > Request successful: Hello from test server!
Hooray! Wall of text: vanquished. Service: production-ready. Almost.
Right after you figure out which errors should be transient and which should be permanent.
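As a starting point for that decision, here is one possible policy, sketched with std only. The `Retry` enum is a hypothetical stand-in for the transient/permanent distinction, and the choice of status codes is an assumption that a real service would want to revisit.

```rust
/// Hypothetical stand-in for the transient/permanent distinction.
#[derive(Debug, PartialEq)]
enum Retry {
    Transient,
    Permanent,
}

/// Classifies an HTTP error status. Assumed policy: timeouts, rate limiting
/// and server-side errors may go away on their own; everything else won't.
fn classify(status: u16) -> Retry {
    match status {
        408 | 429 => Retry::Transient,
        500..=599 => Retry::Transient,
        _ => Retry::Permanent,
    }
}

fn main() {
    for status in [404, 408, 429, 503] {
        println!("{status}: {:?}", classify(status));
    }
}
```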
Nevertheless, your codebase is in much better shape now. It's sure to last well into the Mesozoic era.
Closing words
I hope you enjoyed this little adventure in finding and figuring out a Rust futures issue. It's not always that colorful - but sometimes, it is!
And when it is, there's always something that can be done. Walls of rustc errors are not necessarily dead-ends. And everyone involved is already aware of the current usability issues, and working towards a smoother experience in general.
It might take some time. But it's in the works.
GATs or nothing!
We'll see, cool bear. We'll see.