Pin and suffering
👋 This page was last updated ~4 years ago. Just so you know.
I'd like to think that my understanding of "async Rust" has increased over the past year or so. I'm 100% onboard with the basic principle: I would like to handle thousands of concurrent tasks using a handful of threads. That sounds great!
And to become proficient with async Rust, I've accepted a lot of things. There are blue functions and red functions, and red (async) functions are contagious.
Eh, not exactly — you can build an executor in a blue (sync) function to block until a red (async) function is done.
And from a red (async) function, you can use something like
tokio::task::spawn_blocking
.
I guess the difficulty is when you go from red to blue, back to red — because you can't make an executor inside an executor.
See bear, this is what I'm talking about: there's just so much I have to take for granted. "Don't block the executor", "futures do nothing unless polled", "you need to pin a future before you can poll it", etc.
Well, yes? Mostly those two?
But bear, everything was so simple in synchronous Rust, and suddenly I have to uphold all these rules? I have to think about these?
Here let me show you. Say, in synchronous Rust, I just want to print hello, sleep, and then print goodbye.
I can just do this!
use std::{thread::sleep, time::Duration}; fn main() { println!("Hello!"); sleep(Duration::from_millis(500)); println!("Goodbye!"); }
$ cargo run --quiet Hello! (500ms elapse...) Goodbye!
It's so simple!
Well, the async version isn't that hard either. Try out tokio for example.
Okay, well, sure:
# in Cargo.toml [dependencies] tokio = { version = "1.4.0", features = ["full"] }
use std::{thread::sleep, time::Duration}; #[tokio::main] async fn main() { println!("Hello!"); sleep(Duration::from_millis(500)); println!("Goodbye!"); }
$ cargo run --quiet Hello! (500ms elapse...) Goodbye!
Ah, you're right! That was pretty simple.
Well, it's also wrong. You're blocking the executor.
What? I didn't see any blockage going on. In fact, it behaved exactly the same as the synchronous version. Seems okay to me.
Well, try spawning two tasks that do that, you'll see.
Okay, sure:
use std::{thread::sleep, time::Duration}; #[tokio::main] async fn main() { let one = tokio::spawn(greet()); let two = tokio::spawn(greet()); let (_, _) = tokio::join!(one, two); } async fn greet() { println!("Hello!"); sleep(Duration::from_millis(500)); println!("Goodbye!"); }
$ cargo run --quiet Hello! Hello! (500ms elapse...) Goodbye! Goodbye!
There! It just works! Thanks bear, I don't know what I was complaining about, async Rust really is quite easy.
No, you j- sigh okay, try switching to the single-threaded executor.
The what? Oh, oh wait a minute, I see it, I can just pass a flavor
argument to the tokio::main
attribute macro:
// 👇 #[tokio::main(flavor = "current_thread")] async fn main() { let one = tokio::spawn(greet()); let two = tokio::spawn(greet()); let (_, _) = tokio::join!(one, two); } // omitted: everything else
$ cargo run --quiet Hello! (500ms elapse...) Goodbye! Hello! (500ms elapse...) Goodbye!
Ah.
So we are blocking the executor. But see, that's complicated, I don't want to have to think about all that!
For me sleep
is just a syscall.
You call it and boom! It sleeps. No need to worry about anything.
Well, you could just not block the executor?
But how?
Well, tokio has its own sleep
method.
And that one doesn't block? I mean okay, sure, if you say so:
use std::time::Duration; use tokio::time::sleep; #[tokio::main(flavor = "current_thread")] async fn main() { let one = tokio::spawn(greet()); let two = tokio::spawn(greet()); let (_, _) = tokio::join!(one, two); } async fn greet() { println!("Hello!"); sleep(Duration::from_millis(500)).await; println!("Goodbye!"); }
$ cargo run --quiet Hello! Hello! (500ms elapse...) Goodbye! Goodbye!
Huh. So that one doesn't block the executor. Okay then.
But, see bear, I don't know how that all works. It's all just voodoo to me. One function blocks the executor, another doesn't, why? How does it actually work, under the hood?
Well... it's not that complicated really.
The sleep
in std
does just call the sleep
syscall, pretty much.
Yes, that's simple. I like that.
Whereas the sleep
in tokio
returns a Future... that registers a timer
when you first poll it... and it only completes when the deadline is reached.
I uh... none of that made any sense. I don't remember "polling" anything. And you say it... "registers" a timer? As in, global state? And what do you mean by "complete"?
Sigh. Okay, let's start small.
The Future
type.
Future
is just a trait. Anything can be a Future
if it wants to.
We can make Future
s?
Sure! If you want to.
Yeah why not, if it can shed some light on the whole thing, I'll make a future.
So I guess since it's a trait... I know traits. We can just implement it on any type? Like an empty struct?
Yeah! Go ahead!
use std::future::Future; #[tokio::main(flavor = "current_thread")] async fn main() { todo!() } struct MyFuture {} impl Future for MyFuture {}
Ok, the compiler is complaining that not all trait items are implemented: it
wants Output
and poll
. What now?
Now, use the rust-analyzer "Implement missing members" quick fix to generate those.
impl Future for MyFuture { type Output; fn poll( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Self::Output> { todo!() } }
Ew, long types.
...just use rust-analyzer's "Replace qualified path with use" quick fix on those long types.
use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; struct MyFuture {} impl Future for MyFuture { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { todo!() } }
Okay, better! And now what?
Well, first you need to pick your output type. What do you want your future to, well, output?
Nothing special?
So, nothing. You can just use the empty tuple: ()
.
impl Future for MyFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { todo!() } }
Okay done! Now what?
Now we await it!
But it's not done?
That's okay, we just await it anyway!
Hum, okay:
#[tokio::main(flavor = "current_thread")] async fn main() { let fut = MyFuture {}; fut.await }
This feels really dumb, but here goes:
$ cargo run --quiet thread 'main' panicked at 'not yet implemented', src/main.rs:19:9 note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Yeah, well...
...but that's good! That means your future was polled!
Mh? Who polled it?
Hang on, maybe we can get a stack trace...
$ RUST_BACKTRACE=1 cargo run --quiet thread 'main' panicked at 'not yet implemented', src/main.rs:19:9 stack backtrace: 0: rust_begin_unwind at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/std/src/panicking.rs:493:5 1: core::panicking::panic_fmt at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/core/src/panicking.rs:92:14 2: core::panicking::panic at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/core/src/panicking.rs:50:5 3: <manual_futures::MyFuture as core::future::future::Future>::poll at ./src/main.rs:19:9 4: manual_futures::main::{{closure}} at ./src/main.rs:10:5 5: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19 6: <core::pin::Pin<P> as core::future::future::Future>::poll at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:119:9 7: tokio::runtime::basic_scheduler::Inner<P>::block_on::{{closure}}::{{closure}} at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:196:62 8: tokio::coop::with_budget::{{closure}} at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/coop.rs:106:9 9: std::thread::local::LocalKey<T>::try_with at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:272:16 10: std::thread::local::LocalKey<T>::with at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:248:9 11: tokio::coop::with_budget at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/coop.rs:99:5 12: tokio::coop::budget at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/coop.rs:76:5 13: tokio::runtime::basic_scheduler::Inner<P>::block_on::{{closure}} at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:196:39 14: tokio::runtime::basic_scheduler::enter::{{closure}} at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:279:29 15: tokio::macros::scoped_tls::ScopedKey<T>::set at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/macros/scoped_tls.rs:61:9 16: tokio::runtime::basic_scheduler::enter at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:279:5 17: tokio::runtime::basic_scheduler::Inner<P>::block_on at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:185:9 18: tokio::runtime::basic_scheduler::InnerGuard<P>::block_on at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:425:9 19: tokio::runtime::basic_scheduler::BasicScheduler<P>::block_on at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:145:24 20: tokio::runtime::Runtime::block_on at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/mod.rs:450:46 21: manual_futures::main at ./src/main.rs:7:1 22: core::ops::function::FnOnce::call_once at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ops/function.rs:227:5 note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
Ok, uhh the... tokio scheduler? Is the one that polls in?
Yes! That's what your code is doing.
Mh? It certainly isn't — I haven't even built a tokio::runtime::Runtime
.
Yeah, tokio::main
just did it for you! In fact, your main function here:
#[tokio::main(flavor = "current_thread")] async fn main() { let fut = MyFuture {}; fut.await }
...is roughly equivalent to this:
fn main() { let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); let fut = MyFuture {}; rt.block_on(fut); }
Ah! I see. So... with tokio::main
, main
is an async function, which... is
what exactly?
An async is actually just a synchronous function that returns a future!
So the whole body of main is a Future, too?
Yes!
Okay, okay, well, I don't really want to build a Runtime
myself, I'm fine
with the tokio::main
version for now.
I want to go back to MyFuture
. How do I make it do something?
Well, by returning! You can see that it's supposed to a return a Poll
,
which is an enum with two variants.
The first variant is Ready
, which..
..which I assume we return when the future is complete, okay. So if I do this:
use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; #[tokio::main] async fn main() { let fut = MyFuture {}; println!("Awaiting fut..."); fut.await; println!("Awaiting fut... done!"); } struct MyFuture {} impl Future for MyFuture { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { // 👇 Poll::Ready(()) } }
...then it should complete immediately:
$ cargo run --quiet Awaiting fut... Awaiting fut... done!
Correct! And the other variant is Pending
, which you should return if your
future is not quite done yet.
Mh? Then how will it ever complete?
Well, Future::poll
will be called again later!
It will? Okay!
impl Future for MyFuture { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { // 👇 Poll::Pending } }
$ cargo run --quiet Awaiting fut... (nothing seems to happen)
Mhhh, bear? The program seems... stuck. Are we blocking the executor again?
Oh, no. It's not blocked. Well, the actual, generated, synchronous main
function is blocked on the asynchronous main
's future — it's waiting for it
to complete, and it's never completing.
So it's not getting polled again? But you just told me.. hang on, let me just add some debug prints:
impl Future for MyFuture { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { // 👇 println!("MyFuture::poll()"); Poll::Pending } }
$ cargo run --quiet Awaiting fut... MyFuture::poll() (nothing happens, still. the program continues running forever)
Yeah! It's only getting polled once! What gives, bear? I thought you told me it would get polled again?
Ahh well it only gets polled again if it registered to be polled again. Did you think it would be polled in a loop? Imagine if you're trying to read from a socket and the other peer is not sending anything for five seconds.
The read future would be polled and polled and polled in a busy loop for five seconds. It would consume an entire CPU core! No, futures are only "awakened" when something happens.
Something? What kind of thing?
Something interesting! Like, a timer running out. Or a file being ready to read from. That kind of thing.
Ah. That kind of thing. Okay, so say I want my future to be "awakened" after one second. How would I do that?
Well, you see that cx
argument? Of type Context
? That's how.
Ah, right — the poll
method takes two arguments. The receiver is... some
form of self, so, MyFuture
, and the second argument is a &mut Context
.
Let's see what it has... it has... a waker()
method that returns a
&Waker
! That looks interesting.
It sure does!
And Waker
has... wake
and wake_by_ref
methods. Well, we can't call
wake
at all because it wants to take ownership of Waker
, and all we have
is an immutable reference to it.
Let's try wake_by_ref
I guess?
impl Future for MyFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { println!("MyFuture::poll()"); cx.waker().wake_by_ref(); Poll::Pending } }
$ cargo run --quiet Awaiting fut... MyFuture::poll() MyFuture::poll() MyFuture::poll() MyFuture::poll() MyFuture::poll() MyFuture::poll() MyFuture::poll() MyFutur^C (that line repeats very quickly, and only stops when we hit Ctrl-C)
Heyyyy. Now it is a busy loop! It gets polled as fast as it can.
Correct!
But that's still not what I want. What I want is... for my future to be polled again after one second.
So? You have a Waker
. You can wake it up yourself.
No, because I return, so by that time it's too late.
So??? Just use a thread! You were just going on about how threads are nice and easy and you understood them, blah blah blah. Just use a thread.
But I only have a reference to a Waker
, I can't just.. oh would you look
at that, it implements Clone
. Okay. OKAY. I guess I'll just spawn a thread.
impl Future for MyFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { println!("MyFuture::poll()"); let waker = cx.waker().clone(); std::thread::spawn(move || { std::thread::sleep(Duration::from_secs(1)); waker.wake(); }); Poll::Pending } }
$ cargo run --quiet Awaiting fut... MyFuture::poll() (1 second elapses...) MyFuture::poll() (another second elapses...) MyFuture::poll() (a third second elapses...) MyFuture::poll() ^C
AhAH! Interesting. Well, eventually I guess I want to return ready so all I need to do is maintain some state, let's see...
#[tokio::main] async fn main() { let fut = MyFuture::new(); println!("Awaiting fut..."); fut.await; println!("Awaiting fut... done!"); } struct MyFuture { slept: bool, } impl MyFuture { fn new() -> Self { Self { slept: false } } } impl Future for MyFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { println!("MyFuture::poll()"); match self.slept { false => { // make sure we're polled again in one second let waker = cx.waker().clone(); std::thread::spawn(move || { std::thread::sleep(Duration::from_secs(1)); waker.wake(); }); self.slept = true; Poll::Pending } true => Poll::Ready(()), } } }
No, wait — it's complaining that self
is not mutable. Uhh... I can write it
mut self
if that'd help?
impl Future for MyFuture { type Output = (); // 👇 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // etc. } }
Ah. That did help. Jolly good.
$ cargo run --quiet Awaiting fut... MyFuture::poll() (one second elapses) MyFuture::poll() Awaiting fut... done!
Heyyyyyyy. That worked! Thanks bear!
You're welcome. See? It's not that complicated!
It really isn't. Hey WAIT A MINUTE. We're just back to using threads, sort of.
I don't want to spin up a new thread every time I want to wait. That seems really excessive. Surely async Rustâ„¢ has something to solve that?
Well yes! In this case, for this executor, it's tokio::time::sleep
.
But that's what we were using in the first pl-
Yes, yes, that's what we were using. But we can embed it within MyFuture
, so
you can still see the machinery.
Machinery, yes, good! Okay, how do I do that.
Well, I don't know if you've noticed but tokio::time::sleep
returns a
concrete type: Sleep
.
AhAH! So if I just store that as a field of MyFuture
, all I have to do is
poll it, from within my own poll
method?
That's the gist, yes.
use tokio::time::Sleep; struct MyFuture { sleep: Sleep, } impl MyFuture { fn new() -> Self { Self { sleep: tokio::time::sleep(Duration::from_secs(1)), } } } impl Future for MyFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { println!("MyFuture::poll()"); self.sleep.poll(cx) } }
Ah, great! It even has the same exact return type as MyFuture::poll
—
Poll<()>
, so the method body is really short and sweet.
There's just uh... just one tiny problem. It doesn't compile:
$ cargo check Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures) error[E0599]: no method named `poll` found for struct `Sleep` in the current scope --> src/main.rs:35:20 | 35 | self.sleep.poll(cx) | ^^^^ method not found in `Sleep` error: aborting due to previous error For more information about this error, try `rustc --explain E0599`. error: could not compile `manual-futures` To learn more, run the command again with --verbose.
Ohhh! I know! poll
is a method on trait Future
, so I need to import it to
be able to call it... wait no, we've already imported it.
Beaaaaaaaaaar what's wrong with my code?
Look at the receiver! It's not self
, it's not &self
, it's not even &mut self
.
It's self: Pin<&mut Self>
.
Mh. Okay. How do we build a Pin
... it has a new
method. Let's try that.
impl Future for MyFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { println!("MyFuture::poll()"); // 👇 let mut sleep = Pin::new(&mut self.sleep); sleep.poll(cx) } }
$ cargo check Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures) error[E0277]: `PhantomPinned` cannot be unpinned --> src/main.rs:35:25 | 35 | let mut sleep = Pin::new(&mut self.sleep); | ^^^^^^^^ within `tokio::time::driver::sleep::_::__Origin<'_>`, the trait `Unpin` is not implemented for `PhantomPinned` | = note: required because it appears within the type `tokio::time::driver::entry::TimerEntry` = note: required because it appears within the type `tokio::time::driver::sleep::_::__Origin<'_>` = note: required because of the requirements on the impl of `Unpin` for `Sleep` = note: required by `Pin::<P>::new`
That does not work. Bear, halp. What's happening?
Bwahahahaha. Just Box::pin
it!
Just wh-?
In the constructor, just wrap sleep()
in Box::pin()
.
Uh, okay... that returns a Pin<Box<Sleep>>
so I guess I'll have to change the
field type too...
struct MyFuture { sleep: Pin<Box<Sleep>>, } impl MyFuture { fn new() -> Self { Self { sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))), } } } impl Future for MyFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { println!("MyFuture::poll()"); let sleep = Pin::new(&mut self.sleep); sleep.poll(cx) } }
Oh hey, that compiles.
$ cargo run --quiet Awaiting fut... MyFuture::poll() (one second elapses...) MyFuture::poll() Awaiting fut... done!
Oh heyy! That runs!!
Yes! And you honestly don't need to build that Pin
yourself, because
Pin<Box<Sleep>>
is already pinned — so you're building a Pin<&mut Pin<Box<Sleep>>>
.
You can just call sleep.as_mut()
, and poll that.
impl Future for MyFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { println!("MyFuture::poll()"); self.sleep.as_mut().poll(cx) } }
Ah, cleaner. That still works.
You can even go one step further and add the futures
crate so you can use
FutureExt::poll_unpin
, because polling an Unpin
future is a common
operation.
I can?
# in Cargo.toml [dependencies] futures = "0.3"
use futures::FutureExt; impl Future for MyFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { println!("MyFuture::poll()"); self.sleep.poll_unpin(cx) } }
Neat! That still works.
At this point though, I can't help but wonder — do we really need to understand all this? Like, it's neat that we did it once, don't get me wrong.
But in the real world, is there ever a point in manually implementing futures
like that? Don't we have the async
/ await
keywords so that we don't have
to?
Well! In this case, yes, it's gratuitous. But let's say we want to do something different...
Let's say we want to make a type that implements tokio's AsyncRead
interface (so that we can use it anywhere we would use another reader), but
that artificially introduces some delay between each read.
Mhyes, okay. Can't we just use an async
method here?
You'd think so, but no. Not currently. See, async Rust is still in its awkward teenage phase. Almost everything is there, but it can be a bit of a challenge to live with.
What uh.. what does that mean?
Well, AsyncRead
is a trait. And traits can't have async methods. (At the
time of this writing, ie. with Rust 1.51).
Okay, so just have them return a Future
then? That's all async methods are,
right?
Right, yes, but! We're always reading into something. A buffer. In synchronous land, when we do this:
let mut buf = vec![0u8; 1024]; let n = something.read(&mut buf)?;
...the read
method borrows buf
mutably until it's done with it.
But the problem with async...
...is that we return early if we're not quite ready yet. We just return
Poll::Pending
and wait until we're awakened by the scheduler. Ohhhhh and
just because we return doesn't mean that we're done with the buffer!
...rude, but yes.
So that's not how AsyncRead
works. It does give you a buffer, but you're
supposed to either:
- Not be ready, and so you subscribe to be awakened later, and return
Poll::Pending
- Be ready, and then you fill part (or all) of the buffer, and return
Poll::Ready
And if we're not ready, what do we do with the buffer?
Nothing. Especially not write into it. It would be disregarded anyway.
Gotcha. Okay, so, we know how to sleep by polling a Sleep
future, I think
we know enough to cook us a slow reader...
It's going to be a struct (it's always a struct), and it'll work with any kind of reader.
struct SlowRead<R> { reader: R, } impl<R> SlowRead<R> { fn new(reader: R) -> Self { Self { reader } } }
And then we just have to implement AsyncRead
for SlowRead
itself! But only
if its type parameter R
also implements AsyncRead
:
use tokio::io::{AsyncRead, ReadBuf}; impl<R> AsyncRead for SlowRead<R> where R: AsyncRead, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { self.reader.poll_read(cx, buf) } }
Okay! Well, this doesn't build:
$ cargo check Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures) error[E0599]: no method named `poll_read` found for type parameter `R` in the current scope --> src/main.rs:29:21 | 29 | self.reader.poll_read(cx, buf) | ^^^^^^^^^ method not found in `R` | = help: items from traits can only be used if the type parameter is bounded by the trait help: the following trait defines an item `poll_read`, perhaps you need to restrict type parameter `R` with it: | 20 | impl<R: AsyncRead> AsyncRead for SlowRead<R> | ^^^^^^^^^^^^
And the compiler is getting confused. We're already constraining R
to
implement AsyncRead
. I guess this is just like futures, where the receiver isn't
quit ri-
Yup. It's exactly like that.
So we need to pin it before we read fr-
Yes. You need to pin it.
Okay, okay, I'm pinning it. Box-pinning it even.
struct SlowRead<R> { reader: Pin<Box<R>>, } impl<R> SlowRead<R> { fn new(reader: R) -> Self { Self { reader: Box::pin(reader), } } } impl<R> AsyncRead for SlowRead<R> where R: AsyncRead, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { self.reader.as_mut().poll_read(cx, buf) } }
Okay, had to sneak an as_mut
in there, but it builds. What now?
Well, now we try reading! First, without SlowRead
...
use std::{ pin::Pin, task::{Context, Poll}, }; use tokio::{ fs::File, io::{AsyncRead, AsyncReadExt, ReadBuf}, time::Instant, }; #[tokio::main] async fn main() -> Result<(), tokio::io::Error> { let mut buf = vec![0u8; 128 * 1024]; let mut f = File::open("/dev/urandom").await?; let before = Instant::now(); f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); Ok(()) } // omitted: `SlowRead` declaration and implementation
$ cargo run --quiet Read 131072 bytes in 3.2748ms
...and now with SlowRead
!
#[tokio::main] async fn main() -> Result<(), tokio::io::Error> { let mut buf = vec![0u8; 128 * 1024]; // 👇 let mut f = SlowRead::new(File::open("/dev/urandom").await?); let before = Instant::now(); f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); Ok(()) }
$ cargo run --quiet Read 131072 bytes in 3.3134ms
Well, it's not... really slower.
Well we're just forwarding poll_read
calls straight to the inner reader for
now. We're not actually slowing it down.
Oh right! So I guess we can just call std::thread::sleep
in poll
and...
No, no! That would bl-
...block the executor, right. Oh, wait, poll_read
gives us a &mut Context
as well, so we could poll
a Sleep
from there, right?
Exactly! And Sleep
has a handy reset
method, so you can use it several
times.
As in sleep several times in a row with the same future? Sounds good!
I guess we'll need to pin that one too...
struct SlowRead<R> { reader: Pin<Box<R>>, // 👇 sleep: Pin<Box<Sleep>>, } impl<R> SlowRead<R> { fn new(reader: R) -> Self { Self { reader: Box::pin(reader), // 👇 sleep: Box::pin(tokio::time::sleep(Default::default())), } } } impl<R> AsyncRead for SlowRead<R> where R: AsyncRead, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { // 👇 match self.sleep.poll_unpin(cx) { Poll::Ready(_) => { // whenever `sleep` completes, reset it... self.sleep .as_mut() .reset(Instant::now() + Duration::from_millis(25)); // and poll the inner reader. self.reader.as_mut().poll_read(cx, buf) } Poll::Pending => Poll::Pending, } } }
$ cargo run --quiet Read 131072 bytes in 396.3634ms
Woo!! We did a thing!
We did!
We're still boxing everything though.
Yes, that's "easy mode async". Just like Arc<Mutex<T>>
is "easy mode
lifetimes", sort of. It's fine though! It does work!
Yeah, it does appear to work. But I have no idea why anything needs to be
pinned. Nor what Unpin
is. Let alone phantoms, which uhh..
No, and you didn't need to! You only wrote safe code, and you made sure the
types matched up, even if you needed to box them or sneak in an as_mut
call
here and there. And everything worked.
Yes, right — but I'm assuming that Pin
exists for a reason. It's not just
complexity for complexity's sake, it's inherent complexity. But for what?
Well... follow me.
We can leave our Box behind
Okay, let's start with R
! You don't need to store it pinned.
I don't? So just do this?
struct SlowRead<R> { // 👇 reader: R, sleep: Pin<Box<Sleep>>, } impl<R> SlowRead<R> { fn new(reader: R) -> Self { Self { // 👇 reader, sleep: Box::pin(tokio::time::sleep(Default::default())), } } }
But then...
$ cargo check Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures) error[E0599]: no method named `poll_read` found for type parameter `R` in the current scope --> src/main.rs:52:29 | 52 | self.reader.poll_read(cx, buf) | ^^^^^^^^^ method not found in `R` | = help: items from traits can only be used if the type parameter is bounded by the trait help: the following traits define an item `poll_read`, perhaps you need to restrict type parameter `R` with one of them: | 38 | impl<R: futures::AsyncRead> AsyncRead for SlowRead<R> | ^^^^^^^^^^^^^^^^^^^^^ 38 | impl<R: tokio::io::AsyncRead> AsyncRead for SlowRead<R> | ^^^^^^^^^^^^^^^^^^^^^^^
(...and the compiler gets confused again. Again, we've already constrained R
to tokio::io::AsyncRead
).
The problem is, I have an R
, and I need a Pin<&mut R>
. And I can't use
Pin::new
apparently, at least that didn't work last time.
But maybe it'll work this time?
Okay, sure:
impl<R> AsyncRead for SlowRead<R> where R: AsyncRead, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { match self.sleep.poll_unpin(cx) { Poll::Ready(_) => { self.sleep .as_mut() .reset(Instant::now() + Duration::from_millis(25)); // 👇 Pin::new(&mut self.reader).poll_read(cx, buf) } Poll::Pending => Poll::Pending, } } }
$ cargo check Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures) error[E0277]: `R` cannot be unpinned --> src/main.rs:52:26 | 52 | Pin::new(&mut self.reader).poll_read(cx, buf) | ^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `R` | = note: required by `Pin::<P>::new` help: consider further restricting this bound | 40 | R: AsyncRead + Unpin, | ^^^^^^^
No, see, it doesn't work eith-
Ah, but there's a hint!
Consider... further restricting this bound. We can restrict our AsyncRead
implementation only for R
types that can be unpinned?
Yes! ie., we need R
to implement the Unpin
trait, or in other words,
we need R: Unpin
.
Okay, sure... I mean, I don't even know if that's worth exploring, who knows
if the concrete R
, which in this case is... tokio::fs::File
, is even
Unpin
, but okay, sure:
impl<R> AsyncRead for SlowRead<R> // 👇 where R: AsyncRead + Unpin, { // omitted: fn `poll_read` }
$ cargo run --quiet Read 131072 bytes in 394.8455ms
Oh hey! That worked!
It sure did! tokio::fs::File
does implement Unpin
, so it can be unpinned.
Well I'm still not sure what "can be unpinned" even means, but if you say so, okay.
Now what about Sleep? Can we just do the same trick?
First we remove the Pin<Box<T>>
/ Box::pin
:
struct SlowRead<R> { reader: R, // 👇 sleep: Sleep, } impl<R> SlowRead<R> { fn new(reader: R) -> Self { Self { reader, // 👇 sleep: tokio::time::sleep(Default::default()), } } }
And then... well then we're in a pickle:
error[E0277]: `PhantomPinned` cannot be unpinned --> src/main.rs:47:26 | 47 | match self.sleep.poll_unpin(cx) { | ^^^^^^^^^^ within `tokio::time::driver::sleep::_::__Origin<'_>`, the trait `Unpin` is not implemented for `PhantomPinned` | = note: required because it appears within the type `tokio::time::driver::entry::TimerEntry` = note: required because it appears within the type `tokio::time::driver::sleep::_::__Origin<'_>` = note: required because of the requirements on the impl of `Unpin` for `Sleep`
Yes! tokio::time::Sleep
does not implement Unpin
, ie. it cannot be unpinned.
But then how... because clearly Pin::new
is not going to work either in
this case:
impl<R> AsyncRead for SlowRead<R> where R: AsyncRead + Unpin, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { // 👇 let sleep = Pin::new(&mut self.sleep); // 👇 match sleep.poll(cx) { Poll::Ready(_) => { // 👇 let sleep = Pin::new(&mut self.sleep); // 👇 sleep.reset(Instant::now() + Duration::from_millis(25)); Pin::new(&mut self.reader).poll_read(cx, buf) } Poll::Pending => Poll::Pending, } } }
error[E0277]: `PhantomPinned` cannot be unpinned --> src/main.rs:47:21 | 47 | let sleep = Pin::new(&mut self.sleep); | ^^^^^^^^ within `tokio::time::driver::sleep::_::__Origin<'_>`, the trait `Unpin` is not implemented for `PhantomPinned` | = note: required because it appears within the type `tokio::time::driver::entry::TimerEntry` = note: required because it appears within the type `tokio::time::driver::sleep::_::__Origin<'_>` = note: required because of the requirements on the impl of `Unpin` for `Sleep` = note: required by `Pin::<P>::new`
So then what do we do?
Well, there's another constructor for Pin
... you can always call Pin::new_unchecked
.
Mh? Like that?
impl<R> AsyncRead for SlowRead<R> where R: AsyncRead + Unpin, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { // 👇 let sleep = Pin::new_unchecked(&mut self.sleep); match sleep.poll(cx) { Poll::Ready(_) => { // 👇 let sleep = Pin::new_unchecked(&mut self.sleep); sleep.reset(Instant::now() + Duration::from_millis(25)); Pin::new(&mut self.reader).poll_read(cx, buf) } Poll::Pending => Poll::Pending, } } }
But..
error[E0133]: call to unsafe function is unsafe and requires unsafe function or block --> src/main.rs:47:21 | 47 | let sleep = Pin::new_unchecked(&mut self.sleep); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ call to unsafe function | = note: consult the function's documentation for information on how to avoid undefined behavior
Wait wait wait wait. It's unsafe? Eww!
It is! And that means there are some invariants that the compiler will no longer
be enforcing for us, so we'll have to "just be careful". Which is something we
never have to do in Rust, except for when we use unsafe
.
Ughhh. Do we have to?
For the purpose of this exercise, yes. I'm about to tell you the rules we should follow. Are you ready?
Shoot.
Once we pin something, ie. once we construct a Pin<&mut T>
of it, we can
never use it unpinned (ie, as &mut T
) ever again, unless it implements
Unpin
.
But Sleep
doesn't implement Unpin
.
Yes! Which means that, since we need to use it as a Pin<&mut T>
once, we
can only ever use it as a Pin<&mut T>
.
Okay, I guess we can do that. I think I understand the rules, so... I'll just add some unsafe blocks around those:
impl<R> AsyncRead for SlowRead<R> where R: AsyncRead + Unpin, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { let sleep = unsafe { Pin::new_unchecked(&mut self.sleep) }; match sleep.poll(cx) { Poll::Ready(_) => { let sleep = unsafe { Pin::new_unchecked(&mut self.sleep) }; sleep.reset(Instant::now() + Duration::from_millis(25)); Pin::new(&mut self.reader).poll_read(cx, buf) } Poll::Pending => Poll::Pending, } } }
Uhh...
error[E0596]: cannot borrow data in a dereference of `Pin<&mut SlowRead<R>>` as mutable --> src/main.rs:47:49 | 47 | let sleep = unsafe { Pin::new_unchecked(&mut self.sleep) }; | ^^^^^^^^^^^^^^^ cannot borrow as mutable | = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Pin<&mut SlowRead<R>>`
Oh nooo. What now. Even a line that used to work doesn't work anymore:
error[E0596]: cannot borrow data in a dereference of `Pin<&mut SlowRead<R>>` as mutable --> src/main.rs:52:26 | 52 | Pin::new(&mut self.reader).poll_read(cx, buf) | ^^^^^^^^^^^^^^^^ cannot borrow as mutable | = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Pin<&mut SlowRead<R>>`
This used to work! We said R: Unpin
, so R
can be unpinned, so we should be
able to pass it to Pin::new
, the safe constructor for Pin
.
Ah, yes, but back then, Self
was Unpin
too.
Mhhhhh because it only had fields that were also Unpin
?
Yes! Didn't you think it was odd that we could do &mut self.reader
, even though
self
was a Pin<&mut Self>
and not a &mut Self
?
Honestly I was just happy it worked?
Well, it only worked because we had Self: Unpin
, and so going from
Pin<&mut Self>
to &mut Self
was allowed — and so that's what it did,
transparently.
But now, Self
is no longer Unpin
, because it contains a Sleep
, and so
we cannot go from Pin<&mut Self>
to &mut Self
.
...we can't?
Well, not safely. We can do so unsafely.
Let me guess, Pin::new_unchecked
?
Actually no! That wouldn't help. To obtain a Pin<&mut R>
, we must obtain a
&mut R
, and to obtain a &mut R
, we would need a &mut Self
, so we're back
to square one.
Okay, some other unsafe method?
Yes! Pin::map_unchecked
. Here, let me show you:
impl<R> AsyncRead for SlowRead<R> where R: AsyncRead + Unpin, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { let sleep = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.sleep) }; match sleep.poll(cx) { Poll::Ready(_) => { let sleep = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.sleep) }; sleep.reset(Instant::now() + Duration::from_millis(25)); let reader = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.reader) }; reader.poll_read(cx, buf) } Poll::Pending => Poll::Pending, } } }
There!
Uhhh it still doesn't build though:
error[E0277]: `PhantomPinned` cannot be unpinned --> src/main.rs:18:5 | 18 | f.read_exact(&mut buf).await?; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ within `tokio::time::driver::sleep::_::__Origin<'_>`, the trait `Unpin` is not implemented for `PhantomPinned` | = note: required because it appears within the type `tokio::time::driver::entry::TimerEntry` = note: required because it appears within the type `tokio::time::driver::sleep::_::__Origin<'_>` = note: required because of the requirements on the impl of `Unpin` for `Sleep` = note: required because it appears within the type `SlowRead<tokio::fs::File>` = note: required because of the requirements on the impl of `futures::Future` for `tokio::io::util::read_exact::ReadExact<'_, SlowRead<tokio::fs::File>>` = note: required by `futures::Future::poll`
Ah, right. read_exact
wants its receiver to be Unpin
, and SlowRead
is
no longer Unpin
(because of Sleep
, remember?).
So... what do we do?
In this case, we can just pin our SlowRead
before reading from it. But
remember — once we use it pinned, we can never use it unpinned again.
So uhh... like that?
#[tokio::main] async fn main() -> Result<(), tokio::io::Error> { let mut buf = vec![0u8; 128 * 1024]; let mut f = SlowRead::new(File::open("/dev/urandom").await?); let before = Instant::now(); let mut f = unsafe { Pin::new_unchecked(&mut f) }; f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); Ok(()) }
Exactly like that! In fact, despite the unsafe
, this is one of the safest
ways to use Pin::new_unchecked
, because we're shadowing the previous f
(of type SlowRead
) with our new value of type Pin<&mut SlowRead>
, which
means we can never accidentally use it unpinned.
Okay, let's see if it still works...
$ cargo run --quiet Read 131072 bytes in 396.4873ms
It does. Good.
Okay. I don't feel good about this code at all though.
And you're right to! This is dangerous territory, because we have to uphold some guarantees ourselves.
We can clean it up a little though. Instead of pin-projecting fields one by one, we can do it all in one go.
impl<R> AsyncRead for SlowRead<R> where R: AsyncRead + Unpin, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { // pin-project both fields let (mut sleep, reader) = unsafe { let this = self.get_unchecked_mut(); ( Pin::new_unchecked(&mut this.sleep), Pin::new_unchecked(&mut this.reader), ) }; match sleep.as_mut().poll(cx) { Poll::Ready(_) => { sleep.reset(Instant::now() + Duration::from_millis(25)); reader.poll_read(cx, buf) } Poll::Pending => Poll::Pending, } } }
Ah, more unsafe functions, good.
Well, yes — but at least we only have one unsafe block. But yes, we're in "just be careful" territory. As long as we never use either field unpinned, there will be no undefined behavior, and we're good to go.
Undefined behavior? Like what?
Like... anything at all. The compiler, and the executor, are allowed to do literally anything, if there's undefined behavior.
Okay but... what? Can we get an example?
Sure. Consider this program:
use futures::Future; use std::{mem::swap, pin::Pin, task::Poll, time::Duration}; use tokio::{macros::support::poll_fn, time::sleep}; #[tokio::main] async fn main() { let mut sleep1 = sleep(Duration::from_secs(1)); let mut sleep2 = sleep(Duration::from_secs(1)); { // let's use `sleep1` pinned exactly _once_ let mut sleep1 = unsafe { Pin::new_unchecked(&mut sleep1) }; // this creates a future whose poll method is the closure argument poll_fn(|cx| { // we poll `sleep1` once, throwing away the result... let _ = sleep1.as_mut().poll(cx); // ...and resolve immediately Poll::Ready(()) }) .await; } // then, let's use `sleep1` unpinned: swap(&mut sleep1, &mut sleep2); // by this point, `sleep1` has switched places with `sleep2` // finally, let's await both sleep1 and sleep2 sleep1.await; sleep2.await; }
What do you think is going to happen?
Mhhh. Well, we're failing to maintain pinning invariants...
Look at you using words!!
...so, well, anything could happen? It could just work, but then I guess
Sleep
would be Unpin
in the first place. Or it could crash... or anything
else may happen.
Try it!
$ cargo run --quiet thread 'tokio-runtime-worker' panicked at 'assertion failed: cur_state < STATE_MIN_VALUE', /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/entry.rs:174:13 note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace (the program hangs forever, ie. it never exits)
Huh. Let's try it again with backtraces enabled:
$ RUST_BACKTRACE=1 cargo run --quiet thread 'tokio-runtime-worker' panicked at 'assertion failed: cur_state < STATE_MIN_VALUE', /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/entry.rs:174:13 stack backtrace: 0: rust_begin_unwind at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/std/src/panicking.rs:493:5 1: core::panicking::panic_fmt at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/core/src/panicking.rs:92:14 2: core::panicking::panic at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/core/src/panicking.rs:50:5 3: tokio::time::driver::entry::StateCell::mark_pending at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/entry.rs:174:13 4: tokio::time::driver::entry::TimerHandle::mark_pending at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/entry.rs:591:15 5: tokio::time::driver::wheel::Wheel::process_expiration at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/wheel/mod.rs:251:28 6: tokio::time::driver::wheel::Wheel::poll at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/wheel/mod.rs:163:21 7: tokio::time::driver::<impl tokio::time::driver::handle::Handle>::process_at_time at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/mod.rs:269:33 8: tokio::time::driver::<impl tokio::time::driver::handle::Handle>::process at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/mod.rs:258:9 9: tokio::time::driver::Driver<P>::park_internal at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/mod.rs:247:9 10: <tokio::time::driver::Driver<P> as tokio::park::Park>::park at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/mod.rs:398:9 11: <tokio::park::either::Either<A,B> as tokio::park::Park>::park at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/park/either.rs:30:29 12: <tokio::runtime::driver::Driver as tokio::park::Park>::park at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/driver.rs:198:9 13: tokio::runtime::park::Inner::park_driver at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/park.rs:205:9 14: tokio::runtime::park::Inner::park at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/park.rs:137:13 15: <tokio::runtime::park::Parker as tokio::park::Park>::park at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/park.rs:93:9 16: tokio::runtime::thread_pool::worker::Context::park_timeout at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:422:13 17: tokio::runtime::thread_pool::worker::Context::park at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:398:20 18: tokio::runtime::thread_pool::worker::Context::run at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:328:24 19: tokio::runtime::thread_pool::worker::run::{{closure}} at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:303:17 20: tokio::macros::scoped_tls::ScopedKey<T>::set at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/macros/scoped_tls.rs:61:9 21: tokio::runtime::thread_pool::worker::run at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:300:5 22: tokio::runtime::thread_pool::worker::Launch::launch::{{closure}} at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:279:45 23: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/blocking/task.rs:42:21 24: tokio::runtime::task::core::CoreStage<T>::poll::{{closure}} at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/core.rs:235:17 25: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/loom/std/unsafe_cell.rs:14:9 26: tokio::runtime::task::core::CoreStage<T>::poll at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/core.rs:225:13 27: tokio::runtime::task::harness::poll_future::{{closure}} at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/harness.rs:422:23 28: <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panic.rs:322:9 29: std::panicking::try::do_call at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panicking.rs:379:40 30: __rust_try 31: std::panicking::try at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panicking.rs:343:19 32: std::panic::catch_unwind at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panic.rs:396:14 33: tokio::runtime::task::harness::poll_future at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/harness.rs:409:19 34: tokio::runtime::task::harness::Harness<T,S>::poll_inner at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/harness.rs:89:9 35: tokio::runtime::task::harness::Harness<T,S>::poll at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/harness.rs:59:15 36: tokio::runtime::task::raw::poll at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/raw.rs:104:5 37: tokio::runtime::task::raw::RawTask::poll at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/raw.rs:66:18 38: tokio::runtime::task::Notified<S>::run at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/mod.rs:171:9 39: tokio::runtime::blocking::pool::Inner::run at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/blocking/pool.rs:278:17 40: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}} at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/blocking/pool.rs:258:17 note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
Mh, sure enough, it's deep in the internals of tokio.
The actual assertion failing is:
// (in tokio code, in `src/time/driver/entry.rs`) debug_assert!(cur_state < STATE_MIN_VALUE);
Huh, a debug_assert!
. That means that in production...
$ RUST_BACKTRACE=1 cargo run --quiet --release (program never outputs anything, never exits either)
Hah. I guess they're not fucking around about the "undefined" part of "undefined behavior" huh?
No they are not. But let's think about why this fails. We already tried to
implement our own sleep
by spawning another thread that calls Waker::wake
...
...but we also agreed that that's definitely not how tokio's sleep
works.
There's just no way it spawns one thread per sleep
invocation. So I guess...
it has some sort of timer system, and the first time a Sleep
future is polled,
it register itself with it...
Yes! And when it's dropped, it deregisters itself from it.
Right... and I'm assuming the timer system itself must point to the Sleep
future somehow because well... when the timer runs out, it must know which
future to wake up.
Yes! And if the future moves around...
...and it's not dropped... then the timer system points to the wrong memory location!
Exactly! And that's precisely what happened in that naughty program.
sleep1
registered itself with the timer system, and then it switched places
with sleep2
.
Oh, so when the timer ran out, sleep2
was awakened instead?
Yes! And it didn't expect to be awakened. Even worse — when we awaited
sleep2
, it registered itself with the timer system, which created a second
entry that pointed to the same address: what used to be sleep1
, but was by
that point sleep2
.
And that's what made it hang?
Possibly. We'd have to read a bunch more tokio internals to determine exactly what kind of behavior we've triggered. Suffice to say: we don't want to do that.
I see. Ohhh and because some futures register themselves with some system of the executor, which points back to them, those futures should never be moved, and that's why they need to be pinned?
Yes! And everything is marked backwards, kind of. Every Future
gets a pinned
version of itself, and it can only be unpinned if it implements Unpin
.
Okay okay okay. But wait, then why does Box::pin
...?
Well, precisely because that's a heap allocation! If you're holding a
Sleep
on the stack, and you pass it around to another function, or store it
in a struct, or whatever, it actually moves — its address changes.
But if you're holding a Box<Sleep>
, well, then you're only holding a pointer
to a Sleep
that lives somewhere in heap-allocated memory. That somewhere will
never change, ie. the Sleep
itself will never move. The pointer to Sleep
can be passed around, and everything is fine.
I see. So if we go back to our AsyncRead
example... SlowRead
can never be
Unpin
?
It can't! Because it contains a Sleep
, which is not Unpin
. Unless we go
back to wrapping the Sleep
in a Pin<Box<T>>>
.
Oh, in which case the Sleep
wouldn't really move when the SlowRead
does,
only a pointer to it would.
Exactly.
So what should we do? Should we be holding a Pin<Box<Sleep>>
?
Well, it really depends what you want to do. It's true that it's not really
convenient to use SlowRead
right now, because we have to pin it first:
#[tokio::main] async fn main() -> Result<(), tokio::io::Error> { let mut buf = vec![0u8; 128 * 1024]; let mut f = SlowRead::new(File::open("/dev/urandom").await?); let before = Instant::now(); let mut f = unsafe { Pin::new_unchecked(&mut f) }; f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); Ok(()) }
But, first of all — we don't have to write that unsafe
code ourselves.
The pin_utils
crate provides a safe macro to do exactly that: pin an owned
value to the stack, shadowing its previous name.
# in `Cargo.toml` [dependencies] pin-utils = "0.1.0"
#[tokio::main] async fn main() -> Result<(), tokio::io::Error> { let mut buf = vec![0u8; 128 * 1024]; let f = SlowRead::new(File::open("/dev/urandom").await?); let before = Instant::now(); // 👇 pin_utils::pin_mut!(f); f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); Ok(()) }
(Note that tokio ships with a similar macro, tokio::pin
).
Secondly — if we find ourselves wanting to unpin a SlowRead
, so we can pass
it around and whatnot, we can just move the entire SlowRead to the heap first.
That's safe too:
#[tokio::main] async fn main() -> Result<(), tokio::io::Error> { let mut buf = vec![0u8; 128 * 1024]; let f = SlowRead::new(File::open("/dev/urandom").await?); let before = Instant::now(); // 👇 let mut f = Box::pin(f); f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); Ok(()) }
And finally, I'd just like to note that here, we're wrapping the whole
File
in our SlowRead
, but we don't have to! We could just as well wrap
a mutable reference to it, which also implements AsyncRead
.
That way, we can use the File
for something else after:
#[tokio::main] async fn main() -> Result<(), tokio::io::Error> { let mut buf = vec![0u8; 128 * 1024]; let mut f = File::open("/dev/urandom").await?; let sr = SlowRead::new(&mut f); pin_utils::pin_mut!(sr); let before = Instant::now(); sr.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); let before = Instant::now(); f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); Ok(()) }
$ cargo run --quiet Read 131072 bytes in 395.2787ms Read 131072 bytes in 5.1451ms
...but that makes it harder to pass around, right?
Right, yes! If we take a &mut File
, we can pass it to a function for
example, but storing it in a struct makes that struct immovable.
Usual lifetime business. Another option would be to add an into_inner
method to our SlowRead
, so we can give back the inner reader:
impl<R> SlowRead<R> where R: Unpin, { fn into_inner(self) -> R { self.reader } }
...but that's dangerous business, because that means we need to use the
SlowRead
unpinned after we've used it pinned. Which would normally be
forbidden... except it's only !Unpin
(not Unpin) because of Sleep
, which
gets dropped in that same method (by virtue of us taking self
, ie. taking
ownership of the SlowRead
, and letting it fall out of scope).
In fact, pin_utils::pin_mut!
prevents us from doing that altogether:
#[tokio::main] async fn main() -> Result<(), tokio::io::Error> { let mut buf = vec![0u8; 128 * 1024]; let f = File::open("/dev/urandom").await?; let f = SlowRead::new(f); { pin_utils::pin_mut!(f); let before = Instant::now(); f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); } // 👇 can't do that! let mut f = f.into_inner(); let before = Instant::now(); f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); Ok(()) }
cargo check Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures) error[E0382]: use of moved value: `f` --> src/main.rs:27:17 | 18 | let f = SlowRead::new(f); | - move occurs because `f` has type `SlowRead<tokio::fs::File>`, which does not implement the `Copy` trait 19 | { 20 | pin_utils::pin_mut!(f); | ----------------------- value moved here ... 27 | let mut f = f.into_inner(); | ^ value used here after move
...and Pin<Box<T>>
wouldn't even let us access the T
we need to call
into_inner
in the first place!
Correct usage would be tricky:
#[tokio::main] async fn main() -> Result<(), tokio::io::Error> { let mut buf = vec![0u8; 128 * 1024]; let f = File::open("/dev/urandom").await?; let mut f = SlowRead::new(f); { let mut f = unsafe { Pin::new_unchecked(&mut f) }; let before = Instant::now(); f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); }; let mut f = f.into_inner(); let before = Instant::now(); f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); Ok(()) }
...so it's probably not a good pattern. We could probably come up with a safer API, like that:
use std::{ future::Future, pin::Pin, task::{Context, Poll}, time::Duration, }; use tokio::{ fs::File, io::{AsyncRead, AsyncReadExt, ReadBuf}, time::{Instant, Sleep}, }; struct SlowRead<R> { // 👇 now optional! reader: Option<R>, sleep: Sleep, } impl<R> SlowRead<R> { fn new(reader: R) -> Self { Self { // 👇 reader: Some(reader), sleep: tokio::time::sleep(Default::default()), } } } impl<R> SlowRead<R> where R: Unpin, { // 👇 now takes pinned mutable reference to Self, and returns an option fn take_inner(self: Pin<&mut Self>) -> Option<R> { unsafe { self.get_unchecked_mut().reader.take() } } } impl<R> AsyncRead for SlowRead<R> where R: AsyncRead + Unpin, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { // pin-project both fields let (mut sleep, reader) = unsafe { let this = self.get_unchecked_mut(); (Pin::new_unchecked(&mut this.sleep), &mut this.reader) }; match sleep.as_mut().poll(cx) { Poll::Ready(_) => { sleep.reset(Instant::now() + Duration::from_millis(25)); match reader { Some(reader) => { // pin-project option. let reader = Pin::new(reader); // note: no need for unsafe since R: Unpin! (thanks // /u/novartole on reddit for catching this.) reader.poll_read(cx, buf) } None => { // simulate EOF Poll::Ready(Ok(())) } } } Poll::Pending => Poll::Pending, } } }
And we could use it like this:
#[tokio::main] async fn main() -> Result<(), tokio::io::Error> { let mut buf = vec![0u8; 128 * 1024]; let f = File::open("/dev/urandom").await?; let mut f = { let f = SlowRead::new(f); pin_utils::pin_mut!(f); let before = Instant::now(); f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); f.take_inner().unwrap() }; let before = Instant::now(); f.read_exact(&mut buf).await?; println!("Read {} bytes in {:?}", buf.len(), before.elapsed()); Ok(()) }
If we really needed an into_inner
method that takes self
, though, we may
be better off boxing the Sleep
so that the whole type becomes Unpin
itself, which would make all of this a lot more ergonomic.
I just wanted to point out, pun fully intended, that there's options.
Gotcha.
Let's go back to this version of SlowRead
for a second:
struct SlowRead<R> { reader: R, sleep: Sleep, } impl<R> SlowRead<R> { fn new(reader: R) -> Self { Self { reader, sleep: tokio::time::sleep(Default::default()), } } }
That has this AsyncRead
implementation:
impl<R> AsyncRead for SlowRead<R> where R: AsyncRead + Unpin, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { // pin-project both fields let (mut sleep, reader) = unsafe { let this = self.get_unchecked_mut(); ( Pin::new_unchecked(&mut this.sleep), Pin::new_unchecked(&mut this.reader), ) }; match sleep.as_mut().poll(cx) { Poll::Ready(_) => { sleep.reset(Instant::now() + Duration::from_millis(25)); reader.poll_read(cx, buf) } Poll::Pending => Poll::Pending, } } }
Now... this could be worse, right? I'm sure there's worse implementations of
AsyncRead
out there. But also, I get the feeling it could be better.
Like.. without any usage of unsafe
?
There's a crate for that!
Nothing in the standard library so far (Rust 1.51), but that's exactly what
the pin-project
crate is about.
Let's see..
# in `Cargo.toml` [dependencies] pin-project = "1.0"
The idea behind pin-project
is that it does pin projection for you.
That part:
// pin-project both fields let (mut sleep, reader) = unsafe { let this = self.get_unchecked_mut(); ( Pin::new_unchecked(&mut this.sleep), Pin::new_unchecked(&mut this.reader), ) };
...and to ensure that you're using fields either always pinned or never pinned, you have to make that choice when designing the struct.
pin-project
presents itself as a procedural attribute macro, so you can
slap it on top of any struct, and then set the #[pin]
attribute onto any
field that you're planning on using as pinned.
In our case, that's both fields.
use pin_project::pin_project; #[pin_project] struct SlowRead<R> { #[pin] reader: R, #[pin] sleep: Sleep, }
And then, instead of doing pin projection ourselves, we call the auto-generated
method project
, which takes a Pin<&mut SlowRead<R>>
and returns a struct that
looks like this:
struct SlowReadProjected<'a, R> { reader: Pin<&'a mut R> sleep: Pin<&'a mut Sleep> }
impl<R> AsyncRead for SlowRead<R> where R: AsyncRead + Unpin, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { // 👇 👇 let mut this = self.project(); match this.sleep.as_mut().poll(cx) { Poll::Ready(_) => { this.sleep.reset(Instant::now() + Duration::from_millis(25)); this.reader.poll_read(cx, buf) } Poll::Pending => Poll::Pending, } } }
And boom, no more unsafe
.
And we can never accidentally use self.sleep
or self.reader
unpinned
(without unsafe
code).
And it still works.
$ grep "unsafe" -Rn src $ cargo run --quiet Read 131072 bytes in 393.9761ms
Of course, that's a fairly simple case of pin-projection. It gets hairier. A lot hairier. Partially-pinned state machines are... not fun. For now!
Whoa. Thanks cool bear!
Don't mention it 😎
Thanks to my sponsors: Lev Khoroshansky, Ivo Murrell, Tobias Bahls, Andrew Neth, Jon Gjengset, Walther, Hadrien G., Jonathan Adams, Cass, WeblWabl, Alex Doroshenko, Lyssieth, callym, Chirag Jain, SeniorMars, Matt Jackson, Philipp Gniewosz, Peter Shih, xales, Henrik Tudborg and 227 more
If you liked what you saw, please support my work!
Here's another article just for you:
Long story short: a couple of my articles got really popular on a bunch of sites, and someone, somewhere, went "well, let's see how much traffic that smart-ass can handle", and suddenly I was on the receiving end of a couple DDoS attacks.
It really doesn't matter what the articles were about — the attack is certainly not representative of how folks on either side of any number of debates generally behave.