Pin and suffering

👋 This page was last updated ~4 years ago. Just so you know.

I'd like to think that my understanding of "async Rust" has increased over the past year or so. I'm 100% onboard with the basic principle: I would like to handle thousands of concurrent tasks using a handful of threads. That sounds great!

And to become proficient with async Rust, I've accepted a lot of things. There are blue functions and red functions, and red (async) functions are contagious.

Cool bear's hot tip

Eh, not exactly — you can build an executor in a blue (sync) function to block until a red (async) function is done.

And from a red (async) function, you can use something like tokio::task::spawn_blocking.

I guess the difficulty is when you go from red to blue, back to red — because you can't make an executor inside an executor.
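If "build an executor in a blue function" sounds abstract, here's a minimal sketch of the idea using only the standard library. The names `block_on` and `ThreadWaker` are made up for this illustration, and a real executor like tokio's does a lot more (timers, I/O, many tasks).

```rust
use std::{
    future::Future,
    pin::pin,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
    thread::{self, Thread},
};

/// Hypothetical waker: waking it unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// A toy single-future executor: a blue (sync) function driving a red (async) one.
fn block_on<F: Future>(fut: F) -> F::Output {
    // Pin the future on the stack so that it can be polled.
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Not done yet: park this thread until someone calls `wake`.
            Poll::Pending => thread::park(),
        }
    }
}

async fn red() -> u32 {
    40 + 2
}

fn main() {
    // `main` is blue, `red` is red, and `block_on` is the bridge between them.
    let answer = block_on(red());
    println!("red() returned {answer}");
}
```

The blue `main` never becomes `async`: it just sits in `block_on` until the red function's future is done.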

See bear, this is what I'm talking about: there's just so much I have to take for granted. "Don't block the executor", "futures do nothing unless polled", "you need to pin a future before you can poll it", etc.

Bear

Well, yes? Mostly those two?

But bear, everything was so simple in synchronous Rust, and suddenly I have to uphold all these rules? I have to think about these?

Here let me show you. Say, in synchronous Rust, I just want to print hello, sleep, and then print goodbye.

I can just do this!

Rust code
use std::{thread::sleep, time::Duration};

fn main() {
    println!("Hello!");
    sleep(Duration::from_millis(500));
    println!("Goodbye!");
}
Shell session
$ cargo run --quiet
Hello!
(500ms elapse...)
Goodbye!

It's so simple!

Bear

Well, the async version isn't that hard either. Try out tokio for example.

Okay, well, sure:

TOML markup
# in Cargo.toml

[dependencies]
tokio = { version = "1.4.0", features = ["full"] }
Rust code
use std::{thread::sleep, time::Duration};

#[tokio::main]
async fn main() {
    println!("Hello!");
    sleep(Duration::from_millis(500));
    println!("Goodbye!");
}
Shell session
$ cargo run --quiet
Hello!
(500ms elapse...)
Goodbye!

Ah, you're right! That was pretty simple.

Bear

Well, it's also wrong. You're blocking the executor.

What? I didn't see any blockage going on. In fact, it behaved exactly the same as the synchronous version. Seems okay to me.

Bear

Well, try spawning two tasks that do that, you'll see.

Okay, sure:

Rust code
use std::{thread::sleep, time::Duration};

#[tokio::main]
async fn main() {
    let one = tokio::spawn(greet());
    let two = tokio::spawn(greet());
    let (_, _) = tokio::join!(one, two);
}

async fn greet() {
    println!("Hello!");
    sleep(Duration::from_millis(500));
    println!("Goodbye!");
}
Shell session
$ cargo run --quiet
Hello!
Hello!
(500ms elapse...)
Goodbye!
Goodbye!

There! It just works! Thanks bear, I don't know what I was complaining about, async Rust really is quite easy.

Bear

No, you j- sigh okay, try switching to the single-threaded executor.

The what? Oh, oh wait a minute, I see it, I can just pass a flavor argument to the tokio::main attribute macro:

Rust code
//              👇
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let one = tokio::spawn(greet());
    let two = tokio::spawn(greet());
    let (_, _) = tokio::join!(one, two);
}

// omitted: everything else
Shell session
$ cargo run --quiet
Hello!
(500ms elapse...)
Goodbye!
Hello!
(500ms elapse...)
Goodbye!

Ah.

So we are blocking the executor. But see, that's complicated, I don't want to have to think about all that!

For me sleep is just a syscall. You call it and boom! It sleeps. No need to worry about anything.

Bear

Well, you could just not block the executor?

But how?

Bear

Well, tokio has its own sleep method.

And that one doesn't block? I mean okay, sure, if you say so:

Rust code
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let one = tokio::spawn(greet());
    let two = tokio::spawn(greet());
    let (_, _) = tokio::join!(one, two);
}

async fn greet() {
    println!("Hello!");
    sleep(Duration::from_millis(500)).await;
    println!("Goodbye!");
}
Shell session
$ cargo run --quiet
Hello!
Hello!
(500ms elapse...)
Goodbye!
Goodbye!

Huh. So that one doesn't block the executor. Okay then.

But, see bear, I don't know how that all works. It's all just voodoo to me. One function blocks the executor, another doesn't, why? How does it actually work, under the hood?

Bear

Well... it's not that complicated really.

The sleep in std does just call the sleep syscall, pretty much.

Yes, that's simple. I like that.

Bear

Whereas the sleep in tokio returns a Future... that registers a timer when you first poll it... and it only completes when the deadline is reached.

I uh... none of that made any sense. I don't remember "polling" anything. And you say it... "registers" a timer? As in, global state? And what do you mean by "complete"?

Bear

Sigh. Okay, let's start small.

The Future type.

Bear

Future is just a trait. Anything can be a Future if it wants to.

We can make Futures?

Bear

Sure! If you want to.

Yeah why not, if it can shed some light on the whole thing, I'll make a future.

So I guess since it's a trait... I know traits. We can just implement it on any type? Like an empty struct?

Bear

Yeah! Go ahead!

Rust code
use std::future::Future;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    todo!()
}

struct MyFuture {}

impl Future for MyFuture {}

Ok, the compiler is complaining that not all trait items are implemented: it wants Output and poll. What now?

Bear

Now, use the rust-analyzer "Implement missing members" quick fix to generate those.

Rust code
impl Future for MyFuture {
    type Output;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        todo!()
    }
}

Ew, long types.

Bear

...just use rust-analyzer's "Replace qualified path with use" quick fix on those long types.

Rust code
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

struct MyFuture {}

impl Future for MyFuture {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        todo!()
    }
}

Okay, better! And now what?

Bear

Well, first you need to pick your output type. What do you want your future to, well, output?

Nothing special?

Bear

So, nothing. You can just use the empty tuple: ().

Rust code
impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        todo!()
    }
}

Okay done! Now what?

Bear

Now we await it!

But it's not done?

Bear

That's okay, we just await it anyway!

Hum, okay:

Rust code
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let fut = MyFuture {};
    fut.await
}

This feels really dumb, but here goes:

Shell session
$ cargo run --quiet
thread 'main' panicked at 'not yet implemented', src/main.rs:19:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Yeah, well...

Bear

...but that's good! That means your future was polled!

Mh? Who polled it?

Hang on, maybe we can get a stack trace...

Shell session
$ RUST_BACKTRACE=1 cargo run --quiet
thread 'main' panicked at 'not yet implemented', src/main.rs:19:9
stack backtrace:
   0: rust_begin_unwind
             at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/std/src/panicking.rs:493:5
   1: core::panicking::panic_fmt
             at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/core/src/panicking.rs:92:14
   2: core::panicking::panic
             at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/core/src/panicking.rs:50:5
   3: <manual_futures::MyFuture as core::future::future::Future>::poll
             at ./src/main.rs:19:9
   4: manual_futures::main::{{closure}}
             at ./src/main.rs:10:5
   5: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
   6: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:119:9
   7: tokio::runtime::basic_scheduler::Inner<P>::block_on::{{closure}}::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:196:62
   8: tokio::coop::with_budget::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/coop.rs:106:9
   9: std::thread::local::LocalKey<T>::try_with
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:272:16
  10: std::thread::local::LocalKey<T>::with
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:248:9
  11: tokio::coop::with_budget
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/coop.rs:99:5
  12: tokio::coop::budget
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/coop.rs:76:5
  13: tokio::runtime::basic_scheduler::Inner<P>::block_on::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:196:39
  14: tokio::runtime::basic_scheduler::enter::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:279:29
  15: tokio::macros::scoped_tls::ScopedKey<T>::set
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/macros/scoped_tls.rs:61:9
  16: tokio::runtime::basic_scheduler::enter
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:279:5
  17: tokio::runtime::basic_scheduler::Inner<P>::block_on
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:185:9
  18: tokio::runtime::basic_scheduler::InnerGuard<P>::block_on
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:425:9
  19: tokio::runtime::basic_scheduler::BasicScheduler<P>::block_on
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:145:24
  20: tokio::runtime::Runtime::block_on
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/mod.rs:450:46
  21: manual_futures::main
             at ./src/main.rs:7:1
  22: core::ops::function::FnOnce::call_once
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ops/function.rs:227:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Ok, uhh the... tokio scheduler? Is the one that polls it?

Bear

Yes! That's what your code is doing.

Mh? It certainly isn't — I haven't even built a tokio::runtime::Runtime.

Bear

Yeah, tokio::main just did it for you! In fact, your main function here:

Rust code
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let fut = MyFuture {};
    fut.await
}
Bear

...is roughly equivalent to this:

Rust code
fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        // tokio::main also enables the I/O and time drivers
        .enable_all()
        .build()
        .unwrap();
    let fut = MyFuture {};
    rt.block_on(fut);
}

Ah! I see. So... with tokio::main, main is an async function, which... is what exactly?

Bear

An async function is actually just a synchronous function that returns a future!

So the whole body of main is a Future, too?

Bear

Yes!
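To make that concrete, here's a rough sketch. `add_async`, `add_desugared`, `NoopWaker` and `poll_once` are all made-up names, and the real compiler output is an anonymous state machine rather than a visible `async` block, so take "equivalent" loosely.

```rust
use std::{
    future::Future,
    pin::pin,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
};

// The red version, written with `async fn`.
async fn add_async(a: u32, b: u32) -> u32 {
    a + b
}

// Roughly the same thing: a plain function that returns a future.
// Calling it runs none of the body; the body runs when the future is polled.
fn add_desugared(a: u32, b: u32) -> impl Future<Output = u32> {
    async move { a + b }
}

// A waker that does nothing; it is only needed to build a `Context`.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls a future exactly once, which is enough for futures that never wait.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    fut.as_mut().poll(&mut cx)
}

fn main() {
    println!("{:?}", poll_once(add_async(1, 2)));
    println!("{:?}", poll_once(add_desugared(1, 2)));
}
```

Both calls hand back a value that implements `Future`, and nothing gets added until somebody polls it.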

Okay, okay, well, I don't really want to build a Runtime myself, I'm fine with the tokio::main version for now.

I want to go back to MyFuture. How do I make it do something?

Bear

Well, by returning! You can see that it's supposed to return a Poll, which is an enum with two variants.

The first variant is Ready, which..

..which I assume we return when the future is complete, okay. So if I do this:

Rust code
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

#[tokio::main]
async fn main() {
    let fut = MyFuture {};
    println!("Awaiting fut...");
    fut.await;
    println!("Awaiting fut... done!");
}

struct MyFuture {}

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 👇
        Poll::Ready(())
    }
}

...then it should complete immediately:

Shell session
$ cargo run --quiet
Awaiting fut...
Awaiting fut... done!
Bear

Correct! And the other variant is Pending, which you should return if your future is not quite done yet.

Mh? Then how will it ever complete?

Bear

Well, Future::poll will be called again later!

It will? Okay!

Rust code
impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 👇
        Poll::Pending
    }
}
Shell session
$ cargo run --quiet
Awaiting fut...
(nothing seems to happen)

Mhhh, bear? The program seems... stuck. Are we blocking the executor again?

Bear

Oh, no. It's not blocked. Well, the actual, generated, synchronous main function is blocked on the asynchronous main's future — it's waiting for it to complete, and it's never completing.

So it's not getting polled again? But you just told me.. hang on, let me just add some debug prints:

Rust code
impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 👇
        println!("MyFuture::poll()");
        Poll::Pending
    }
}
Shell session
$ cargo run --quiet
Awaiting fut...
MyFuture::poll()
(nothing happens, still. the program continues running forever)

Yeah! It's only getting polled once! What gives, bear? I thought you told me it would get polled again?

Bear

Ahh well it only gets polled again if it registered to be polled again. Did you think it would be polled in a loop? Imagine if you're trying to read from a socket and the other peer is not sending anything for five seconds.

The read future would be polled and polled and polled in a busy loop for five seconds. It would consume an entire CPU core! No, futures are only "awakened" when something happens.

Something? What kind of thing?

Bear

Something interesting! Like, a timer running out. Or a file being ready to read from. That kind of thing.

Ah. That kind of thing. Okay, so say I want my future to be "awakened" after one second. How would I do that?

Bear

Well, you see that cx argument? Of type Context? That's how.

Ah, right — the poll method takes two arguments. The receiver is... some form of self, so, MyFuture, and the second argument is a &mut Context.

Let's see what it has... it has... a waker() method that returns a &Waker! That looks interesting.

Bear

It sure does!

And Waker has... wake and wake_by_ref methods. Well, we can't call wake at all because it wants to take ownership of Waker, and all we have is an immutable reference to it.

Let's try wake_by_ref I guess?

Rust code
impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
Shell session
$ cargo run --quiet
Awaiting fut...
MyFuture::poll()
MyFuture::poll()
MyFuture::poll()
MyFuture::poll()
MyFuture::poll()
MyFuture::poll()
MyFuture::poll()
MyFutur^C
(that line repeats very quickly, and only stops when we hit Ctrl-C)

Heyyyy. Now it is a busy loop! It gets polled as fast as it can.

Bear

Correct!
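Here's a small sketch of that rule, assuming a made-up `CountingWaker` that only counts how often it gets called. A future that stays quiet gives whoever is polling it no reason to come back; a future that calls its waker does.

```rust
use std::{
    future::Future,
    pin::{pin, Pin},
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    task::{Context, Poll, Wake, Waker},
};

/// Hypothetical waker that only counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns `Pending` and never touches its waker.
struct Silent;

impl Future for Silent {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Returns `Pending` too, but asks to be polled again first.
struct Noisy;

impl Future for Noisy {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Polls `fut` once and reports how many wake-ups it requested.
fn wakeups_after_one_poll<F: Future>(fut: F) -> usize {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let _ = fut.as_mut().poll(&mut cx);
    counter.0.load(Ordering::SeqCst)
}

fn main() {
    println!("Silent asked for {} wake-ups", wakeups_after_one_poll(Silent));
    println!("Noisy asked for {} wake-ups", wakeups_after_one_poll(Noisy));
}
```

Roughly speaking, an executor does the same bookkeeping in a fancier way: when the waker fires it schedules the task to be polled again, and otherwise it leaves the task alone.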

But that's still not what I want. What I want is... for my future to be polled again after one second.

Bear

So? You have a Waker. You can wake it up yourself.

No, because I return, so by that time it's too late.

Bear

So??? Just use a thread! You were just going on about how threads are nice and easy and you understood them, blah blah blah. Just use a thread.

But I only have a reference to a Waker, I can't just.. oh would you look at that, it implements Clone. Okay. OKAY. I guess I'll just spawn a thread.

Rust code
use std::time::Duration;

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");

        let waker = cx.waker().clone();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            waker.wake();
        });

        Poll::Pending
    }
}
Shell session
$ cargo run --quiet
Awaiting fut...
MyFuture::poll()
(1 second elapses...)
MyFuture::poll()
(another second elapses...)
MyFuture::poll()
(a third second elapses...)
MyFuture::poll()
^C

AhAH! Interesting. Well, eventually I guess I want to return ready so all I need to do is maintain some state, let's see...

Rust code
#[tokio::main]
async fn main() {
    let fut = MyFuture::new();
    println!("Awaiting fut...");
    fut.await;
    println!("Awaiting fut... done!");
}

struct MyFuture {
    slept: bool,
}

impl MyFuture {
    fn new() -> Self {
        Self { slept: false }
    }
}

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");

        match self.slept {
            false => {
                // make sure we're polled again in one second
                let waker = cx.waker().clone();
                std::thread::spawn(move || {
                    std::thread::sleep(Duration::from_secs(1));
                    waker.wake();
                });
                self.slept = true;

                Poll::Pending
            }
            true => Poll::Ready(()),
        }
    }
}

No, wait — it's complaining that self is not mutable. Uhh... I can write it mut self if that'd help?

Rust code
impl Future for MyFuture {
    type Output = ();

    //       👇
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // etc.
    }
}

Ah. That did help. Jolly good.

Shell session
$ cargo run --quiet
Awaiting fut...
MyFuture::poll()
(one second elapses)
MyFuture::poll()
Awaiting fut... done!

Heyyyyyyy. That worked! Thanks bear!

Bear

You're welcome. See? It's not that complicated!
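For what it's worth, the same experiment can be run without tokio at all. What follows is only a sketch: `block_on_counting`, `ThreadWaker` and `ThreadSleep` are hypothetical names, the executor handles a single future, and a `done` flag replaces the `slept` bool so that a stray extra poll can't finish the sleep early.

```rust
use std::{
    future::Future,
    pin::{pin, Pin},
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    task::{Context, Poll, Wake, Waker},
    thread::{self, Thread},
    time::{Duration, Instant},
};

/// Hypothetical stand-in for the executor's waker: unparks the blocked thread.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: polls, then parks the thread until the waker is called.
/// Returns the output along with the number of polls it took.
fn block_on_counting<F: Future>(fut: F) -> (F::Output, usize) {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
        thread::park();
    }
}

/// Completes once a helper thread has slept for `dur`.
struct ThreadSleep {
    dur: Duration,
    started: bool,
    done: Arc<AtomicBool>,
}

impl ThreadSleep {
    fn new(dur: Duration) -> Self {
        Self { dur, started: false, done: Arc::new(AtomicBool::new(false)) }
    }
}

impl Future for ThreadSleep {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.done.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }
        if !self.started {
            // First poll: start the timer thread and give it a clone of the waker.
            // (Simplification: the waker from later polls is ignored.)
            self.started = true;
            let (dur, done, waker) = (self.dur, self.done.clone(), cx.waker().clone());
            thread::spawn(move || {
                thread::sleep(dur);
                done.store(true, Ordering::SeqCst);
                waker.wake();
            });
        }
        Poll::Pending
    }
}

fn main() {
    let before = Instant::now();
    let ((), polls) = block_on_counting(ThreadSleep::new(Duration::from_millis(200)));
    println!("done after {:?} and {} polls", before.elapsed(), polls);
}
```

The cloned `Waker` is the only link between the helper thread and the executor: calling `wake` is what gets the future polled a second time.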

It really isn't. Hey WAIT A MINUTE. We're just back to using threads, sort of.

I don't want to spin up a new thread every time I want to wait. That seems really excessive. Surely async Rust™ has something to solve that?

Bear

Well yes! In this case, for this executor, it's tokio::time::sleep.

But that's what we were using in the first pl-

Bear

Yes, yes, that's what we were using. But we can embed it within MyFuture, so you can still see the machinery.

Machinery, yes, good! Okay, how do I do that.

Bear

Well, I don't know if you've noticed but tokio::time::sleep returns a concrete type: Sleep.

AhAH! So if I just store that as a field of MyFuture, all I have to do is poll it, from within my own poll method?

Bear

That's the gist, yes.

Rust code
use tokio::time::Sleep;

struct MyFuture {
    sleep: Sleep,
}

impl MyFuture {
    fn new() -> Self {
        Self {
            sleep: tokio::time::sleep(Duration::from_secs(1)),
        }
    }
}

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");
        self.sleep.poll(cx)
    }
}

Ah, great! It even has the same exact return type as MyFuture::poll — Poll<()>, so the method body is really short and sweet.

There's just uh... just one tiny problem. It doesn't compile:

Shell session
$ cargo check
    Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures)
error[E0599]: no method named `poll` found for struct `Sleep` in the current scope
  --> src/main.rs:35:20
   |
35 |         self.sleep.poll(cx)
   |                    ^^^^ method not found in `Sleep`

error: aborting due to previous error

For more information about this error, try `rustc --explain E0599`.
error: could not compile `manual-futures`

To learn more, run the command again with --verbose.

Ohhh! I know! poll is a method on trait Future, so I need to import it to be able to call it... wait no, we've already imported it.

Beaaaaaaaaaar what's wrong with my code?

Bear

Look at the receiver! It's not self, it's not &self, it's not even &mut self.

It's self: Pin<&mut Self>.

Mh. Okay. How do we build a Pin... it has a new method. Let's try that.

Rust code
impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");
        //                 👇
        let mut sleep = Pin::new(&mut self.sleep);
        sleep.poll(cx)
    }
}
Shell session
$ cargo check
    Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures)
error[E0277]: `PhantomPinned` cannot be unpinned
  --> src/main.rs:35:25
   |
35 |         let mut sleep = Pin::new(&mut self.sleep);
   |                         ^^^^^^^^ within `tokio::time::driver::sleep::_::__Origin<'_>`, the trait `Unpin` is not implemented for `PhantomPinned`
   |
   = note: required because it appears within the type `tokio::time::driver::entry::TimerEntry`
   = note: required because it appears within the type `tokio::time::driver::sleep::_::__Origin<'_>`
   = note: required because of the requirements on the impl of `Unpin` for `Sleep`
   = note: required by `Pin::<P>::new`

That does not work. Bear, halp. What's happening?

Bear

Bwahahahaha. Just Box::pin it!

Just wh-?

Bear

In the constructor, just wrap sleep() in Box::pin().

Uh, okay... that returns a Pin<Box<Sleep>> so I guess I'll have to change the field type too...

Rust code
struct MyFuture {
    sleep: Pin<Box<Sleep>>,
}

impl MyFuture {
    fn new() -> Self {
        Self {
            sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))),
        }
    }
}

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");
        let sleep = Pin::new(&mut self.sleep);
        sleep.poll(cx)
    }
}

Oh hey, that compiles.

Shell session
$ cargo run --quiet
Awaiting fut...
MyFuture::poll()
(one second elapses...)
MyFuture::poll()
Awaiting fut... done!

Oh heyy! That runs!!

Bear

Yes! And you honestly don't need to build that Pin yourself, because Pin<Box<Sleep>> is already pinned — so you're building a Pin<&mut Pin<Box<Sleep>>>.

You can just call sleep.as_mut(), and poll that.

Rust code
impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");
        self.sleep.as_mut().poll(cx)
    }
}

Ah, cleaner. That still works.
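Here's a tokio-free sketch of why the pinned box helps, assuming a made-up `NotUnpin` future that, like `Sleep` in the error message above, contains a `PhantomPinned`. `Wrapper`, `NoopWaker` and `poll_wrapper_once` are illustration-only names too.

```rust
use std::{
    future::Future,
    marker::PhantomPinned,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
};

/// Hypothetical future that opts out of `Unpin` by holding a `PhantomPinned`.
struct NotUnpin {
    _pin: PhantomPinned,
}

impl Future for NotUnpin {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Ready("polled")
    }
}

/// Wrapper in the style of `MyFuture`: the inner future lives in a pinned box.
struct Wrapper {
    inner: Pin<Box<NotUnpin>>,
}

impl Future for Wrapper {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // With a plain `inner: NotUnpin` field, `Pin::new(&mut self.inner)` would
        // not compile, because `Pin::new` requires `Unpin`.
        // `Pin<Box<T>>::as_mut` hands out a `Pin<&mut T>` without that requirement.
        self.inner.as_mut().poll(cx)
    }
}

/// Waker that does nothing; only needed to build a `Context`.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn poll_wrapper_once() -> Poll<&'static str> {
    let mut wrapper = Wrapper {
        inner: Box::pin(NotUnpin { _pin: PhantomPinned }),
    };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    // `Wrapper` itself is `Unpin` (a `Pin<Box<_>>` always is), so `Pin::new` is fine here.
    let out = Pin::new(&mut wrapper).poll(&mut cx);
    out
}

fn main() {
    println!("{:?}", poll_wrapper_once());
}
```

The box is what makes this work: the inner future stays put on the heap, so the wrapper can be moved around freely while still handing out a `Pin<&mut NotUnpin>`.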

Bear

You can even go one step further and add the futures crate so you can use FutureExt::poll_unpin, because polling an Unpin future is a common operation.

I can?

TOML markup
# in Cargo.toml

[dependencies]
futures = "0.3"
Rust code
use futures::FutureExt;

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");
        self.sleep.poll_unpin(cx)
    }
}

Neat! That still works.

At this point though, I can't help but wonder — do we really need to understand all this? Like, it's neat that we did it once, don't get me wrong.

But in the real world, is there ever a point in manually implementing futures like that? Don't we have the async / await keywords so that we don't have to?

Bear

Well! In this case, yes, it's gratuitous. But let's say we want to do something different...

Let's say we want to make a type that implements tokio's AsyncRead interface (so that we can use it anywhere we would use another reader), but that artificially introduces some delay between each read.

Mhyes, okay. Can't we just use an async method here?

Bear

You'd think so, but no. Not currently. See, async Rust is still in its awkward teenage phase. Almost everything is there, but it can be a bit of a challenge to live with.

What uh.. what does that mean?

Bear

Well, AsyncRead is a trait. And traits can't have async methods. (At the time of this writing, i.e. with Rust 1.51).

Okay, so just have them return a Future then? That's all async methods are, right?

Bear

Right, yes, but! We're always reading into something. A buffer. In synchronous land, when we do this:

Rust code
let mut buf = vec![0u8; 1024];
let n = something.read(&mut buf)?;

...the read method borrows buf mutably until it's done with it.

But the problem with async...

...is that we return early if we're not quite ready yet. We just return Poll::Pending and wait until we're awakened by the scheduler. Ohhhhh and just because we return doesn't mean that we're done with the buffer!

Bear

...rude, but yes.

So that's not how AsyncRead works. It does give you a buffer, but you're supposed to either:

  • Not be ready, and so you subscribe to be awakened later, and return Poll::Pending
  • Be ready, and then you fill part (or all) of the buffer, and return Poll::Ready

And if we're not ready, what do we do with the buffer?

Bear

Nothing. Especially not write into it. It would be disregarded anyway.
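Those two cases can be sketched with a toy reader. Careful: `ToyAsyncRead` is a made-up, simplified trait, not tokio's `AsyncRead`. It takes a plain `&mut [u8]` and reports a byte count, and the hypothetical `OneShot` reader wakes itself right away instead of waiting for a real event.

```rust
use std::{
    io,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
};

/// Made-up, simplified stand-in for an async reader trait (not tokio's real API).
trait ToyAsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}

/// Not ready on the first poll, then yields its data on the next one.
struct OneShot {
    data: &'static [u8],
    polled_before: bool,
}

impl ToyAsyncRead for OneShot {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // `OneShot` is `Unpin`, so a plain `&mut` can be taken out of the pin.
        let this = self.get_mut();
        if !this.polled_before {
            // Not ready: leave `buf` alone, ask to be polled again, bail out.
            this.polled_before = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        // Ready: fill part (or all) of the buffer and say how much was written.
        let n = this.data.len().min(buf.len());
        buf[..n].copy_from_slice(&this.data[..n]);
        this.data = &this.data[n..];
        Poll::Ready(Ok(n))
    }
}

/// Waker that does nothing; only needed to build a `Context`.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the reader twice; returns whether the first poll was pending,
/// how many bytes the second poll produced, and the buffer.
fn demo() -> (bool, usize, [u8; 8]) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut reader = OneShot { data: b"hello", polled_before: false };
    let mut buf = [0u8; 8];

    let first_was_pending = Pin::new(&mut reader).poll_read(&mut cx, &mut buf).is_pending();
    let n = match Pin::new(&mut reader).poll_read(&mut cx, &mut buf) {
        Poll::Ready(Ok(n)) => n,
        other => panic!("expected data on the second poll, got {other:?}"),
    };
    (first_was_pending, n, buf)
}

fn main() {
    let (pending, n, buf) = demo();
    println!("first poll pending: {pending}, then read {n} bytes: {:?}", &buf[..n]);
}
```

Note that the buffer is only borrowed for the duration of each `poll_read` call, which is what lets the reader return early without holding on to it.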

Gotcha. Okay, so, we know how to sleep by polling a Sleep future, I think we know enough to cook us a slow reader...

It's going to be a struct (it's always a struct), and it'll work with any kind of reader.

Rust code
struct SlowRead<R> {
    reader: R,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self { reader }
    }
}

And then we just have to implement AsyncRead for SlowRead itself! But only if its type parameter R also implements AsyncRead:

Rust code
use tokio::io::{AsyncRead, ReadBuf};

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        self.reader.poll_read(cx, buf)
    }
}

Okay! Well, this doesn't build:

Shell session
$ cargo check
    Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures)
error[E0599]: no method named `poll_read` found for type parameter `R` in the current scope
  --> src/main.rs:29:21
   |
29 |         self.reader.poll_read(cx, buf)
   |                     ^^^^^^^^^ method not found in `R`
   |
   = help: items from traits can only be used if the type parameter is bounded by the trait
help: the following trait defines an item `poll_read`, perhaps you need to restrict type parameter `R` with it:
   |
20 | impl<R: AsyncRead> AsyncRead for SlowRead<R>
   |      ^^^^^^^^^^^^

And the compiler is getting confused. We're already constraining R to implement AsyncRead. I guess this is just like futures, where the receiver isn't quite ri-

Bear

Yup. It's exactly like that.

So we need to pin it before we read fr-

Bear

Yes. You need to pin it.

Okay, okay, I'm pinning it. Box-pinning it even.

Rust code
struct SlowRead<R> {
    reader: Pin<Box<R>>,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            reader: Box::pin(reader),
        }
    }
}

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        self.reader.as_mut().poll_read(cx, buf)
    }
}

Okay, had to sneak an as_mut in there, but it builds. What now?

Bear

Well, now we try reading! First, without SlowRead...

Rust code
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use tokio::{
    fs::File,
    io::{AsyncRead, AsyncReadExt, ReadBuf},
    time::Instant,
};

#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let mut f = File::open("/dev/urandom").await?;
    let before = Instant::now();
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}

// omitted: `SlowRead` declaration and implementation
Shell session
$ cargo run --quiet
Read 131072 bytes in 3.2748ms
Bear

...and now with SlowRead!

Rust code
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    //             👇
    let mut f = SlowRead::new(File::open("/dev/urandom").await?);
    let before = Instant::now();
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Shell session
$ cargo run --quiet
Read 131072 bytes in 3.3134ms

Well, it's not... really slower.

Bear

Well we're just forwarding poll_read calls straight to the inner reader for now. We're not actually slowing it down.

Oh right! So I guess we can just call std::thread::sleep in poll_read and...

Bear

No, no! That would bl-

...block the executor, right. Oh, wait, poll_read gives us a &mut Context as well, so we could poll a Sleep from there, right?

Bear

Exactly! And Sleep has a handy reset method, so you can use it several times.

As in sleep several times in a row with the same future? Sounds good!

I guess we'll need to pin that one too...

Rust code
struct SlowRead<R> {
    reader: Pin<Box<R>>,
    // 👇
    sleep: Pin<Box<Sleep>>,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            reader: Box::pin(reader),
            // 👇
            sleep: Box::pin(tokio::time::sleep(Default::default())),
        }
    }
}

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // 👇
        match self.sleep.poll_unpin(cx) {
            Poll::Ready(_) => {
                // whenever `sleep` completes, reset it...
                self.sleep
                    .as_mut()
                    .reset(Instant::now() + Duration::from_millis(25));
                // and poll the inner reader.
                self.reader.as_mut().poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
Shell session
$ cargo run --quiet
Read 131072 bytes in 396.3634ms

Woo!! We did a thing!

Bear

We did!

We're still boxing everything though.

Bear

Yes, that's "easy mode async". Just like Arc<Mutex<T>> is "easy mode lifetimes", sort of. It's fine though! It does work!

Yeah, it does appear to work. But I have no idea why anything needs to be pinned. Nor what Unpin is. Let alone phantoms, which uhh..

Bear

No, and you didn't need to! You only wrote safe code, and you made sure the types matched up, even if you needed to box them or sneak in an as_mut call here and there. And everything worked.

Yes, right — but I'm assuming that Pin exists for a reason. It's not just complexity for complexity's sake, it's inherent complexity. But for what?

Bear

Well... follow me.

We can leave our Box behind

Bear

Okay, let's start with R! You don't need to store it pinned.

I don't? So just do this?

Rust code
struct SlowRead<R> {
    //      👇
    reader: R,
    sleep: Pin<Box<Sleep>>,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            // 👇
            reader,
            sleep: Box::pin(tokio::time::sleep(Default::default())),
        }
    }
}

But then...

Shell session
$ cargo check
    Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures)
error[E0599]: no method named `poll_read` found for type parameter `R` in the current scope
  --> src/main.rs:52:29
   |
52 |                 self.reader.poll_read(cx, buf)
   |                             ^^^^^^^^^ method not found in `R`
   |
   = help: items from traits can only be used if the type parameter is bounded by the trait
help: the following traits define an item `poll_read`, perhaps you need to restrict type parameter `R` with one of them:
   |
38 | impl<R: futures::AsyncRead> AsyncRead for SlowRead<R>
   |      ^^^^^^^^^^^^^^^^^^^^^
38 | impl<R: tokio::io::AsyncRead> AsyncRead for SlowRead<R>
   |      ^^^^^^^^^^^^^^^^^^^^^^^

(...and the compiler gets confused again. Again, we've already constrained R to tokio::io::AsyncRead).

The problem is, I have an R, and I need a Pin<&mut R>. And I can't use Pin::new apparently, at least that didn't work last time.

Bear

But maybe it'll work this time?

Okay, sure:

Rust code
impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        match self.sleep.poll_unpin(cx) {
            Poll::Ready(_) => {
                self.sleep
                    .as_mut()
                    .reset(Instant::now() + Duration::from_millis(25));
                // 👇
                Pin::new(&mut self.reader).poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
Shell session
$ cargo check
    Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures)
error[E0277]: `R` cannot be unpinned
  --> src/main.rs:52:26
   |
52 |                 Pin::new(&mut self.reader).poll_read(cx, buf)
   |                          ^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `R`
   |
   = note: required by `Pin::<P>::new`
help: consider further restricting this bound
   |
40 |     R: AsyncRead + Unpin,
   |                  ^^^^^^^

No, see, it doesn't work eith-

Bear

Ah, but there's a hint!

Consider... further restricting this bound. We can restrict our AsyncRead implementation only for R types that can be unpinned?

Bear

Yes! ie., we need R to implement the Unpin trait, or in other words, we need R: Unpin.

Okay, sure... I mean, I don't even know if that's worth exploring, who knows if the concrete R, which in this case is... tokio::fs::File, is even Unpin, but okay, sure:

Rust code
impl<R> AsyncRead for SlowRead<R>
// 👇
where
    R: AsyncRead + Unpin,
{
    // omitted: fn `poll_read`
}
Shell session
$ cargo run --quiet
Read 131072 bytes in 394.8455ms

Oh hey! That worked!

Bear

It sure did! tokio::fs::File does implement Unpin, so it can be unpinned.

Well I'm still not sure what "can be unpinned" even means, but if you say so, okay.
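
For a bit of intuition, here is a minimal std-only sketch. The `Counter` type and the `bump_through_pin` function are made up for illustration, they're not from tokio. The idea it tries to show: for a type that is `Unpin`, `Pin` is little more than a wrapper, so the safe `Pin::new` constructor accepts it and `Pin::get_mut` hands the plain `&mut T` right back.

```rust
use std::pin::Pin;

// Hypothetical type: it only has `Unpin` fields, so it is `Unpin` itself.
struct Counter {
    hits: u32,
}

// Only compiles for `T: Unpin`, which makes it a handy compile-time check.
fn assert_unpin<T: Unpin>(_value: &T) {}

// Pins a `Counter` with the safe constructor, then gets the `&mut` back.
pub fn bump_through_pin(start: u32) -> u32 {
    let mut counter = Counter { hits: start };
    assert_unpin(&counter);

    // Safe: `Pin::new` requires the target type to be `Unpin`.
    let pinned: Pin<&mut Counter> = Pin::new(&mut counter);

    // Also safe: `Pin::get_mut` has the same `Unpin` requirement.
    let plain: &mut Counter = Pin::get_mut(pinned);
    plain.hits += 1;

    counter.hits
}
```

Calling `bump_through_pin(41)` returns `42`. No `unsafe` is needed anywhere, precisely because `Counter` is `Unpin`.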

Now what about Sleep? Can we just do the same trick?

First we remove the Pin<Box<T>> / Box::pin:

Rust code
struct SlowRead<R> {
    reader: R,
    // 👇
    sleep: Sleep,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            reader,
            // 👇
            sleep: tokio::time::sleep(Default::default()),
        }
    }
}

And then... well then we're in a pickle:

Shell session
error[E0277]: `PhantomPinned` cannot be unpinned
  --> src/main.rs:47:26
   |
47 |         match self.sleep.poll_unpin(cx) {
   |                          ^^^^^^^^^^ within `tokio::time::driver::sleep::_::__Origin<'_>`, the trait `Unpin` is not implemented for `PhantomPinned`
   |
   = note: required because it appears within the type `tokio::time::driver::entry::TimerEntry`
   = note: required because it appears within the type `tokio::time::driver::sleep::_::__Origin<'_>`
   = note: required because of the requirements on the impl of `Unpin` for `Sleep`
Bear

Yes! tokio::time::Sleep does not implement Unpin, ie. it cannot be unpinned.

But then how... because clearly Pin::new is not going to work either in this case:

Rust code
impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        //             👇
        let sleep = Pin::new(&mut self.sleep);
        //      👇
        match sleep.poll(cx) {
            Poll::Ready(_) => {
                //             👇
                let sleep = Pin::new(&mut self.sleep);
                // 👇
                sleep.reset(Instant::now() + Duration::from_millis(25));
                Pin::new(&mut self.reader).poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
Shell session
error[E0277]: `PhantomPinned` cannot be unpinned
  --> src/main.rs:47:21
   |
47 |         let sleep = Pin::new(&mut self.sleep);
   |                     ^^^^^^^^ within `tokio::time::driver::sleep::_::__Origin<'_>`, the trait `Unpin` is not implemented for `PhantomPinned`
   |
   = note: required because it appears within the type `tokio::time::driver::entry::TimerEntry`
   = note: required because it appears within the type `tokio::time::driver::sleep::_::__Origin<'_>`
   = note: required because of the requirements on the impl of `Unpin` for `Sleep`
   = note: required by `Pin::<P>::new`

So then what do we do?

Bear

Well, there's another constructor for Pin... you can always call Pin::new_unchecked.

Mh? Like that?

Rust code
impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        //                 👇
        let sleep = Pin::new_unchecked(&mut self.sleep);
        match sleep.poll(cx) {
            Poll::Ready(_) => {
                //                 👇
                let sleep = Pin::new_unchecked(&mut self.sleep);
                sleep.reset(Instant::now() + Duration::from_millis(25));
                Pin::new(&mut self.reader).poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

But..

Shell session
error[E0133]: call to unsafe function is unsafe and requires unsafe function or block
  --> src/main.rs:47:21
   |
47 |         let sleep = Pin::new_unchecked(&mut self.sleep);
   |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ call to unsafe function
   |
   = note: consult the function's documentation for information on how to avoid undefined behavior

Wait wait wait wait. It's unsafe? Eww!

Bear

It is! And that means there are some invariants that the compiler will no longer be enforcing for us, so we'll have to "just be careful". Which is something we never have to do in Rust, except for when we use unsafe.

Ughhh. Do we have to?

Bear

For the purpose of this exercise, yes. I'm about to tell you the rules we should follow. Are you ready?

Shoot.

Bear

Once we pin something, ie. once we construct a Pin<&mut T> of it, we can never use it unpinned (ie, as &mut T) ever again, unless it implements Unpin.

But Sleep doesn't implement Unpin.

Bear

Yes! Which means that, since we need to use it as a Pin<&mut T> once, we can only ever use it as a Pin<&mut T>.
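
Here's what following that rule can look like, as a small std-only sketch. `Ticker` is a hypothetical stand-in for a type like `Sleep` (it is not how tokio implements anything): it opts out of `Unpin` with a `PhantomPinned` marker, and every method that mutates it takes `Pin<&mut Self>`, so callers never get to hold a bare `&mut Ticker`.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical `!Unpin` type: the `PhantomPinned` marker field opts the
// whole struct out of `Unpin`.
struct Ticker {
    ticks: u32,
    _pin: PhantomPinned,
}

impl Ticker {
    fn new() -> Self {
        Ticker {
            ticks: 0,
            _pin: PhantomPinned,
        }
    }

    // Mutating methods take `Pin<&mut Self>`, never `&mut self`.
    fn tick(self: Pin<&mut Self>) -> u32 {
        // SAFETY: we only update a field in place. Nothing is moved out of
        // `self`, and the `&mut Ticker` never escapes this function.
        let this = unsafe { self.get_unchecked_mut() };
        this.ticks += 1;
        this.ticks
    }
}

// Pins a `Ticker` on the heap and only ever touches it through `Pin`.
pub fn tick_twice() -> u32 {
    // `Box::pin` is a safe way to pin a `!Unpin` value.
    let mut ticker: Pin<Box<Ticker>> = Box::pin(Ticker::new());
    ticker.as_mut().tick();
    ticker.as_mut().tick()
}
```

`tick_twice()` returns `2`. The one `unsafe` block is inside the type itself, where it is easier to check that nothing gets moved.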

Okay, I guess we can do that. I think I understand the rules, so... I'll just add some unsafe blocks around those:

Rust code
impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let sleep = unsafe { Pin::new_unchecked(&mut self.sleep) };
        match sleep.poll(cx) {
            Poll::Ready(_) => {
                let sleep = unsafe { Pin::new_unchecked(&mut self.sleep) };
                sleep.reset(Instant::now() + Duration::from_millis(25));
                Pin::new(&mut self.reader).poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

Uhh...

Shell session
error[E0596]: cannot borrow data in a dereference of `Pin<&mut SlowRead<R>>` as mutable
  --> src/main.rs:47:49
   |
47 |         let sleep = unsafe { Pin::new_unchecked(&mut self.sleep) };
   |                                                 ^^^^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Pin<&mut SlowRead<R>>`

Oh nooo. What now. Even a line that used to work doesn't work anymore:

Shell session
error[E0596]: cannot borrow data in a dereference of `Pin<&mut SlowRead<R>>` as mutable
  --> src/main.rs:52:26
   |
52 |                 Pin::new(&mut self.reader).poll_read(cx, buf)
   |                          ^^^^^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Pin<&mut SlowRead<R>>`

This used to work! We said R: Unpin, so R can be unpinned, so we should be able to pass it to Pin::new, the safe constructor for Pin.

Bear

Ah, yes, but back then, Self was Unpin too.

Mhhhhh because it only had fields that were also Unpin?

Bear

Yes! Didn't you think it was odd that we could do &mut self.reader, even though self was a Pin<&mut Self> and not a &mut Self?

Honestly I was just happy it worked?

Bear

Well, it only worked because we had Self: Unpin, and so going from Pin<&mut Self> to &mut Self was allowed — and so that's what it did, transparently.

But now, Self is no longer Unpin, because it contains a Sleep, and so we cannot go from Pin<&mut Self> to &mut Self.
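
To make that "transparent" step visible, here is a std-only sketch with a made-up `AllUnpin` type. Assigning to a field through `Pin<&mut Self>` goes through `DerefMut`, which `Pin` only provides when the target is `Unpin`.

```rust
use std::pin::Pin;

// Hypothetical type made only of `Unpin` fields.
struct AllUnpin {
    count: u32,
}

impl AllUnpin {
    fn bump(mut self: Pin<&mut Self>) -> u32 {
        // This assignment goes through `DerefMut for Pin<&mut Self>`, which
        // only exists because `Self: Unpin`. Adding a `PhantomPinned` field
        // to the struct would make this line stop compiling.
        self.count += 1;
        self.count
    }
}

// Pins a value with the safe constructor and mutates it twice.
pub fn bump_twice() -> u32 {
    let mut value = AllUnpin { count: 0 };
    let mut pinned = Pin::new(&mut value);
    pinned.as_mut().bump();
    pinned.as_mut().bump()
}
```

`bump_twice()` returns `2`, with no `unsafe` in sight.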

...we can't?

Bear

Well, not safely. We can do so unsafely.

Let me guess, Pin::new_unchecked?

Bear

Actually no! That wouldn't help. To obtain a Pin<&mut R>, we must obtain a &mut R, and to obtain a &mut R, we would need a &mut Self, so we're back to square one.

Okay, some other unsafe method?

Bear

Yes! Pin::map_unchecked_mut. Here, let me show you:

Rust code
impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let sleep = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.sleep) };
        match sleep.poll(cx) {
            Poll::Ready(_) => {
                let sleep = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.sleep) };
                sleep.reset(Instant::now() + Duration::from_millis(25));
                let reader = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.reader) };
                reader.poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
Bear

There!

Uhhh it still doesn't build though:

Shell session
error[E0277]: `PhantomPinned` cannot be unpinned
  --> src/main.rs:18:5
   |
18 |     f.read_exact(&mut buf).await?;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ within `tokio::time::driver::sleep::_::__Origin<'_>`, the trait `Unpin` is not implemented for `PhantomPinned`
   |
   = note: required because it appears within the type `tokio::time::driver::entry::TimerEntry`
   = note: required because it appears within the type `tokio::time::driver::sleep::_::__Origin<'_>`
   = note: required because of the requirements on the impl of `Unpin` for `Sleep`
   = note: required because it appears within the type `SlowRead<tokio::fs::File>`
   = note: required because of the requirements on the impl of `futures::Future` for `tokio::io::util::read_exact::ReadExact<'_, SlowRead<tokio::fs::File>>`
   = note: required by `futures::Future::poll`
Bear

Ah, right. read_exact wants its receiver to be Unpin, and SlowRead is no longer Unpin (because of Sleep, remember?).

So... what do we do?

Bear

In this case, we can just pin our SlowRead before reading from it. But remember — once we use it pinned, we can never use it unpinned again.

So uhh... like that?

Rust code
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let mut f = SlowRead::new(File::open("/dev/urandom").await?);
    let before = Instant::now();
    let mut f = unsafe { Pin::new_unchecked(&mut f) };
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Bear

Exactly like that! In fact, despite the unsafe, this is one of the safest ways to use Pin::new_unchecked, because we're shadowing the previous f (of type SlowRead) with our new value of type Pin<&mut SlowRead>, which means we can never accidentally use it unpinned.
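
Why does pinning help with an Unpin bound at all? Because the pinned handle, `Pin<&mut T>`, is itself `Unpin` even when `T` is not: moving the handle around never moves the value behind it. As far as I can tell, that (together with tokio implementing AsyncRead for pinned pointers) is what lets read_exact accept it. A std-only sketch of the first half, where `NotUnpin` and `requires_unpin` are names made up for the example:

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical `!Unpin` type, standing in for `SlowRead`.
struct NotUnpin {
    _pin: PhantomPinned,
}

// Stand-in for an API with an `Unpin` bound on its receiver.
fn requires_unpin<T: Unpin>(_value: &T) -> bool {
    true
}

pub fn pinned_handle_is_unpin() -> bool {
    let mut boxed: Pin<Box<NotUnpin>> = Box::pin(NotUnpin { _pin: PhantomPinned });
    let handle: Pin<&mut NotUnpin> = boxed.as_mut();

    // `requires_unpin(&*handle)` would not compile: `NotUnpin` is `!Unpin`.
    // The handle itself is fine, because `&mut T` is always `Unpin` and
    // `Pin<P>` is `Unpin` whenever `P` is.
    requires_unpin(&handle)
}
```

`pinned_handle_is_unpin()` returns `true`; the interesting part is that it compiles at all.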

Okay, let's see if it still works...

Shell session
$ cargo run --quiet
Read 131072 bytes in 396.4873ms

It does. Good.

Okay. I don't feel good about this code at all though.

Bear

And you're right to! This is dangerous territory, because we have to uphold some guarantees ourselves.

We can clean it up a little though. Instead of pin-projecting fields one by one, we can do it all in one go.

Rust code
impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // pin-project both fields
        let (mut sleep, reader) = unsafe {
            let this = self.get_unchecked_mut();
            (
                Pin::new_unchecked(&mut this.sleep),
                Pin::new_unchecked(&mut this.reader),
            )
        };

        match sleep.as_mut().poll(cx) {
            Poll::Ready(_) => {
                sleep.reset(Instant::now() + Duration::from_millis(25));
                reader.poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

Ah, more unsafe functions, good.

Bear

Well, yes — but at least we only have one unsafe block. But yes, we're in "just be careful" territory. As long as we never use either field unpinned, there will be no undefined behavior, and we're good to go.
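
The same "project everything in one place" idea can be sketched without tokio. This is an illustration under assumptions, not tokio's or anyone's actual implementation: `CountPolls`, `project`, `NoopWake` and `poll_once` are all hypothetical names. One field is treated as pinned (only ever handed out as `Pin<&mut F>`), the other as a plain `&mut`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical wrapper: counts how many times the inner future is polled.
struct CountPolls<F> {
    inner: F,   // treated as pinned: only exposed as `Pin<&mut F>`
    polls: u32, // not pinned: exposed as a plain `&mut u32`
}

impl<F> CountPolls<F> {
    // The single place where `unsafe` lives.
    fn project(self: Pin<&mut Self>) -> (Pin<&mut F>, &mut u32) {
        // SAFETY: `inner` is never moved out of `self`, and it is never
        // handed out as a bare `&mut F`.
        unsafe {
            let this = self.get_unchecked_mut();
            (Pin::new_unchecked(&mut this.inner), &mut this.polls)
        }
    }
}

impl<F: Future> Future for CountPolls<F> {
    type Output = (F::Output, u32);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (inner, polls) = self.project();
        *polls += 1;
        match inner.poll(cx) {
            Poll::Ready(out) => Poll::Ready((out, *polls)),
            Poll::Pending => Poll::Pending,
        }
    }
}

// A waker that does nothing, just enough to build a `Context` by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls the wrapper once, by hand, and returns its output if it is ready.
pub fn poll_once() -> Option<(u8, u32)> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // `Box::pin` pins the wrapper safely, whatever the inner future is.
    let mut fut = Box::pin(CountPolls {
        inner: async { 7u8 },
        polls: 0,
    });

    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(out) => Some(out),
        Poll::Pending => None,
    }
}
```

`poll_once()` returns `Some((7, 1))`: the inner future's output, and the fact that it took one poll. In real code, crates like pin-project generate this kind of projection so the `unsafe` doesn't have to be written by hand.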

Undefined behavior? Like what?

Bear

Like... anything at all. The compiler, and the executor, are allowed to do literally anything, if there's undefined behavior.

Okay but... what? Can we get an example?

Bear

Sure. Consider this program:

Rust code
use futures::{future::poll_fn, Future};
use std::{mem::swap, pin::Pin, task::Poll, time::Duration};
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let mut sleep1 = sleep(Duration::from_secs(1));
    let mut sleep2 = sleep(Duration::from_secs(1));

    {
        // let's use `sleep1` pinned exactly _once_
        let mut sleep1 = unsafe { Pin::new_unchecked(&mut sleep1) };

        // this creates a future whose poll method is the closure argument
        poll_fn(|cx| {
            // we poll `sleep1` once, throwing away the result...
            let _ = sleep1.as_mut().poll(cx);

            // ...and resolve immediately
            Poll::Ready(())
        })
        .await;
    }

    // then, let's use `sleep1` unpinned:
    swap(&mut sleep1, &mut sleep2);
    // by this point, `sleep1` has switched places with `sleep2`

    // finally, let's await both sleep1 and sleep2
    sleep1.await;
    sleep2.await;
}
Bear

What do you think is going to happen?

Mhhh. Well, we're failing to maintain pinning invariants...

Bear

Look at you using words!!

...so, well, anything could happen? It could just work, but then I guess Sleep would be Unpin in the first place. Or it could crash... or anything else may happen.

Bear

Try it!

Shell session
$ cargo run --quiet
thread 'tokio-runtime-worker' panicked at 'assertion failed: cur_state < STATE_MIN_VALUE', /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/entry.rs:174:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

(the program hangs forever, ie. it never exits)

Huh. Let's try it again with backtraces enabled:

Shell session
$ RUST_BACKTRACE=1 cargo run --quiet
thread 'tokio-runtime-worker' panicked at 'assertion failed: cur_state < STATE_MIN_VALUE', /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/entry.rs:174:13
stack backtrace:
   0: rust_begin_unwind
             at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/std/src/panicking.rs:493:5
   1: core::panicking::panic_fmt
             at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/core/src/panicking.rs:92:14
   2: core::panicking::panic
             at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/core/src/panicking.rs:50:5
   3: tokio::time::driver::entry::StateCell::mark_pending
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/entry.rs:174:13
   4: tokio::time::driver::entry::TimerHandle::mark_pending
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/entry.rs:591:15
   5: tokio::time::driver::wheel::Wheel::process_expiration
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/wheel/mod.rs:251:28
   6: tokio::time::driver::wheel::Wheel::poll
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/wheel/mod.rs:163:21
   7: tokio::time::driver::<impl tokio::time::driver::handle::Handle>::process_at_time
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/mod.rs:269:33
   8: tokio::time::driver::<impl tokio::time::driver::handle::Handle>::process
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/mod.rs:258:9
   9: tokio::time::driver::Driver<P>::park_internal
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/mod.rs:247:9
  10: <tokio::time::driver::Driver<P> as tokio::park::Park>::park
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/mod.rs:398:9
  11: <tokio::park::either::Either<A,B> as tokio::park::Park>::park
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/park/either.rs:30:29
  12: <tokio::runtime::driver::Driver as tokio::park::Park>::park
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/driver.rs:198:9
  13: tokio::runtime::park::Inner::park_driver
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/park.rs:205:9
  14: tokio::runtime::park::Inner::park
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/park.rs:137:13
  15: <tokio::runtime::park::Parker as tokio::park::Park>::park
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/park.rs:93:9
  16: tokio::runtime::thread_pool::worker::Context::park_timeout
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:422:13
  17: tokio::runtime::thread_pool::worker::Context::park
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:398:20
  18: tokio::runtime::thread_pool::worker::Context::run
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:328:24
  19: tokio::runtime::thread_pool::worker::run::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:303:17
  20: tokio::macros::scoped_tls::ScopedKey<T>::set
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/macros/scoped_tls.rs:61:9
  21: tokio::runtime::thread_pool::worker::run
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:300:5
  22: tokio::runtime::thread_pool::worker::Launch::launch::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:279:45
  23: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/blocking/task.rs:42:21
  24: tokio::runtime::task::core::CoreStage<T>::poll::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/core.rs:235:17
  25: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/loom/std/unsafe_cell.rs:14:9
  26: tokio::runtime::task::core::CoreStage<T>::poll
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/core.rs:225:13
  27: tokio::runtime::task::harness::poll_future::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/harness.rs:422:23
  28: <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panic.rs:322:9
  29: std::panicking::try::do_call
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panicking.rs:379:40
  30: __rust_try
  31: std::panicking::try
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panicking.rs:343:19
  32: std::panic::catch_unwind
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panic.rs:396:14
  33: tokio::runtime::task::harness::poll_future
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/harness.rs:409:19
  34: tokio::runtime::task::harness::Harness<T,S>::poll_inner
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/harness.rs:89:9
  35: tokio::runtime::task::harness::Harness<T,S>::poll
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/harness.rs:59:15
  36: tokio::runtime::task::raw::poll
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/raw.rs:104:5
  37: tokio::runtime::task::raw::RawTask::poll
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/raw.rs:66:18
  38: tokio::runtime::task::Notified<S>::run
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/mod.rs:171:9
  39: tokio::runtime::blocking::pool::Inner::run
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/blocking/pool.rs:278:17
  40: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/blocking/pool.rs:258:17
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Mh, sure enough, it's deep in the internals of tokio.

The actual assertion failing is:

Rust code
// (in tokio code, in `src/time/driver/entry.rs`)
            debug_assert!(cur_state < STATE_MIN_VALUE);

Huh, a debug_assert!. That means that in production...

Shell session
$ RUST_BACKTRACE=1 cargo run --quiet --release

(program never outputs anything, never exits either)

Hah. I guess they're not fucking around about the "undefined" part of "undefined behavior" huh?

Bear

No they are not. But let's think about why this fails. We already tried to implement our own sleep by spawning another thread that calls Waker::wake...

...but we also agreed that that's definitely not how tokio's sleep works. There's just no way it spawns one thread per sleep invocation. So I guess... it has some sort of timer system, and the first time a Sleep future is polled, it registers itself with it...

Bear

Yes! And when it's dropped, it deregisters itself from it.

Right... and I'm assuming the timer system itself must point to the Sleep future somehow because well... when the timer runs out, it must know which future to wake up.

Bear

Yes! And if the future moves around...

...and it's not dropped... then the timer system points to the wrong memory location!

Bear

Exactly! And that's precisely what happened in that naughty program. sleep1 registered itself with the timer system, and then it switched places with sleep2.

Oh, so when the timer ran out, sleep2 was awakened instead?

Bear

Yes! And it didn't expect to be awakened. Even worse — when we awaited sleep2, it registered itself with the timer system, which created a second entry that pointed to the same address: what used to be sleep1, but was by that point sleep2.

And that's what made it hang?

Bear

Possibly. We'd have to read a bunch more tokio internals to determine exactly what kind of behavior we've triggered. Suffice to say: we don't want to do that.
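
The address mix-up itself can be shown safely, without tokio and without undefined behavior. In this std-only sketch, `Timer` and the "registered" address are made up for illustration (tokio's real timer entries are more involved): we remember where a value lives, swap it with another one, and look at what sits at the remembered address afterwards. The stale address is only ever compared, never dereferenced.

```rust
use std::mem::swap;
use std::ptr;

// Hypothetical stand-in for a timer entry.
struct Timer {
    id: u32,
}

// Returns the id found at the "registered" address, before and after a swap.
pub fn id_at_registered_address() -> (u32, u32) {
    let mut timer1 = Timer { id: 1 };
    let mut timer2 = Timer { id: 2 };

    // Pretend a timer system remembers where `timer1` lives.
    // We only ever compare this address, we never dereference it.
    let registered: *const Timer = &timer1;
    let before = timer1.id;

    swap(&mut timer1, &mut timer2);

    // The registered address still belongs to the `timer1` variable...
    assert!(ptr::eq(registered, &timer1));
    // ...but the value stored there is the one that started out in `timer2`.
    let after = timer1.id;

    (before, after)
}
```

`id_at_registered_address()` returns `(1, 2)`: anything that kept the old address around would now be looking at a different timer than the one that registered itself.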

I see. Ohhh and because some futures register themselves with some system of the executor, which points back to them, those futures should never be moved, and that's why they need to be pinned?

Bear

Yes! And everything is marked backwards, kind of. Every Future gets polled through a pinned reference to itself (a Pin<&mut Self>), and it can only be used unpinned again if it implements Unpin.

Okay okay okay. But wait, then why does Box::pin...?

Bear

Well, precisely because that's a heap allocation! If you're holding a Sleep on the stack, and you pass it around to another function, or store it in a struct, or whatever, it actually moves — its address changes.

But if you're holding a Box<Sleep>, well, then you're only holding a pointer to a Sleep that lives somewhere in heap-allocated memory. That somewhere will never change, ie. the Sleep itself will never move. The pointer to Sleep can be passed around, and everything is fine.
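
That claim is easy to check with a std-only sketch. `Timer` is again a hypothetical `!Unpin` stand-in for `Sleep`: we record the heap address of the pinned value, move the `Pin<Box<Timer>>` somewhere else, and compare.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical `!Unpin` type, standing in for `Sleep`.
struct Timer {
    deadline_ms: u64,
    _pin: PhantomPinned,
}

// Address of the `Timer` on the heap, not of the pointer to it.
fn address_of(timer: &Pin<Box<Timer>>) -> usize {
    let target: &Timer = &**timer;
    target as *const Timer as usize
}

pub fn heap_address_survives_moves() -> bool {
    let pinned = Box::pin(Timer {
        deadline_ms: 25,
        _pin: PhantomPinned,
    });
    let before = address_of(&pinned);

    // Moving the `Pin<Box<Timer>>` into a `Vec` moves the pointer...
    let holder = vec![pinned];
    let after = address_of(&holder[0]);

    // ...but not the `Timer` it points to.
    holder[0].deadline_ms == 25 && before == after
}
```

`heap_address_survives_moves()` returns `true`: the handle moved, the `Timer` stayed put.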

I see. So if we go back to our AsyncRead example... SlowRead can never be Unpin?

Bear

It can't! Because it contains a Sleep, which is not Unpin. Unless we go back to wrapping the Sleep in a Pin<Box<T>>.

Oh, in which case the Sleep wouldn't really move when the SlowRead does, only a pointer to it would.

Bear

Exactly.

So what should we do? Should we be holding a Pin<Box<Sleep>>?

Bear

Well, it really depends what you want to do. It's true that it's not really convenient to use SlowRead right now, because we have to pin it first:

Rust code
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let mut f = SlowRead::new(File::open("/dev/urandom").await?);
    let before = Instant::now();

    let mut f = unsafe { Pin::new_unchecked(&mut f) };
    f.read_exact(&mut buf).await?;

    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Bear

But, first of all — we don't have to write that unsafe code ourselves.

The pin_utils crate provides a safe macro to do exactly that: pin an owned value to the stack, shadowing its previous name.

TOML markup
# in `Cargo.toml`

[dependencies]
pin-utils = "0.1.0"
Rust code
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let f = SlowRead::new(File::open("/dev/urandom").await?);
    let before = Instant::now();

    // 👇
    pin_utils::pin_mut!(f);
    f.read_exact(&mut buf).await?;

    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Bear

(Note that tokio ships with a similar macro, tokio::pin).
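
Since Rust 1.68, the standard library has one as well: `std::pin::pin!`. Here's a minimal std-only sketch of it. The no-op waker (`NoopWake`) and the function name are made up for the example, and the future is polled by hand instead of being driven by tokio.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing, just enough to build a `Context` by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Pins a future to the stack with `pin!` and polls it once.
pub fn poll_stack_pinned() -> Option<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // Futures made by `async` blocks are `!Unpin`, so this one has to be
    // pinned before it can be polled. `pin!` pins it to the current stack
    // frame and hands back a `Pin<&mut _>`; the unpinned value is no longer
    // reachable by name.
    let mut fut = pin!(async { 40u32 + 2 });

    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

`poll_stack_pinned()` returns `Some(42)`. Like `pin_mut!`, this needs no `unsafe` at the call site.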

Secondly — if we find ourselves wanting to unpin a SlowRead, so we can pass it around and whatnot, we can just move the entire SlowRead to the heap first.

That's safe too:

Rust code
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let f = SlowRead::new(File::open("/dev/urandom").await?);
    let before = Instant::now();

    // 👇
    let mut f = Box::pin(f);
    f.read_exact(&mut buf).await?;

    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Bear

And finally, I'd just like to note that here, we're wrapping the whole File in our SlowRead, but we don't have to! We could just as well wrap a mutable reference to it, which also implements AsyncRead.

That way, we can use the File for something else after:

Rust code
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let mut f = File::open("/dev/urandom").await?;

    let sr = SlowRead::new(&mut f);
    pin_utils::pin_mut!(sr);

    let before = Instant::now();
    sr.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    let before = Instant::now();
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Shell session
$ cargo run --quiet
Read 131072 bytes in 395.2787ms
Read 131072 bytes in 5.1451ms

...but that makes it harder to pass around, right?

Bear

Right, yes! If we take a &mut File, we can pass it to a function for example, but storing it in a struct ties that struct to the borrow: it can't outlive the File it points to.

Usual lifetime business. Another option would be to add an into_inner method to our SlowRead, so we can give back the inner reader:

Rust code
impl<R> SlowRead<R>
where
    R: Unpin,
{
    fn into_inner(self) -> R {
        self.reader
    }
}
Bear

...but that's dangerous business, because that means we need to use the SlowRead unpinned after we've used it pinned. Which would normally be forbidden... except it's only !Unpin (not Unpin) because of Sleep, which gets dropped in that same method (by virtue of us taking self, ie. taking ownership of the SlowRead, and letting it fall out of scope).

In fact, pin_utils::pin_mut! prevents us from doing that altogether:

Rust code
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let f = File::open("/dev/urandom").await?;

    let f = SlowRead::new(f);
    {
        pin_utils::pin_mut!(f);

        let before = Instant::now();
        f.read_exact(&mut buf).await?;
        println!("Read {} bytes in {:?}", buf.len(), before.elapsed());
    }

    // 👇 can't do that!
    let mut f = f.into_inner();

    let before = Instant::now();
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Shell session
$ cargo check
    Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures)
error[E0382]: use of moved value: `f`
  --> src/main.rs:27:17
   |
18 |     let f = SlowRead::new(f);
   |         - move occurs because `f` has type `SlowRead<tokio::fs::File>`, which does not implement the `Copy` trait
19 |     {
20 |         pin_utils::pin_mut!(f);
   |         ----------------------- value moved here
...
27 |     let mut f = f.into_inner();
   |                 ^ value used here after move
Bear

...and Pin<Box<T>> wouldn't even let us access the T we need to call into_inner in the first place!

Correct usage would be tricky:

Rust code
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let f = File::open("/dev/urandom").await?;
    let mut f = SlowRead::new(f);

    {
        let mut f = unsafe { Pin::new_unchecked(&mut f) };

        let before = Instant::now();
        f.read_exact(&mut buf).await?;
        println!("Read {} bytes in {:?}", buf.len(), before.elapsed());
    };

    let mut f = f.into_inner();

    let before = Instant::now();
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Bear

...so it's probably not a good pattern. We could probably come up with a safer API, like this:

Rust code
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use tokio::{
    fs::File,
    io::{AsyncRead, AsyncReadExt, ReadBuf},
    time::{Instant, Sleep},
};

struct SlowRead<R> {
    //       👇 now optional!
    reader: Option<R>,
    sleep: Sleep,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            //       👇
            reader: Some(reader),
            sleep: tokio::time::sleep(Default::default()),
        }
    }
}

impl<R> SlowRead<R>
where
    R: Unpin,
{
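    // 👇 now takes a pinned mutable reference to Self, and returns an Option
    fn take_inner(self: Pin<&mut Self>) -> Option<R> {
        // SAFETY: `reader` is never treated as pinned (R: Unpin), so taking
        // it out doesn't move anything that was promised to stay in place.
        unsafe { self.get_unchecked_mut().reader.take() }
    }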
}

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // pin-project both fields
        let (mut sleep, reader) = unsafe {
            let this = self.get_unchecked_mut();
            (Pin::new_unchecked(&mut this.sleep), &mut this.reader)
        };

        match sleep.as_mut().poll(cx) {
            Poll::Ready(_) => {
                sleep.reset(Instant::now() + Duration::from_millis(25));
                match reader {
                    Some(reader) => {
                        // pin-project option.
                        let reader = Pin::new(reader);
                        // note: no need for unsafe since R: Unpin! (thanks
                        // /u/novartole on reddit for catching this.)
                        reader.poll_read(cx, buf)
                    }
                    None => {
                        // simulate EOF
                        Poll::Ready(Ok(()))
                    }
                }
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
Bear

And we could use it like this:

Rust code
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let f = File::open("/dev/urandom").await?;

    let mut f = {
        let f = SlowRead::new(f);
        pin_utils::pin_mut!(f);

        let before = Instant::now();
        f.read_exact(&mut buf).await?;
        println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

        f.take_inner().unwrap()
    };

    let before = Instant::now();
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
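On newer toolchains (Rust 1.68 and up, so later than what this article targets), the standard library's `std::pin::pin!` macro can stand in for `pin_utils::pin_mut!`. Here's a std-only sketch of the same `take_inner` shape. `Wrapper` and its `String` payload are made up for illustration, with `PhantomPinned` playing the part of the `Sleep` field that makes the type `!Unpin`:

```rust
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};

// Hypothetical stand-in for `SlowRead<R>`.
struct Wrapper<R> {
    inner: Option<R>,
    _pin: PhantomPinned, // makes `Wrapper` `!Unpin`, like `Sleep` does
}

impl<R> Wrapper<R> {
    fn new(inner: R) -> Self {
        Self {
            inner: Some(inner),
            _pin: PhantomPinned,
        }
    }
}

impl<R: Unpin> Wrapper<R> {
    fn take_inner(self: Pin<&mut Self>) -> Option<R> {
        // SAFETY: `Wrapper` itself is not moved, and `inner` is never
        // handed out as pinned, so taking it out breaks no pinning promise.
        unsafe { self.get_unchecked_mut().inner.take() }
    }
}

// Pins a `Wrapper` on the stack, then takes the inner value twice.
fn round_trip() -> (Option<String>, Option<String>) {
    let mut w = pin!(Wrapper::new(String::from("reader")));
    let first = w.as_mut().take_inner();
    let second = w.as_mut().take_inner();
    (first, second)
}

fn main() {
    println!("{:?}", round_trip());
}
```

The second call returns `None`, which is why the caller above has to `unwrap()`: the type system no longer guarantees that the reader is still there.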
Bear

If we really needed an into_inner method that takes self, though, we may be better off boxing the Sleep so that the whole type becomes Unpin itself, which would make all of this a lot more ergonomic.
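A rough, std-only sketch of that boxing idea: `Timer` is a made-up `!Unpin` stand-in for `Sleep`, and `Boxed` is hypothetical too. Since `Pin<Box<T>>` is `Unpin` regardless of `T`, a struct holding one is `Unpin` as long as its other fields are, and a plain `into_inner(self)` works again:

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical `!Unpin` stand-in for `tokio::time::Sleep`.
struct Timer {
    _pin: PhantomPinned,
}

// Hypothetical variant of `SlowRead` with the timer boxed.
struct Boxed<R> {
    reader: R,
    timer: Pin<Box<Timer>>,
}

impl<R> Boxed<R> {
    fn new(reader: R) -> Self {
        Self {
            reader,
            timer: Box::pin(Timer { _pin: PhantomPinned }),
        }
    }

    // Takes `self` by value: no pinned reference required.
    fn into_inner(self) -> R {
        self.reader
    }
}

// Only compiles if `T: Unpin`.
fn assert_unpin<T: Unpin>(_: &T) {}

fn demo() -> Vec<u8> {
    let mut b = Boxed::new(vec![1u8, 2, 3]);
    assert_unpin(&b);
    {
        // `Pin::new` is safe here because `Boxed<Vec<u8>>` is `Unpin`.
        let pinned: Pin<&mut Boxed<Vec<u8>>> = Pin::new(&mut b);
        // The timer is still reachable as a pinned reference via its box.
        let _timer: Pin<&mut Timer> = pinned.get_mut().timer.as_mut();
    }
    b.into_inner()
}

fn main() {
    println!("{:?}", demo());
}
```

The cost is one heap allocation per wrapper, in exchange for never needing `unsafe` or `pin_mut!` at the call site.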

I just wanted to point out, pun fully intended, that there are options.

Gotcha.

Let's go back to this version of SlowRead for a second:

Rust code
struct SlowRead<R> {
    reader: R,
    sleep: Sleep,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            reader,
            sleep: tokio::time::sleep(Default::default()),
        }
    }
}

That has this AsyncRead implementation:

Rust code
impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // pin-project both fields
        let (mut sleep, reader) = unsafe {
            let this = self.get_unchecked_mut();
            (
                Pin::new_unchecked(&mut this.sleep),
                Pin::new_unchecked(&mut this.reader),
            )
        };

        match sleep.as_mut().poll(cx) {
            Poll::Ready(_) => {
                sleep.reset(Instant::now() + Duration::from_millis(25));
                reader.poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

Now... this could be worse, right? I'm sure there's worse implementations of AsyncRead out there. But also, I get the feeling it could be better.

Like.. without any usage of unsafe?

Bear

There's a crate for that!

Nothing in the standard library so far (Rust 1.51), but that's exactly what the pin-project crate is about.

Let's see..

TOML markup
# in `Cargo.toml`

[dependencies]
pin-project = "1.0"
Bear

The idea behind pin-project is that it does pin projection for you. That part:

Rust code
        // pin-project both fields
        let (mut sleep, reader) = unsafe {
            let this = self.get_unchecked_mut();
            (
                Pin::new_unchecked(&mut this.sleep),
                Pin::new_unchecked(&mut this.reader),
            )
        };
Bear

...and to ensure that you're using fields either always pinned or never pinned, you have to make that choice when designing the struct.

pin-project presents itself as a procedural attribute macro, so you can slap it on top of any struct, and then set the #[pin] attribute onto any field that you're planning on using as pinned.

In our case, that's both fields.

Rust code
use pin_project::pin_project;

#[pin_project]
struct SlowRead<R> {
    #[pin]
    reader: R,
    #[pin]
    sleep: Sleep,
}
Bear

And then, instead of doing pin projection ourselves, we call the auto-generated method project, which takes a Pin<&mut SlowRead<R>> and returns a struct that looks like this:

Rust code
struct SlowReadProjected<'a, R> {
    reader: Pin<&'a mut R>,
    sleep: Pin<&'a mut Sleep>,
}
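For intuition, here's roughly what a projection looks like when only some fields are marked `#[pin]`: pinned fields come back as `Pin<&mut T>`, the rest as plain `&mut T`. This is a hand-written, std-only approximation, not the macro's actual output, and `Counter`, `CounterProj`, and `Marker` are made-up names:

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical `!Unpin` field type.
struct Marker {
    _pin: PhantomPinned,
}

struct Counter {
    marker: Marker, // would carry `#[pin]`
    count: u32,     // would not
}

// Hand-written equivalent of a generated projection struct.
struct CounterProj<'a> {
    marker: Pin<&'a mut Marker>,
    count: &'a mut u32,
}

impl Counter {
    fn project<'a>(self: Pin<&'a mut Self>) -> CounterProj<'a> {
        // SAFETY: `marker` is only ever exposed pinned, `count` is never
        // exposed pinned, and nothing is moved out of `self`.
        unsafe {
            let this = self.get_unchecked_mut();
            CounterProj {
                marker: Pin::new_unchecked(&mut this.marker),
                count: &mut this.count,
            }
        }
    }
}

fn bump_twice() -> u32 {
    let mut c = Box::pin(Counter {
        marker: Marker { _pin: PhantomPinned },
        count: 0,
    });
    for _ in 0..2 {
        let this = c.as_mut().project();
        let _still_pinned: Pin<&mut Marker> = this.marker;
        *this.count += 1;
    }
    c.count
}

fn main() {
    println!("{}", bump_twice());
}
```

The real macro also takes care of `Drop` and `Unpin` details that this sketch ignores, which is a good reason to let it write the `unsafe` for you.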
Rust code
impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        //       👇            👇
        let mut this = self.project();

        match this.sleep.as_mut().poll(cx) {
            Poll::Ready(_) => {
                this.sleep.reset(Instant::now() + Duration::from_millis(25));
                this.reader.poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
Bear

And boom, no more unsafe.

And we can never accidentally use self.sleep or self.reader unpinned (without unsafe code).

And it still works.

Shell session
$ grep "unsafe" -Rn src
$ cargo run --quiet
Read 131072 bytes in 393.9761ms
Bear

Of course, that's a fairly simple case of pin-projection. It gets hairier. A lot hairier. Partially-pinned state machines are... not fun. For now!

Whoa. Thanks cool bear!

Bear

Don't mention it 😎
