Pin and suffering

👋 This page was last updated ~4 years ago. Just so you know.

I'd like to think that my understanding of "async Rust" has increased over the past year or so. I'm 100% on board with the basic principle: I would like to handle thousands of concurrent tasks using a handful of threads. That sounds great!

And to become proficient with async Rust, I've accepted a lot of things. There are blue functions and red functions, and red (async) functions are contagious.

Cool bear

Cool bear's hot tip

Eh, not exactly — you can build an executor in a blue (sync) function to block until a red (async) function is done.

And from a red (async) function, you can use something like tokio::task::spawn_blocking.

I guess the difficulty is when you go from red to blue, back to red — because you can't make an executor inside an executor.

See bear, this is what I'm talking about: there's just so much I have to take for granted. "Don't block the executor", "futures do nothing unless polled", "you need to pin a future before you can poll it", etc.

Cool bear

Well, yes? Mostly those three?

But bear, everything was so simple in synchronous Rust, and suddenly I have to uphold all these rules? I have to think about these?

Here let me show you. Say, in synchronous Rust, I just want to print hello, sleep, and then print goodbye.

I can just do this!

use std::{thread::sleep, time::Duration};

fn main() {
    println!("Hello!");
    sleep(Duration::from_millis(500));
    println!("Goodbye!");
}
$ cargo run --quiet
Hello!
(500ms elapse...)
Goodbye!

It's so simple!

Cool bear

Well, the async version isn't that hard either. Try out tokio for example.

Okay, well, sure:

# in Cargo.toml

[dependencies]
tokio = { version = "1.4.0", features = ["full"] }
use std::{thread::sleep, time::Duration};

#[tokio::main]
async fn main() {
    println!("Hello!");
    sleep(Duration::from_millis(500));
    println!("Goodbye!");
}
$ cargo run --quiet
Hello!
(500ms elapse...)
Goodbye!

Ah, you're right! That was pretty simple.

Cool bear

Well, it's also wrong. You're blocking the executor.

What? I didn't see any blockage going on. In fact, it behaved exactly the same as the synchronous version. Seems okay to me.

Cool bear

Well, try spawning two tasks that do that, you'll see.

Okay, sure:

use std::{thread::sleep, time::Duration};

#[tokio::main]
async fn main() {
    let one = tokio::spawn(greet());
    let two = tokio::spawn(greet());
    let (_, _) = tokio::join!(one, two);
}

async fn greet() {
    println!("Hello!");
    sleep(Duration::from_millis(500));
    println!("Goodbye!");
}
$ cargo run --quiet
Hello!
Hello!
(500ms elapse...)
Goodbye!
Goodbye!

There! It just works! Thanks bear, I don't know what I was complaining about, async Rust really is quite easy.

Cool bear

No, you j- sigh okay, try switching to the single-threaded executor.

The what? Oh, oh wait a minute, I see it, I can just pass a flavor argument to the tokio::main attribute macro:

//              👇
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let one = tokio::spawn(greet());
    let two = tokio::spawn(greet());
    let (_, _) = tokio::join!(one, two);
}

// omitted: everything else
$ cargo run --quiet
Hello!
(500ms elapse...)
Goodbye!
Hello!
(500ms elapse...)
Goodbye!

Ah.

So we are blocking the executor. But see, that's complicated, I don't want to have to think about all that!

For me sleep is just a syscall. You call it and boom! It sleeps. No need to worry about anything.

Cool bear

Well, you could just not block the executor?

But how?

Cool bear

Well, tokio has its own sleep function.

And that one doesn't block? I mean okay, sure, if you say so:

use std::time::Duration;
use tokio::time::sleep;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let one = tokio::spawn(greet());
    let two = tokio::spawn(greet());
    let (_, _) = tokio::join!(one, two);
}

async fn greet() {
    println!("Hello!");
    sleep(Duration::from_millis(500)).await;
    println!("Goodbye!");
}
$ cargo run --quiet
Hello!
Hello!
(500ms elapse...)
Goodbye!
Goodbye!

Huh. So that one doesn't block the executor. Okay then.
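
Cool bear's hot tip

If you'd like a peek before we get there: here's a rough, std-only sketch of why one version interleaves and the other doesn't. Everything in it (run_two, YieldOnce, NoopWake) is made up for illustration, and the hand-rolled polling loop is only a stand-in for a single-threaded executor, not how tokio's scheduler is actually written. The poll, Context and Waker bits get unpacked further down.

```rust
use std::{
    cell::RefCell,
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
    time::Duration,
};

/// A waker that does nothing: the loop below re-polls unconditionally.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns `Pending` on the first poll and `Ready` on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

async fn greet(name: &str, log: &RefCell<Vec<String>>, cooperative: bool) {
    log.borrow_mut().push(format!("Hello {}", name));
    if cooperative {
        // Hands control back to whoever is polling us.
        YieldOnce(false).await;
    } else {
        // Blocks the only thread we have: nobody else gets polled meanwhile.
        std::thread::sleep(Duration::from_millis(5));
    }
    log.borrow_mut().push(format!("Goodbye {}", name));
}

/// Polls two `greet` tasks in turn on the current thread, returns the log.
fn run_two(cooperative: bool) -> Vec<String> {
    let log = RefCell::new(Vec::new());
    {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        let mut one = Box::pin(greet("one", &log, cooperative));
        let mut two = Box::pin(greet("two", &log, cooperative));
        let (mut one_done, mut two_done) = (false, false);
        while !(one_done && two_done) {
            if !one_done {
                one_done = one.as_mut().poll(&mut cx).is_ready();
            }
            if !two_done {
                two_done = two.as_mut().poll(&mut cx).is_ready();
            }
        }
    }
    log.into_inner()
}
```

Under those assumptions, run_two(false) should log both lines for "one" before "two" gets a turn, while run_two(true) should interleave them, just like the two outputs above.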

But, see bear, I don't know how that all works. It's all just voodoo to me. One function blocks the executor, another doesn't, why? How does it actually work, under the hood?

Cool bear

Well... it's not that complicated really.

The sleep in std does just call the sleep syscall, pretty much.

Yes, that's simple. I like that.

Cool bear

Whereas the sleep in tokio returns a Future... that registers a timer when you first poll it... and it only completes when the deadline is reached.

I uh... none of that made any sense. I don't remember "polling" anything. And you say it... "registers" a timer? As in, global state? And what do you mean by "complete"?

Cool bear

Sigh. Okay, let's start small.

The Future type.

Cool bear

Future is just a trait. Anything can be a Future if it wants to.

We can make Futures?

Cool bear

Sure! If you want to.

Yeah why not, if it can shed some light on the whole thing, I'll make a future.

So I guess since it's a trait... I know traits. We can just implement it on any type? Like an empty struct?

Cool bear

Yeah! Go ahead!

use std::future::Future;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    todo!()
}

struct MyFuture {}

impl Future for MyFuture {}

Ok, the compiler is complaining that not all trait items are implemented: it wants Output and poll. What now?

Cool bear

Now, use the rust-analyzer "Implement missing members" quick fix to generate those.

impl Future for MyFuture {
    type Output;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        todo!()
    }
}

Ew, long types.

Cool bear

...just use rust-analyzer's "Replace qualified path with use" quick fix on those long types.

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

struct MyFuture {}

impl Future for MyFuture {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        todo!()
    }
}

Okay, better! And now what?

Cool bear

Well, first you need to pick your output type. What do you want your future to, well, output?

Nothing special?

Cool bear

So, nothing. You can just use the empty tuple: ().

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        todo!()
    }
}

Okay done! Now what?

Cool bear

Now we await it!

But it's not done?

Cool bear

That's okay, we just await it anyway!

Hum, okay:

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let fut = MyFuture {};
    fut.await
}

This feels really dumb, but here goes:

$ cargo run --quiet
thread 'main' panicked at 'not yet implemented', src/main.rs:19:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Yeah, well...

Cool bear

...but that's good! That means your future was polled!

Mh? Who polled it?

Hang on, maybe we can get a stack trace...

$ RUST_BACKTRACE=1 cargo run --quiet
thread 'main' panicked at 'not yet implemented', src/main.rs:19:9
stack backtrace:
   0: rust_begin_unwind
             at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/std/src/panicking.rs:493:5
   1: core::panicking::panic_fmt
             at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/core/src/panicking.rs:92:14
   2: core::panicking::panic
             at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/core/src/panicking.rs:50:5
   3: <manual_futures::MyFuture as core::future::future::Future>::poll
             at ./src/main.rs:19:9
   4: manual_futures::main::{{closure}}
             at ./src/main.rs:10:5
   5: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/mod.rs:80:19
   6: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/future/future.rs:119:9
   7: tokio::runtime::basic_scheduler::Inner<P>::block_on::{{closure}}::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:196:62
   8: tokio::coop::with_budget::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/coop.rs:106:9
   9: std::thread::local::LocalKey<T>::try_with
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:272:16
  10: std::thread::local::LocalKey<T>::with
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/local.rs:248:9
  11: tokio::coop::with_budget
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/coop.rs:99:5
  12: tokio::coop::budget
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/coop.rs:76:5
  13: tokio::runtime::basic_scheduler::Inner<P>::block_on::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:196:39
  14: tokio::runtime::basic_scheduler::enter::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:279:29
  15: tokio::macros::scoped_tls::ScopedKey<T>::set
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/macros/scoped_tls.rs:61:9
  16: tokio::runtime::basic_scheduler::enter
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:279:5
  17: tokio::runtime::basic_scheduler::Inner<P>::block_on
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:185:9
  18: tokio::runtime::basic_scheduler::InnerGuard<P>::block_on
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:425:9
  19: tokio::runtime::basic_scheduler::BasicScheduler<P>::block_on
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/basic_scheduler.rs:145:24
  20: tokio::runtime::Runtime::block_on
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/mod.rs:450:46
  21: manual_futures::main
             at ./src/main.rs:7:1
  22: core::ops::function::FnOnce::call_once
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ops/function.rs:227:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Ok, uhh the... tokio scheduler? Is the one that polls it?

Cool bear

Yes! That's what your code is doing.

Mh? It certainly isn't — I haven't even built a tokio::runtime::Runtime.

Cool bear

Yeah, tokio::main just did it for you! In fact, your main function here:

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let fut = MyFuture {};
    fut.await
}
Cool bear

...is roughly equivalent to this:

fn main() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();
    let fut = MyFuture {};
    rt.block_on(fut);
}

Ah! I see. So... with tokio::main, main is an async function, which... is what exactly?

Cool bear

An async function is actually just a synchronous function that returns a future!

So the whole body of main is a Future, too?

Cool bear

Yes!
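
Cool bear's hot tip

If you want to convince yourself, here's a small sketch using only std. The names sugar, desugared, poll_once and NoopWake are invented for this example, and the "desugared" version is only roughly what the compiler does. It also shows that calling either function runs none of the body: that only happens once the future is polled.

```rust
use std::{
    cell::Cell,
    future::Future,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
};

// The `async fn` sugar...
async fn sugar(ran: &Cell<bool>) -> u32 {
    ran.set(true);
    42
}

// ...and roughly what it stands for: a plain function returning a future.
fn desugared(ran: &Cell<bool>) -> impl Future<Output = u32> + '_ {
    async move {
        ran.set(true);
        42
    }
}

/// A waker that does nothing; enough to poll a future a single time.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` exactly once and returns whatever `poll` returned.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}
```

Creating the future with sugar(&ran) should leave ran untouched; passing it to poll_once should flip it and return Poll::Ready(42), and desugared should behave the same way.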

Okay, okay, well, I don't really want to build a Runtime myself, I'm fine with the tokio::main version for now.

I want to go back to MyFuture. How do I make it do something?

Cool bear

Well, by returning! You can see that it's supposed to return a Poll, which is an enum with two variants.

The first variant is Ready, which..

..which I assume we return when the future is complete, okay. So if I do this:

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

#[tokio::main]
async fn main() {
    let fut = MyFuture {};
    println!("Awaiting fut...");
    fut.await;
    println!("Awaiting fut... done!");
}

struct MyFuture {}

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 👇
        Poll::Ready(())
    }
}

...then it should complete immediately:

$ cargo run --quiet
Awaiting fut...
Awaiting fut... done!
Cool bear

Correct! And the other variant is Pending, which you should return if your future is not quite done yet.

Mh? Then how will it ever complete?

Cool bear

Well, Future::poll will be called again later!

It will? Okay!

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 👇
        Poll::Pending
    }
}
$ cargo run --quiet
Awaiting fut...
(nothing seems to happen)

Mhhh, bear? The program seems... stuck. Are we blocking the executor again?

Cool bear

Oh, no. It's not blocked. Well, the actual, generated, synchronous main function is blocked on the asynchronous main's future — it's waiting for it to complete, and it's never completing.

So it's not getting polled again? But you just told me.. hang on, let me just add some debug prints:

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 👇
        println!("MyFuture::poll()");
        Poll::Pending
    }
}
$ cargo run --quiet
Awaiting fut...
MyFuture::poll()
(nothing happens, still. the program continues running forever)

Yeah! It's only getting polled once! What gives, bear? I thought you told me it would get polled again?

Cool bear

Ahh well it only gets polled again if it registered to be polled again. Did you think it would be polled in a loop? Imagine if you're trying to read from a socket and the other peer is not sending anything for five seconds.

The read future would be polled and polled and polled in a busy loop for five seconds. It would consume an entire CPU core! No, futures are only "awakened" when something happens.

Something? What kind of thing?

Cool bear

Something interesting! Like, a timer running out. Or a file being ready to read from. That kind of thing.

Ah. That kind of thing. Okay, so say I want my future to be "awakened" after one second. How would I do that?

Cool bear

Well, you see that cx argument? Of type Context? That's how.

Ah, right — the poll method takes two arguments. The receiver is... some form of self, so, MyFuture, and the second argument is a &mut Context.

Let's see what it has... it has... a waker() method that returns a &Waker! That looks interesting.

Cool bear

It sure does!

And Waker has... wake and wake_by_ref methods. Well, we can't call wake at all because it wants to take ownership of Waker, and all we have is an immutable reference to it.

Let's try wake_by_ref I guess?

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
$ cargo run --quiet
Awaiting fut...
MyFuture::poll()
MyFuture::poll()
MyFuture::poll()
MyFuture::poll()
MyFuture::poll()
MyFuture::poll()
MyFuture::poll()
MyFutur^C
(that line repeats very quickly, and only stops when we hit Ctrl-C)

Heyyyy. Now it is a busy loop! It gets polled as fast as it can.

Cool bear

Correct!
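
Cool bear's hot tip

You can watch the difference between those last two experiments without an executor at all. This is a sketch, assuming a made-up CountingWaker that only counts wake-ups instead of scheduling anything; Silent, Eager and wakes_after_one_poll are hypothetical names too.

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    task::{Context, Poll, Wake, Waker},
};

/// Stand-in for the executor's waker: it only counts wake-ups.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns `Pending` and never tells anyone.
struct Silent;

impl Future for Silent {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Returns `Pending`, but asks to be polled again right away.
struct Eager;

impl Future for Eager {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Polls `fut` once and reports how many wake-ups it asked for.
fn wakes_after_one_poll<F: Future + Unpin>(mut fut: F) -> usize {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let _ = Pin::new(&mut fut).poll(&mut cx);
    counter.0.load(Ordering::SeqCst)
}
```

Silent asks for zero wake-ups, so an executor has no reason to ever poll it again. Eager asks for one on every poll, which is what turns it into a busy loop.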

But that's still not what I want. What I want is... for my future to be polled again after one second.

Cool bear

So? You have a Waker. You can wake it up yourself.

No, because I return, so by that time it's too late.

Cool bear

So??? Just use a thread! You were just going on about how threads are nice and easy and you understood them, blah blah blah. Just use a thread.

But I only have a reference to a Waker, I can't just.. oh would you look at that, it implements Clone. Okay. OKAY. I guess I'll just spawn a thread.

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");

        let waker = cx.waker().clone();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            waker.wake();
        });

        Poll::Pending
    }
}
$ cargo run --quiet
Awaiting fut...
MyFuture::poll()
(1 second elapses...)
MyFuture::poll()
(another second elapses...)
MyFuture::poll()
(a third second elapses...)
MyFuture::poll()
^C

AhAH! Interesting. Well, eventually I guess I want to return ready so all I need to do is maintain some state, let's see...

#[tokio::main]
async fn main() {
    let fut = MyFuture::new();
    println!("Awaiting fut...");
    fut.await;
    println!("Awaiting fut... done!");
}

struct MyFuture {
    slept: bool,
}

impl MyFuture {
    fn new() -> Self {
        Self { slept: false }
    }
}

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");

        match self.slept {
            false => {
                // make sure we're polled again in one second
                let waker = cx.waker().clone();
                std::thread::spawn(move || {
                    std::thread::sleep(Duration::from_secs(1));
                    waker.wake();
                });
                self.slept = true;

                Poll::Pending
            }
            true => Poll::Ready(()),
        }
    }
}

No, wait — it's complaining that self is not mutable. Uhh... I can write it mut self if that'd help?

impl Future for MyFuture {
    type Output = ();

    //       👇
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // etc.
    }
}

Ah. That did help. Jolly good.

$ cargo run --quiet
Awaiting fut...
MyFuture::poll()
(one second elapses)
MyFuture::poll()
Awaiting fut... done!

Heyyyyyyy. That worked! Thanks bear!

Cool bear

You're welcome. See? It's not that complicated!
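
Cool bear's hot tip

And if you're wondering what the thing calling poll could look like, here's a toy version using only std. It's a sketch, not tokio's scheduler: block_on, ThreadWaker and NapOnce are names invented here, the executor handles a single future, and waking just means unparking the thread that's waiting.

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    task::{Context, Poll, Wake, Waker},
    thread::{self, Thread},
    time::Duration,
};

/// Waker that flags the wake-up and unparks the thread stuck in `block_on`.
struct ThreadWaker {
    thread: Thread,
    woken: AtomicBool,
}

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.woken.store(true, Ordering::Release);
        self.thread.unpark();
    }
}

/// Toy executor: polls one future on the current thread until it's ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let state = Arc::new(ThreadWaker {
        thread: thread::current(),
        woken: AtomicBool::new(false),
    });
    let waker = Waker::from(state.clone());
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        // `park` may return spuriously, so only poll again once `wake` ran.
        while !state.woken.swap(false, Ordering::Acquire) {
            thread::park();
        }
    }
}

/// Same idea as `MyFuture`: nap on a helper thread, then complete.
/// Its output is the number of times it was polled.
struct NapOnce {
    nap: Duration,
    slept: bool,
    polls: u32,
}

impl NapOnce {
    fn new(nap: Duration) -> Self {
        Self { nap, slept: false, polls: 0 }
    }
}

impl Future for NapOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        self.polls += 1;
        if self.slept {
            return Poll::Ready(self.polls);
        }
        self.slept = true;
        let (waker, nap) = (cx.waker().clone(), self.nap);
        thread::spawn(move || {
            thread::sleep(nap);
            waker.wake();
        });
        Poll::Pending
    }
}
```

With this, block_on(NapOnce::new(Duration::from_millis(50))) should poll the future twice, once before the nap and once after the helper thread calls wake, which matches the two MyFuture::poll() lines in the output above.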

It really isn't. Hey WAIT A MINUTE. We're just back to using threads, sort of.

I don't want to spin up a new thread every time I want to wait. That seems really excessive. Surely async Rust™ has something to solve that?

Cool bear

Well yes! In this case, for this executor, it's tokio::time::sleep.

But that's what we were using in the first pl-

Cool bear

Yes, yes, that's what we were using. But we can embed it within MyFuture, so you can still see the machinery.

Machinery, yes, good! Okay, how do I do that.

Cool bear

Well, I don't know if you've noticed but tokio::time::sleep returns a concrete type: Sleep.

AhAH! So if I just store that as a field of MyFuture, all I have to do is poll it, from within my own poll method?

Cool bear

That's the gist, yes.

use tokio::time::Sleep;

struct MyFuture {
    sleep: Sleep,
}

impl MyFuture {
    fn new() -> Self {
        Self {
            sleep: tokio::time::sleep(Duration::from_secs(1)),
        }
    }
}

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");
        self.sleep.poll(cx)
    }
}

Ah, great! It even has the exact same return type as MyFuture::poll, Poll<()>, so the method body is really short and sweet.

There's just uh... just one tiny problem. It doesn't compile:

$ cargo check
    Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures)
error[E0599]: no method named `poll` found for struct `Sleep` in the current scope
  --> src/main.rs:35:20
   |
35 |         self.sleep.poll(cx)
   |                    ^^^^ method not found in `Sleep`

error: aborting due to previous error

For more information about this error, try `rustc --explain E0599`.
error: could not compile `manual-futures`

To learn more, run the command again with --verbose.

Ohhh! I know! poll is a method on trait Future, so I need to import it to be able to call it... wait no, we've already imported it.

Beaaaaaaaaaar what's wrong with my code?

Cool bear

Look at the receiver! It's not self, it's not &self, it's not even &mut self.

It's self: Pin<&mut Self>.

Mh. Okay. How do we build a Pin... it has a new method. Let's try that.

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");
        //                 👇
        let mut sleep = Pin::new(&mut self.sleep);
        sleep.poll(cx)
    }
}
$ cargo check
    Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures)
error[E0277]: `PhantomPinned` cannot be unpinned
  --> src/main.rs:35:25
   |
35 |         let mut sleep = Pin::new(&mut self.sleep);
   |                         ^^^^^^^^ within `tokio::time::driver::sleep::_::__Origin<'_>`, the trait `Unpin` is not implemented for `PhantomPinned`
   |
   = note: required because it appears within the type `tokio::time::driver::entry::TimerEntry`
   = note: required because it appears within the type `tokio::time::driver::sleep::_::__Origin<'_>`
   = note: required because of the requirements on the impl of `Unpin` for `Sleep`
   = note: required by `Pin::<P>::new`

That does not work. Bear, halp. What's happening?

Cool bear

Bwahahahaha. Just Box::pin it!

Just wh-?

Cool bear

In the constructor, just wrap sleep() in Box::pin().

Uh, okay... that returns a Pin<Box<Sleep>> so I guess I'll have to change the field type too...

struct MyFuture {
    sleep: Pin<Box<Sleep>>,
}

impl MyFuture {
    fn new() -> Self {
        Self {
            sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))),
        }
    }
}

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");
        let sleep = Pin::new(&mut self.sleep);
        sleep.poll(cx)
    }
}

Oh hey, that compiles.

$ cargo run --quiet
Awaiting fut...
MyFuture::poll()
(one second elapses...)
MyFuture::poll()
Awaiting fut... done!

Oh heyy! That runs!!

Cool bear

Yes! And you honestly don't need to build that Pin yourself, because Pin<Box<Sleep>> is already pinned — so you're building a Pin<&mut Pin<Box<Sleep>>>.

You can just call sleep.as_mut(), and poll that.

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");
        self.sleep.as_mut().poll(cx)
    }
}

Ah, cleaner. That still works.
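
Cool bear's hot tip

Here's the same dance without tokio, in case you want to poke at it. It's a sketch under a few assumptions: Stubborn is a made-up future that opts out of Unpin with PhantomPinned (the same marker the compiler error mentioned), Wrapper plays the role of MyFuture, and poll_unpin_once is a hypothetical helper that only accepts Unpin futures.

```rust
use std::{
    future::Future,
    marker::PhantomPinned,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
};

/// A future that is deliberately not `Unpin`.
struct Stubborn {
    _pin: PhantomPinned,
}

impl Future for Stubborn {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        Poll::Ready(42)
    }
}

/// Stores the inner future boxed and pinned, like `MyFuture` does.
struct Wrapper {
    inner: Pin<Box<Stubborn>>,
}

impl Wrapper {
    fn new() -> Self {
        // `Pin::new(&mut stubborn)` would be rejected: `Stubborn` isn't `Unpin`.
        // `Box::pin` works for any type.
        Self {
            inner: Box::pin(Stubborn { _pin: PhantomPinned }),
        }
    }
}

impl Future for Wrapper {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        // `Pin<Box<T>>::as_mut` hands out the `Pin<&mut T>` that `poll` wants.
        self.inner.as_mut().poll(cx)
    }
}

/// A waker that does nothing; enough to poll a future a single time.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls an `Unpin` future once; `Pin::new` is allowed thanks to the bound.
fn poll_unpin_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

Note that Wrapper itself passes the Unpin bound of poll_unpin_once even though Stubborn wouldn't: the boxed future lives on the heap, so moving the Wrapper around doesn't move it.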

Cool bear

You can even go one step further and add the futures crate so you can use FutureExt::poll_unpin, because polling an Unpin future is a common operation.

I can?

# in Cargo.toml

[dependencies]
futures = "0.3"
use futures::FutureExt;

impl Future for MyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("MyFuture::poll()");
        self.sleep.poll_unpin(cx)
    }
}

Neat! That still works.

At this point though, I can't help but wonder — do we really need to understand all this? Like, it's neat that we did it once, don't get me wrong.

But in the real world, is there ever a point in manually implementing futures like that? Don't we have the async / await keywords so that we don't have to?

Cool bear

Well! In this case, yes, it's gratuitous. But let's say we want to do something different...

Let's say we want to make a type that implements tokio's AsyncRead interface (so that we can use it anywhere we would use another reader), but that artificially introduces some delay between each read.

Mhyes, okay. Can't we just use an async method here?

Cool bear

You'd think so, but no. Not currently. See, async Rust is still in its awkward teenage phase. Almost everything is there, but it can be a bit of a challenge to live with.

What uh.. what does that mean?

Cool bear

Well, AsyncRead is a trait. And traits can't have async methods. (At the time of this writing, i.e. with Rust 1.51).

Okay, so just have them return a Future then? That's all async methods are, right?

Cool bear

Right, yes, but! We're always reading into something. A buffer. In synchronous land, when we do this:

let mut buf = vec![0u8; 1024];
let n = something.read(&mut buf)?;

...the read method borrows buf mutably until it's done with it.

But the problem with async...

...is that we return early if we're not quite ready yet. We just return Poll::Pending and wait until we're awakened by the scheduler. Ohhhhh and just because we return doesn't mean that we're done with the buffer!

Cool bear

...rude, but yes.

So that's not how AsyncRead works. It does give you a buffer, but you're supposed to either:

  • Not be ready, and so you subscribe to be awakened later, and return Poll::Pending
  • Be ready, and then you fill part (or all) of the buffer, and return Poll::Ready

And if we're not ready, what do we do with the buffer?

Cool bear

Nothing. Especially not write into it. It would be disregarded anyway.
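
Cool bear's hot tip

Here's what those two outcomes could look like in code. This is a sketch with a made-up, simplified trait: PollRead only mimics the shape of tokio's AsyncRead, taking a plain byte slice and returning the byte count, whereas the real poll_read takes a &mut ReadBuf<'_> and returns Poll<std::io::Result<()>>. Hiccup, NoopWake and noop_waker are invented for the example as well.

```rust
use std::{
    io,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
};

/// Hypothetical, simplified stand-in for tokio's `AsyncRead`.
trait PollRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}

/// In-memory reader that is "not ready" on every other poll.
struct Hiccup {
    data: Vec<u8>,
    pos: usize,
    primed: bool,
}

impl Hiccup {
    fn new(data: Vec<u8>) -> Self {
        Self { data, pos: 0, primed: false }
    }
}

impl PollRead for Hiccup {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        if !this.primed {
            // Not ready: ask to be polled again and leave `buf` alone.
            this.primed = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        // Ready: fill as much of `buf` as we can and say how much that was.
        this.primed = false;
        let n = buf.len().min(this.data.len() - this.pos);
        buf[..n].copy_from_slice(&this.data[this.pos..this.pos + n]);
        this.pos += n;
        Poll::Ready(Ok(n))
    }
}

/// A waker that does nothing, so `poll_read` can be called by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

Polling a Hiccup by hand with a context built from noop_waker() should give Poll::Pending with the buffer untouched the first time, and Poll::Ready(Ok(n)) with the first n bytes filled in the second time.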

Gotcha. Okay, so, we know how to sleep by polling a Sleep future, I think we know enough to cook us a slow reader...

It's going to be a struct (it's always a struct), and it'll work with any kind of reader.

struct SlowRead<R> {
    reader: R,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self { reader }
    }
}

And then we just have to implement AsyncRead for SlowRead itself! But only if its type parameter R also implements AsyncRead:

use tokio::io::{AsyncRead, ReadBuf};

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        self.reader.poll_read(cx, buf)
    }
}

Okay! Well, this doesn't build:

$ cargo check
    Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures)
error[E0599]: no method named `poll_read` found for type parameter `R` in the current scope
  --> src/main.rs:29:21
   |
29 |         self.reader.poll_read(cx, buf)
   |                     ^^^^^^^^^ method not found in `R`
   |
   = help: items from traits can only be used if the type parameter is bounded by the trait
help: the following trait defines an item `poll_read`, perhaps you need to restrict type parameter `R` with it:
   |
20 | impl<R: AsyncRead> AsyncRead for SlowRead<R>
   |      ^^^^^^^^^^^^

And the compiler is getting confused. We're already constraining R to implement AsyncRead. I guess this is just like futures, where the receiver isn't quite ri-

Cool bear

Yup. It's exactly like that.

So we need to pin it before we read fr-

Cool bear

Yes. You need to pin it.

Okay, okay, I'm pinning it. Box-pinning it even.

struct SlowRead<R> {
    reader: Pin<Box<R>>,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            reader: Box::pin(reader),
        }
    }
}

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        self.reader.as_mut().poll_read(cx, buf)
    }
}

Okay, had to sneak an as_mut in there, but it builds. What now?

Cool bear

Well, now we try reading! First, without SlowRead...

use std::{
    pin::Pin,
    task::{Context, Poll},
};
use tokio::{
    fs::File,
    io::{AsyncRead, AsyncReadExt, ReadBuf},
    time::Instant,
};

#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let mut f = File::open("/dev/urandom").await?;
    let before = Instant::now();
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}

// omitted: `SlowRead` declaration and implementation
$ cargo run --quiet
Read 131072 bytes in 3.2748ms
Cool bear

...and now with SlowRead!

#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    //             👇
    let mut f = SlowRead::new(File::open("/dev/urandom").await?);
    let before = Instant::now();
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
$ cargo run --quiet
Read 131072 bytes in 3.3134ms

Well, it's not... really slower.

Cool bear

Well we're just forwarding poll_read calls straight to the inner reader for now. We're not actually slowing it down.

Oh right! So I guess we can just call std::thread::sleep in poll and...

Cool bear

No, no! That would bl-

...block the executor, right. Oh, wait, poll_read gives us a &mut Context as well, so we could poll a Sleep from there, right?

Cool bear

Exactly! And Sleep has a handy reset method, so you can use it several times.

As in sleep several times in a row with the same future? Sounds good!

I guess we'll need to pin that one too...

struct SlowRead<R> {
    reader: Pin<Box<R>>,
    // 👇
    sleep: Pin<Box<Sleep>>,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            reader: Box::pin(reader),
            // 👇
            sleep: Box::pin(tokio::time::sleep(Default::default())),
        }
    }
}

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // 👇
        match self.sleep.poll_unpin(cx) {
            Poll::Ready(_) => {
                // whenever `sleep` completes, reset it...
                self.sleep
                    .as_mut()
                    .reset(Instant::now() + Duration::from_millis(25));
                // and poll the inner reader.
                self.reader.as_mut().poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
$ cargo run --quiet
Read 131072 bytes in 396.3634ms

Woo!! We did a thing!

Cool bear

We did!
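
Cool bear's hot tip

For the curious, here's the same throttling idea with nothing but std. Treat it as a sketch under loud assumptions: PollRead is a made-up, simplified stand-in for tokio's AsyncRead (byte slice in, byte count out), the delay is implemented by spawning a helper thread per wait instead of a Sleep, and Bytes, Slow, ThreadWaker and read_all are all hypothetical names.

```rust
use std::{
    io,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
    thread::{self, Thread},
    time::{Duration, Instant},
};

/// Hypothetical, simplified stand-in for tokio's `AsyncRead`.
trait PollRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}

/// In-memory reader that is always ready.
struct Bytes {
    data: Vec<u8>,
    pos: usize,
}

impl PollRead for Bytes {
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        let n = buf.len().min(this.data.len() - this.pos);
        buf[..n].copy_from_slice(&this.data[this.pos..this.pos + n]);
        this.pos += n;
        Poll::Ready(Ok(n))
    }
}

/// Lets a read through at most once per `delay`.
struct Slow<R> {
    reader: Pin<Box<R>>,
    delay: Duration,
    not_before: Instant,
}

impl<R> Slow<R> {
    fn new(reader: R, delay: Duration) -> Self {
        Self { reader: Box::pin(reader), delay, not_before: Instant::now() }
    }
}

impl<R: PollRead> PollRead for Slow<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();
        let now = Instant::now();
        if now < this.not_before {
            // Too early: arrange a wake-up at the deadline, leave `buf` alone.
            let (waker, wait) = (cx.waker().clone(), this.not_before - now);
            thread::spawn(move || {
                thread::sleep(wait);
                waker.wake();
            });
            return Poll::Pending;
        }
        // Deadline passed: push it back, then forward to the inner reader.
        this.not_before = now + this.delay;
        this.reader.as_mut().poll_read(cx, buf)
    }
}

/// Waker that unparks the thread sitting in `read_all`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives a reader to end-of-input on the current thread, `chunk` bytes at a time.
fn read_all<R: PollRead>(reader: R, chunk: usize) -> io::Result<Vec<u8>> {
    let mut reader = Box::pin(reader);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let (mut out, mut buf) = (Vec::new(), vec![0u8; chunk]);
    loop {
        match reader.as_mut().poll_read(&mut cx, &mut buf) {
            Poll::Ready(Ok(0)) => return Ok(out),
            Poll::Ready(Ok(n)) => out.extend_from_slice(&buf[..n]),
            Poll::Ready(Err(e)) => return Err(e),
            Poll::Pending => thread::park(),
        }
    }
}
```

Reading 10 bytes in chunks of 4 through a Slow with a 20ms delay takes four reads (4, 4, 2, then 0 for end-of-input), and all but the first have to wait, so the whole thing should take at least 60ms. A thread per wait is exactly the waste we were complaining about earlier, which is why the real SlowRead polls a Sleep instead.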

We're still boxing everything though.

Cool bear

Yes, that's "easy mode async". Just like Arc<Mutex<T>> is "easy mode lifetimes", sort of. It's fine though! It does work!

Yeah, it does appear to work. But I have no idea why anything needs to be pinned. Nor what Unpin is. Let alone phantoms, which uhh..

Cool bear

No, and you didn't need to! You only wrote safe code, and you made sure the types matched up, even if you needed to box them or sneak in an as_mut call here and there. And everything worked.

Yes, right — but I'm assuming that Pin exists for a reason. It's not just complexity for complexity's sake, it's inherent complexity. But for what?

Cool bear

Well... follow me.

We can leave our Box behind

Cool bear

Okay, let's start with R! You don't need to store it pinned.

I don't? So just do this?

struct SlowRead<R> {
    //      👇
    reader: R,
    sleep: Pin<Box<Sleep>>,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            // 👇
            reader,
            sleep: Box::pin(tokio::time::sleep(Default::default())),
        }
    }
}

But then...

$ cargo check
    Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures)
error[E0599]: no method named `poll_read` found for type parameter `R` in the current scope
  --> src/main.rs:52:29
   |
52 |                 self.reader.poll_read(cx, buf)
   |                             ^^^^^^^^^ method not found in `R`
   |
   = help: items from traits can only be used if the type parameter is bounded by the trait
help: the following traits define an item `poll_read`, perhaps you need to restrict type parameter `R` with one of them:
   |
38 | impl<R: futures::AsyncRead> AsyncRead for SlowRead<R>
   |      ^^^^^^^^^^^^^^^^^^^^^
38 | impl<R: tokio::io::AsyncRead> AsyncRead for SlowRead<R>
   |      ^^^^^^^^^^^^^^^^^^^^^^^

(...and the compiler gets confused again. Again, we've already constrained R to tokio::io::AsyncRead).

The problem is, I have an R, and I need a Pin<&mut R>. And I can't use Pin::new apparently, at least that didn't work last time.

Cool bear

But maybe it'll work this time?

Okay, sure:

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        match self.sleep.poll_unpin(cx) {
            Poll::Ready(_) => {
                self.sleep
                    .as_mut()
                    .reset(Instant::now() + Duration::from_millis(25));
                // 👇
                Pin::new(&mut self.reader).poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
$ cargo check
    Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures)
error[E0277]: `R` cannot be unpinned
  --> src/main.rs:52:26
   |
52 |                 Pin::new(&mut self.reader).poll_read(cx, buf)
   |                          ^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `R`
   |
   = note: required by `Pin::<P>::new`
help: consider further restricting this bound
   |
40 |     R: AsyncRead + Unpin,
   |                  ^^^^^^^

No, see, it doesn't work eith-

Cool bear

Ah, but there's a hint!

Consider... further restricting this bound. We can restrict our AsyncRead implementation only for R types that can be unpinned?

Cool bear

Yes! ie., we need R to implement the Unpin trait, or in other words, we need R: Unpin.

Okay, sure... I mean, I don't even know if that's worth exploring, who knows if the concrete R, which in this case is... tokio::fs::File, is even Unpin, but okay, sure:

impl<R> AsyncRead for SlowRead<R>
// 👇
where
    R: AsyncRead + Unpin,
{
    // omitted: fn `poll_read`
}
$ cargo run --quiet
Read 131072 bytes in 394.8455ms

Oh hey! That worked!

Cool bear

It sure did! tokio::fs::File does implement Unpin, so it can be unpinned.
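If you want to poke at that bound without tokio in the picture, here's a minimal std-only sketch. The names `pin_then_unpin` and `round_trip` are made up for illustration. The only thing it tries to show is that both `Pin::new` and `Pin::get_mut` are safe precisely because they require `T: Unpin`.

```rust
use std::pin::Pin;

// Only compiles for `T: Unpin`: both `Pin::new` (the safe constructor)
// and `Pin::get_mut` (the safe way back to `&mut T`) carry that bound.
fn pin_then_unpin<T: Unpin>(value: &mut T) -> &mut T {
    let pinned: Pin<&mut T> = Pin::new(value);
    pinned.get_mut()
}

// Hypothetical helper for this sketch: pin a `u32`, unpin it, mutate it.
fn round_trip() -> u32 {
    let mut n = 41u32;
    *pin_then_unpin(&mut n) += 1;
    n
}
```

Dropping the `Unpin` bound from `pin_then_unpin` should bring back the same E0277 error as above.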

Well I'm still not sure what "can be unpinned" even means, but if you say so, okay.

Now what about Sleep? Can we just do the same trick?

First we remove the Pin<Box<T>> / Box::pin:

struct SlowRead<R> {
    reader: R,
    // 👇
    sleep: Sleep,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            reader,
            // 👇
            sleep: tokio::time::sleep(Default::default()),
        }
    }
}

And then... well then we're in a pickle:

error[E0277]: `PhantomPinned` cannot be unpinned
  --> src/main.rs:47:26
   |
47 |         match self.sleep.poll_unpin(cx) {
   |                          ^^^^^^^^^^ within `tokio::time::driver::sleep::_::__Origin<'_>`, the trait `Unpin` is not implemented for `PhantomPinned`
   |
   = note: required because it appears within the type `tokio::time::driver::entry::TimerEntry`
   = note: required because it appears within the type `tokio::time::driver::sleep::_::__Origin<'_>`
   = note: required because of the requirements on the impl of `Unpin` for `Sleep`
Cool bear

Yes! tokio::time::Sleep does not implement Unpin, ie. it cannot be unpinned.

But then how... because clearly Pin::new is not going to work either in this case:

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        //             👇
        let sleep = Pin::new(&mut self.sleep);
        //      👇
        match sleep.poll(cx) {
            Poll::Ready(_) => {
                //             👇
                let sleep = Pin::new(&mut self.sleep);
                // 👇
                sleep.reset(Instant::now() + Duration::from_millis(25));
                Pin::new(&mut self.reader).poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
error[E0277]: `PhantomPinned` cannot be unpinned
  --> src/main.rs:47:21
   |
47 |         let sleep = Pin::new(&mut self.sleep);
   |                     ^^^^^^^^ within `tokio::time::driver::sleep::_::__Origin<'_>`, the trait `Unpin` is not implemented for `PhantomPinned`
   |
   = note: required because it appears within the type `tokio::time::driver::entry::TimerEntry`
   = note: required because it appears within the type `tokio::time::driver::sleep::_::__Origin<'_>`
   = note: required because of the requirements on the impl of `Unpin` for `Sleep`
   = note: required by `Pin::<P>::new`

So then what do we do?

Cool bear

Well, there's another constructor for Pin... you can always call Pin::new_unchecked.

Mh? Like that?

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        //                 👇
        let sleep = Pin::new_unchecked(&mut self.sleep);
        match sleep.poll(cx) {
            Poll::Ready(_) => {
                //                 👇
                let sleep = Pin::new_unchecked(&mut self.sleep);
                sleep.reset(Instant::now() + Duration::from_millis(25));
                Pin::new(&mut self.reader).poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

But..

error[E0133]: call to unsafe function is unsafe and requires unsafe function or block
  --> src/main.rs:47:21
   |
47 |         let sleep = Pin::new_unchecked(&mut self.sleep);
   |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ call to unsafe function
   |
   = note: consult the function's documentation for information on how to avoid undefined behavior

Wait wait wait wait. It's unsafe? Eww!

Cool bear

It is! And that means there are some invariants that the compiler will no longer be enforcing for us, so we'll have to "just be careful". Which is something we never have to do in Rust, except for when we use unsafe.

Ughhh. Do we have to?

Cool bear

For the purpose of this exercise, yes. I'm about to tell you the rules we should follow. Are you ready?

Shoot.

Cool bear

Once we pin something, ie. once we construct a Pin<&mut T> of it, we can never use it unpinned (ie, as &mut T) ever again, unless it implements Unpin.

But Sleep doesn't implement Unpin.

Cool bear

Yes! Which means that, since we need to use it as a Pin<&mut T> once, we can only ever use it as a Pin<&mut T>.

Okay, I guess we can do that. I think I understand the rules, so... I'll just add some unsafe blocks around those:

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let sleep = unsafe { Pin::new_unchecked(&mut self.sleep) };
        match sleep.poll(cx) {
            Poll::Ready(_) => {
                let sleep = unsafe { Pin::new_unchecked(&mut self.sleep) };
                sleep.reset(Instant::now() + Duration::from_millis(25));
                Pin::new(&mut self.reader).poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

Uhh...

error[E0596]: cannot borrow data in a dereference of `Pin<&mut SlowRead<R>>` as mutable
  --> src/main.rs:47:49
   |
47 |         let sleep = unsafe { Pin::new_unchecked(&mut self.sleep) };
   |                                                 ^^^^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Pin<&mut SlowRead<R>>`

Oh nooo. What now. Even a line that used to work doesn't work anymore:

error[E0596]: cannot borrow data in a dereference of `Pin<&mut SlowRead<R>>` as mutable
  --> src/main.rs:52:26
   |
52 |                 Pin::new(&mut self.reader).poll_read(cx, buf)
   |                          ^^^^^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Pin<&mut SlowRead<R>>`

This used to work! We said R: Unpin, so R can be unpinned, so we should be able to pass it to Pin::new, the safe constructor for Pin.

Cool bear

Ah, yes, but back then, Self was Unpin too.

Mhhhhh because it only had fields that were also Unpin?

Cool bear

Yes! Didn't you think it was odd that we could do &mut self.reader, even though self was a Pin<&mut Self> and not a &mut Self?

Honestly I was just happy it worked?

Cool bear

Well, it only worked because we had Self: Unpin, and so going from Pin<&mut Self> to &mut Self was allowed — and so that's what it did, transparently.
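Here's a small std-only sketch of that transparent conversion, assuming a made-up `Counter` type whose fields are all `Unpin`. Mutating a field through `Pin<&mut Self>` compiles because `Pin<&mut T>` implements `DerefMut` when `T: Unpin`.

```rust
use std::pin::Pin;

// Every field is `Unpin`, so `Counter` is automatically `Unpin` too.
struct Counter {
    count: u32,
}

impl Counter {
    fn bump(mut self: Pin<&mut Self>) -> u32 {
        // This goes through `DerefMut`, which `Pin<&mut Counter>` only
        // implements because `Counter: Unpin`.
        self.count += 1;
        self.count
    }
}

// Hypothetical helper for this sketch.
fn bump_twice() -> u32 {
    let mut counter = Counter { count: 0 };
    Pin::new(&mut counter).bump();
    Pin::new(&mut counter).bump()
}
```

Adding a `std::marker::PhantomPinned` field to `Counter` should make both `Pin::new` and `self.count += 1` stop compiling, which is the situation `SlowRead` is in.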

But now, Self is no longer Unpin, because it contains a Sleep, and so we cannot go from Pin<&mut Self> to &mut Self.

...we can't?

Cool bear

Well, not safely. We can do so unsafely.

Let me guess, Pin::new_unchecked?

Cool bear

Actually no! That wouldn't help. To obtain a Pin<&mut R>, we must obtain a &mut R, and to obtain a &mut R, we would need a &mut Self, so we're back to square one.

Okay, some other unsafe method?

Cool bear

Yes! Pin::map_unchecked_mut. Here, let me show you:

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let sleep = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.sleep) };
        match sleep.poll(cx) {
            Poll::Ready(_) => {
                let sleep = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.sleep) };
                sleep.reset(Instant::now() + Duration::from_millis(25));
                let reader = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.reader) };
                reader.poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
Cool bear

There!

Uhhh it still doesn't build though:

error[E0277]: `PhantomPinned` cannot be unpinned
  --> src/main.rs:18:5
   |
18 |     f.read_exact(&mut buf).await?;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ within `tokio::time::driver::sleep::_::__Origin<'_>`, the trait `Unpin` is not implemented for `PhantomPinned`
   |
   = note: required because it appears within the type `tokio::time::driver::entry::TimerEntry`
   = note: required because it appears within the type `tokio::time::driver::sleep::_::__Origin<'_>`
   = note: required because of the requirements on the impl of `Unpin` for `Sleep`
   = note: required because it appears within the type `SlowRead<tokio::fs::File>`
   = note: required because of the requirements on the impl of `futures::Future` for `tokio::io::util::read_exact::ReadExact<'_, SlowRead<tokio::fs::File>>`
   = note: required by `futures::Future::poll`
Cool bear

Ah, right. read_exact wants its receiver to be Unpin, and SlowRead is no longer Unpin (because of Sleep, remember?).

So... what do we do?

Cool bear

In this case, we can just pin our SlowRead before reading from it. But remember — once we use it pinned, we can never use it unpinned again.

So uhh... like that?

#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let mut f = SlowRead::new(File::open("/dev/urandom").await?);
    let before = Instant::now();
    let mut f = unsafe { Pin::new_unchecked(&mut f) };
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Cool bear

Exactly like that! In fact, despite the unsafe, this is one of the safest ways to use Pin::new_unchecked, because we're shadowing the previous f (of type SlowRead) with our new value of type Pin<&mut SlowRead>, which means we can never accidentally use it unpinned.

Okay, let's see if it still works...

$ cargo run --quiet
Read 131072 bytes in 396.4873ms

It does. Good.
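The per-field projection can also be tried in isolation, without tokio. This is only a sketch: `Inner`, `Outer`, `tick` and `tick_twice` are hypothetical names, and `Inner` is made `!Unpin` with `PhantomPinned` so that it behaves like `Sleep` as far as the type system is concerned.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical `!Unpin` type, standing in for `Sleep`.
struct Inner {
    ticks: u32,
    _pin: PhantomPinned,
}

impl Inner {
    fn tick(self: Pin<&mut Self>) -> u32 {
        // SAFETY: we only bump a counter in place, nothing is moved.
        let this = unsafe { self.get_unchecked_mut() };
        this.ticks += 1;
        this.ticks
    }
}

struct Outer {
    inner: Inner,
}

impl Outer {
    fn tick_twice(mut self: Pin<&mut Self>) -> u32 {
        // `as_mut()` reborrows the pin, so `self` can be projected again.
        // SAFETY: `inner` is only ever handed out pinned, never moved.
        let inner = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) };
        inner.tick();
        let inner = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) };
        inner.tick()
    }
}

// Hypothetical helper: pin an `Outer` on the heap and project into it.
fn projected_ticks() -> u32 {
    let mut outer = Box::pin(Outer {
        inner: Inner { ticks: 0, _pin: PhantomPinned },
    });
    outer.as_mut().tick_twice()
}
```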

Okay. I don't feel good about this code at all though.

Cool bear

And you're right to! This is dangerous territory, because we have to uphold some guarantees ourselves.

We can clean it up a little though. Instead of pin-projecting fields one by one, we can do it all in one go.

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // pin-project both fields
        let (mut sleep, reader) = unsafe {
            let this = self.get_unchecked_mut();
            (
                Pin::new_unchecked(&mut this.sleep),
                Pin::new_unchecked(&mut this.reader),
            )
        };

        match sleep.as_mut().poll(cx) {
            Poll::Ready(_) => {
                sleep.reset(Instant::now() + Duration::from_millis(25));
                reader.poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

Ah, more unsafe functions, good.

Cool bear

Well, yes — but at least we only have one unsafe block. But yes, we're in "just be careful" territory. As long as we never use either field unpinned, there will be no undefined behavior, and we're good to go.
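The "project everything in one go" shape can be reproduced with nothing but std. What follows is a sketch under assumptions, not tokio's code: `YieldOnce` is a made-up `!Unpin` future that is ready on its second poll, `CountPolls` is a made-up wrapper with one pinned field and one plain field, and `NoopWake` is a waker that does nothing.

```rust
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical `!Unpin` future: pending once, then ready.
struct YieldOnce {
    polled: bool,
    _pin: PhantomPinned,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // SAFETY: we only flip a bool in place, nothing is moved.
        let this = unsafe { self.get_unchecked_mut() };
        if this.polled {
            return Poll::Ready(());
        }
        this.polled = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Hypothetical wrapper: `inner` must stay pinned, `polls` is a plain u32.
struct CountPolls<F> {
    inner: F,
    polls: u32,
}

impl<F: Future<Output = ()>> Future for CountPolls<F> {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        // Project both fields at once.
        // SAFETY: `inner` is never moved and only used pinned from here on;
        // `polls` is never treated as pinned.
        let (inner, polls) = unsafe {
            let this = self.get_unchecked_mut();
            (Pin::new_unchecked(&mut this.inner), &mut this.polls)
        };
        *polls += 1;
        match inner.poll(cx) {
            Poll::Ready(()) => Poll::Ready(*polls),
            Poll::Pending => Poll::Pending,
        }
    }
}

// A waker that does nothing, good enough for a busy-polling loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical driver: poll until ready, return the number of polls.
fn polls_until_ready() -> u32 {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(CountPolls {
        inner: YieldOnce { polled: false, _pin: PhantomPinned },
        polls: 0,
    });
    loop {
        if let Poll::Ready(n) = fut.as_mut().poll(&mut cx) {
            return n;
        }
    }
}
```

The busy loop is only there to keep the sketch free of dependencies. A real executor would wait for the waker instead of spinning.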

Undefined behavior? Like what?

Cool bear

Like... anything at all. The compiler, and the executor, are allowed to do literally anything, if there's undefined behavior.

Okay but... what? Can we get an example?

Cool bear

Sure. Consider this program:

use futures::Future;
use std::{mem::swap, pin::Pin, task::Poll, time::Duration};
use tokio::{macros::support::poll_fn, time::sleep};

#[tokio::main]
async fn main() {
    let mut sleep1 = sleep(Duration::from_secs(1));
    let mut sleep2 = sleep(Duration::from_secs(1));

    {
        // let's use `sleep1` pinned exactly _once_
        let mut sleep1 = unsafe { Pin::new_unchecked(&mut sleep1) };

        // this creates a future whose poll method is the closure argument
        poll_fn(|cx| {
            // we poll `sleep1` once, throwing away the result...
            let _ = sleep1.as_mut().poll(cx);

            // ...and resolve immediately
            Poll::Ready(())
        })
        .await;
    }

    // then, let's use `sleep1` unpinned:
    swap(&mut sleep1, &mut sleep2);
    // by this point, `sleep1` has switched places with `sleep2`

    // finally, let's await both sleep1 and sleep2
    sleep1.await;
    sleep2.await;
}
Cool bear

What do you think is going to happen?

Mhhh. Well, we're failing to maintain pinning invariants...

Cool bear

Look at you using words!!

...so, well, anything could happen? It could just work, but then I guess Sleep would be Unpin in the first place. Or it could crash... or anything else may happen.

Cool bear

Try it!

$ cargo run --quiet
thread 'tokio-runtime-worker' panicked at 'assertion failed: cur_state < STATE_MIN_VALUE', /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/entry.rs:174:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

(the program hangs forever, ie. it never exits)

Huh. Let's try it again with backtraces enabled:

$ RUST_BACKTRACE=1 cargo run --quiet
thread 'tokio-runtime-worker' panicked at 'assertion failed: cur_state < STATE_MIN_VALUE', /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/entry.rs:174:13
stack backtrace:
   0: rust_begin_unwind
             at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/std/src/panicking.rs:493:5
   1: core::panicking::panic_fmt
             at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/core/src/panicking.rs:92:14
   2: core::panicking::panic
             at /rustc/cb75ad5db02783e8b0222fee363c5f63f7e2cf5b/library/core/src/panicking.rs:50:5
   3: tokio::time::driver::entry::StateCell::mark_pending
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/entry.rs:174:13
   4: tokio::time::driver::entry::TimerHandle::mark_pending
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/entry.rs:591:15
   5: tokio::time::driver::wheel::Wheel::process_expiration
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/wheel/mod.rs:251:28
   6: tokio::time::driver::wheel::Wheel::poll
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/wheel/mod.rs:163:21
   7: tokio::time::driver::<impl tokio::time::driver::handle::Handle>::process_at_time
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/mod.rs:269:33
   8: tokio::time::driver::<impl tokio::time::driver::handle::Handle>::process
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/mod.rs:258:9
   9: tokio::time::driver::Driver<P>::park_internal
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/mod.rs:247:9
  10: <tokio::time::driver::Driver<P> as tokio::park::Park>::park
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/time/driver/mod.rs:398:9
  11: <tokio::park::either::Either<A,B> as tokio::park::Park>::park
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/park/either.rs:30:29
  12: <tokio::runtime::driver::Driver as tokio::park::Park>::park
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/driver.rs:198:9
  13: tokio::runtime::park::Inner::park_driver
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/park.rs:205:9
  14: tokio::runtime::park::Inner::park
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/park.rs:137:13
  15: <tokio::runtime::park::Parker as tokio::park::Park>::park
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/park.rs:93:9
  16: tokio::runtime::thread_pool::worker::Context::park_timeout
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:422:13
  17: tokio::runtime::thread_pool::worker::Context::park
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:398:20
  18: tokio::runtime::thread_pool::worker::Context::run
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:328:24
  19: tokio::runtime::thread_pool::worker::run::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:303:17
  20: tokio::macros::scoped_tls::ScopedKey<T>::set
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/macros/scoped_tls.rs:61:9
  21: tokio::runtime::thread_pool::worker::run
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:300:5
  22: tokio::runtime::thread_pool::worker::Launch::launch::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/thread_pool/worker.rs:279:45
  23: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/blocking/task.rs:42:21
  24: tokio::runtime::task::core::CoreStage<T>::poll::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/core.rs:235:17
  25: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/loom/std/unsafe_cell.rs:14:9
  26: tokio::runtime::task::core::CoreStage<T>::poll
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/core.rs:225:13
  27: tokio::runtime::task::harness::poll_future::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/harness.rs:422:23
  28: <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panic.rs:322:9
  29: std::panicking::try::do_call
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panicking.rs:379:40
  30: __rust_try
  31: std::panicking::try
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panicking.rs:343:19
  32: std::panic::catch_unwind
             at /home/amos/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/panic.rs:396:14
  33: tokio::runtime::task::harness::poll_future
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/harness.rs:409:19
  34: tokio::runtime::task::harness::Harness<T,S>::poll_inner
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/harness.rs:89:9
  35: tokio::runtime::task::harness::Harness<T,S>::poll
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/harness.rs:59:15
  36: tokio::runtime::task::raw::poll
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/raw.rs:104:5
  37: tokio::runtime::task::raw::RawTask::poll
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/raw.rs:66:18
  38: tokio::runtime::task::Notified<S>::run
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/task/mod.rs:171:9
  39: tokio::runtime::blocking::pool::Inner::run
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/blocking/pool.rs:278:17
  40: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
             at /home/amos/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.4.0/src/runtime/blocking/pool.rs:258:17
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Mh, sure enough, it's deep in the internals of tokio.

The actual assertion failing is:

// (in tokio code, in `src/time/driver/entry.rs`)
            debug_assert!(cur_state < STATE_MIN_VALUE);

Huh, a debug_assert!. That means that in production...

$ RUST_BACKTRACE=1 cargo run --quiet --release

(program never outputs anything, never exits either)

Hah. I guess they're not fucking around about the "undefined" part of "undefined behavior" huh?

Cool bear

No they are not. But let's think about why this fails. We already tried to implement our own sleep by spawning another thread that calls Waker::wake...

...but we also agreed that that's definitely not how tokio's sleep works. There's just no way it spawns one thread per sleep invocation. So I guess... it has some sort of timer system, and the first time a Sleep future is polled, it registers itself with it...

Cool bear

Yes! And when it's dropped, it deregisters itself from it.

Right... and I'm assuming the timer system itself must point to the Sleep future somehow because well... when the timer runs out, it must know which future to wake up.

Cool bear

Yes! And if the future moves around...

...and it's not dropped... then the timer system points to the wrong memory location!

Cool bear

Exactly! And that's precisely what happened in that naughty program. sleep1 registered itself with the timer system, and then it switched places with sleep2.

Oh, so when the timer ran out, sleep2 was awakened instead?

Cool bear

Yes! And it didn't expect to be awakened. Even worse — when we awaited sleep2, it registered itself with the timer system, which created a second entry that pointed to the same address: what used to be sleep1, but was by that point sleep2.

And that's what made it hang?

Cool bear

Possibly. We'd have to read a bunch more tokio internals to determine exactly what kind of behavior we've triggered. Suffice to say: we don't want to do that.
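The address mix-up can be shown safely, with no timers involved. This sketch assumes a made-up `Timer` struct and plays the role of the timer system by remembering an address as a plain `usize`. It never dereferences that address, so there is no undefined behavior here, just a look at who lives where after the swap.

```rust
use std::mem::swap;

// Hypothetical stand-in for a registered timer entry.
struct Timer {
    id: u32,
}

fn address(timer: &Timer) -> usize {
    timer as *const Timer as usize
}

// Returns (id at the registered address before the swap,
//          id at that address after the swap,
//          whether the slot named `timer1` kept its address).
fn who_lives_at_registered_address() -> (u32, u32, bool) {
    let mut timer1 = Timer { id: 1 };
    let mut timer2 = Timer { id: 2 };

    // "Register" timer1: remember where it lives.
    let registered = address(&timer1);
    let before = timer1.id;

    // Move the contents around, like the naughty program did.
    swap(&mut timer1, &mut timer2);

    // The slot did not move, but its contents did.
    let same_slot = address(&timer1) == registered;
    (before, timer1.id, same_slot)
}
```

Anything that kept `registered` around now finds timer 2 where it expected timer 1.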

I see. Ohhh and because some futures register themselves with some system of the executor, which points back to them, those futures should never be moved, and that's why they need to be pinned?

Cool bear

Yes! And everything is marked backwards, kind of. Every Future gets a pinned version of itself, and it can only be unpinned if it implements Unpin.

Okay okay okay. But wait, then why does Box::pin...?

Cool bear

Well, precisely because that's a heap allocation! If you're holding a Sleep on the stack, and you pass it around to another function, or store it in a struct, or whatever, it actually moves — its address changes.

But if you're holding a Box<Sleep>, well, then you're only holding a pointer to a Sleep that lives somewhere in heap-allocated memory. That somewhere will never change, ie. the Sleep itself will never move. The pointer to Sleep can be passed around, and everything is fine.
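A quick way to convince yourself, sketched with std only: `Immovable` and the helper names are hypothetical. The sketch records the heap address of a pinned, boxed value, moves the `Pin<Box<_>>` into a `Vec`, and compares.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical `!Unpin` type. The payload keeps it from being zero-sized,
// so the Box really allocates.
struct Immovable {
    payload: [u8; 16],
    _pin: PhantomPinned,
}

fn heap_address(pinned: &Pin<Box<Immovable>>) -> usize {
    let target: &Immovable = &**pinned;
    target as *const Immovable as usize
}

// Hypothetical helper: true if the pointee stayed put while the box moved.
fn address_survives_moves() -> bool {
    let boxed = Box::pin(Immovable { payload: [7; 16], _pin: PhantomPinned });
    let before = heap_address(&boxed);

    // Moving the `Pin<Box<_>>` only moves the pointer, not the pointee.
    let holder = vec![boxed];
    let after = heap_address(&holder[0]);

    before == after && holder[0].payload[0] == 7
}
```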

I see. So if we go back to our AsyncRead example... SlowRead can never be Unpin?

Cool bear

It can't! Because it contains a Sleep, which is not Unpin. Unless we go back to wrapping the Sleep in a Pin<Box<T>>.

Oh, in which case the Sleep wouldn't really move when the SlowRead does, only a pointer to it would.

Cool bear

Exactly.

So what should we do? Should we be holding a Pin<Box<Sleep>>?

Cool bear

Well, it really depends what you want to do. It's true that it's not really convenient to use SlowRead right now, because we have to pin it first:

#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let mut f = SlowRead::new(File::open("/dev/urandom").await?);
    let before = Instant::now();

    let mut f = unsafe { Pin::new_unchecked(&mut f) };
    f.read_exact(&mut buf).await?;

    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Cool bear

But, first of all — we don't have to write that unsafe code ourselves.

The pin_utils crate provides a safe macro to do exactly that: pin an owned value to the stack, shadowing its previous name.

# in `Cargo.toml`

[dependencies]
pin-utils = "0.1.0"
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let f = SlowRead::new(File::open("/dev/urandom").await?);
    let before = Instant::now();

    // 👇
    pin_utils::pin_mut!(f);
    f.read_exact(&mut buf).await?;

    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Cool bear

(Note that tokio ships with a similar macro, tokio::pin).
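On newer toolchains (Rust 1.68 and later), the standard library has its own version, `std::pin::pin!`, which returns the pinned reference rather than shadowing a name. Here's a minimal sketch, assuming a made-up `!Unpin` future called `Seven` and a do-nothing waker so we can poll by hand.

```rust
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical `!Unpin` future that is immediately ready.
struct Seven {
    _pin: PhantomPinned,
}

impl Future for Seven {
    type Output = u8;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u8> {
        Poll::Ready(7)
    }
}

// A waker that does nothing, just so we can build a `Context`.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical helper: pin on the stack, poll once.
fn poll_on_the_stack() -> Option<u8> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // `pin!` takes ownership of the value, so the unpinned original
    // can't be named anymore. `fut` is a `Pin<&mut Seven>`.
    let mut fut = pin!(Seven { _pin: PhantomPinned });

    let result = fut.as_mut().poll(&mut cx);
    match result {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```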

Secondly — if we find ourselves wanting to unpin a SlowRead, so we can pass it around and whatnot, we can just move the entire SlowRead to the heap first.

That's safe too:

#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let f = SlowRead::new(File::open("/dev/urandom").await?);
    let before = Instant::now();

    // 👇
    let mut f = Box::pin(f);
    f.read_exact(&mut buf).await?;

    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Cool bear

And finally, I'd just like to note that here, we're wrapping the whole File in our SlowRead, but we don't have to! We could just as well wrap a mutable reference to it, which also implements AsyncRead.

That way, we can use the File for something else after:

#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let mut f = File::open("/dev/urandom").await?;

    let sr = SlowRead::new(&mut f);
    pin_utils::pin_mut!(sr);

    let before = Instant::now();
    sr.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    let before = Instant::now();
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
$ cargo run --quiet
Read 131072 bytes in 395.2787ms
Read 131072 bytes in 5.1451ms
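The same "wrap a mutable reference" trick exists in synchronous Rust, where `&mut R` implements `std::io::Read` whenever `R` does. As a rough analogy only, here's a sketch with a made-up `CountingRead` wrapper around a borrowed `Cursor`. It is not async and has nothing to do with tokio.

```rust
use std::io::{Cursor, Read};

// Hypothetical wrapper that counts the bytes passing through it.
struct CountingRead<R> {
    reader: R,
    bytes: usize,
}

impl<R: Read> Read for CountingRead<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let n = self.reader.read(buf)?;
        self.bytes += n;
        Ok(n)
    }
}

// Hypothetical helper: read 5 bytes through the wrapper, then read the
// rest straight from the source once the borrow has ended.
fn read_through_borrow() -> std::io::Result<(usize, Vec<u8>)> {
    let mut source = Cursor::new(b"hello world".to_vec());

    let counted = {
        // `R` is `&mut Cursor<Vec<u8>>` here, not the cursor itself.
        let mut wrapper = CountingRead { reader: &mut source, bytes: 0 };
        let mut first = [0u8; 5];
        wrapper.read_exact(&mut first)?;
        wrapper.bytes
    };

    // The wrapper is gone, so `source` is ours again.
    let mut rest = Vec::new();
    source.read_to_end(&mut rest)?;
    Ok((counted, rest))
}
```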

...but that makes it harder to pass around, right?

Cool bear

Right, yes! If we take a &mut File, we can pass it to a function for example, but storing it in a struct ties that struct to the borrow, so it can't outlive the File it points to.

Usual lifetime business. Another option would be to add an into_inner method to our SlowRead, so we can give back the inner reader:

impl<R> SlowRead<R>
where
    R: Unpin,
{
    fn into_inner(self) -> R {
        self.reader
    }
}
Cool bear

...but that's dangerous business, because that means we need to use the SlowRead unpinned after we've used it pinned. Which would normally be forbidden... except it's only !Unpin (not Unpin) because of Sleep, which gets dropped in that same method (by virtue of us taking self, ie. taking ownership of the SlowRead, and letting it fall out of scope).

In fact, pin_utils::pin_mut! prevents us from doing that altogether:

#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let f = File::open("/dev/urandom").await?;

    let f = SlowRead::new(f);
    {
        pin_utils::pin_mut!(f);

        let before = Instant::now();
        f.read_exact(&mut buf).await?;
        println!("Read {} bytes in {:?}", buf.len(), before.elapsed());
    }

    // 👇 can't do that!
    let mut f = f.into_inner();

    let before = Instant::now();
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
$ cargo check
    Checking manual-futures v0.1.0 (/home/amos/ftl/manual-futures)
error[E0382]: use of moved value: `f`
  --> src/main.rs:27:17
   |
18 |     let f = SlowRead::new(f);
   |         - move occurs because `f` has type `SlowRead<tokio::fs::File>`, which does not implement the `Copy` trait
19 |     {
20 |         pin_utils::pin_mut!(f);
   |         ----------------------- value moved here
...
27 |     let mut f = f.into_inner();
   |                 ^ value used here after move
Cool bear

...and Pin<Box<T>> wouldn't even let us access the T we need to call into_inner in the first place!

Correct usage would be tricky:

#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let f = File::open("/dev/urandom").await?;
    let mut f = SlowRead::new(f);

    {
        let mut f = unsafe { Pin::new_unchecked(&mut f) };

        let before = Instant::now();
        f.read_exact(&mut buf).await?;
        println!("Read {} bytes in {:?}", buf.len(), before.elapsed());
    };

    let mut f = f.into_inner();

    let before = Instant::now();
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Cool bear

...so it's probably not a good pattern. We could probably come up with a safer API, like that:

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use tokio::{
    fs::File,
    io::{AsyncRead, AsyncReadExt, ReadBuf},
    time::{Instant, Sleep},
};

struct SlowRead<R> {
    //       👇 now optional!
    reader: Option<R>,
    sleep: Sleep,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            //       👇
            reader: Some(reader),
            sleep: tokio::time::sleep(Default::default()),
        }
    }
}

impl<R> SlowRead<R>
where
    R: Unpin,
{
    // 👇 now takes pinned mutable reference to Self, and returns an option
    fn take_inner(self: Pin<&mut Self>) -> Option<R> {
        unsafe { self.get_unchecked_mut().reader.take() }
    }
}

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // pin-project both fields
        let (mut sleep, reader) = unsafe {
            let this = self.get_unchecked_mut();
            (Pin::new_unchecked(&mut this.sleep), &mut this.reader)
        };

        match sleep.as_mut().poll(cx) {
            Poll::Ready(_) => {
                sleep.reset(Instant::now() + Duration::from_millis(25));
                match reader {
                    Some(reader) => {
                        // pin-project option.
                        let reader = Pin::new(reader);
                        // note: no need for unsafe since R: Unpin! (thanks
                        // /u/novartole on reddit for catching this.)
                        reader.poll_read(cx, buf)
                    }
                    None => {
                        // simulate EOF
                        Poll::Ready(Ok(()))
                    }
                }
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
Cool bear

And we could use it like this:

#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    let mut buf = vec![0u8; 128 * 1024];
    let f = File::open("/dev/urandom").await?;

    let mut f = {
        let f = SlowRead::new(f);
        pin_utils::pin_mut!(f);

        let before = Instant::now();
        f.read_exact(&mut buf).await?;
        println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

        f.take_inner().unwrap()
    };

    let before = Instant::now();
    f.read_exact(&mut buf).await?;
    println!("Read {} bytes in {:?}", buf.len(), before.elapsed());

    Ok(())
}
Cool bear

If we really needed an into_inner method that takes self, though, we may be better off boxing the Sleep so that the whole type becomes Unpin itself, which would make all of this a lot more ergonomic.
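
Here's a rough sketch of that boxing idea, using only the standard library. `FakeSleep` is a made-up stand-in for tokio's `Sleep` (with real tokio, the field would be a `Pin<Box<Sleep>>` built with `Box::pin(tokio::time::sleep(..))`), and `assert_unpin` and `demo_into_inner` are hypothetical helpers that only exist to check the claim.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical stand-in for `tokio::time::Sleep`: a type that is `!Unpin`.
struct FakeSleep {
    _pin: PhantomPinned,
}

struct SlowRead<R> {
    reader: R,
    // The `!Unpin` part lives on the heap; `Pin<Box<T>>` itself is always `Unpin`.
    sleep: Pin<Box<FakeSleep>>,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            reader,
            sleep: Box::pin(FakeSleep { _pin: PhantomPinned }),
        }
    }

    // Takes `self` by value: no `Pin`, no `Option`, no `unsafe`.
    fn into_inner(self) -> R {
        self.reader
    }
}

// Compiles only if `T: Unpin`, so calling it checks the claim at compile time.
fn assert_unpin<T: Unpin>() {}

fn demo_into_inner() -> Vec<u8> {
    assert_unpin::<SlowRead<Vec<u8>>>();

    let mut slow = SlowRead::new(vec![1u8, 2, 3]);
    // Since `SlowRead<R>` is `Unpin` whenever `R` is, the safe `Pin::new` works.
    let _pinned: Pin<&mut SlowRead<Vec<u8>>> = Pin::new(&mut slow);
    // The timer is still reachable pinned, which is what polling it needs.
    let _timer: Pin<&mut FakeSleep> = slow.sleep.as_mut();

    slow.into_inner()
}
```

The trade-off is one heap allocation per `SlowRead`, in exchange for an API that needs no `unsafe` and no pinning on the caller's side.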

I just wanted to point out, pun fully intended, that there are options.

Gotcha.

Let's go back to this version of SlowRead for a second:

struct SlowRead<R> {
    reader: R,
    sleep: Sleep,
}

impl<R> SlowRead<R> {
    fn new(reader: R) -> Self {
        Self {
            reader,
            sleep: tokio::time::sleep(Default::default()),
        }
    }
}

That has this AsyncRead implementation:

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // pin-project both fields
        let (mut sleep, reader) = unsafe {
            let this = self.get_unchecked_mut();
            (
                Pin::new_unchecked(&mut this.sleep),
                Pin::new_unchecked(&mut this.reader),
            )
        };

        match sleep.as_mut().poll(cx) {
            Poll::Ready(_) => {
                sleep.reset(Instant::now() + Duration::from_millis(25));
                reader.poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

Now... this could be worse, right? I'm sure there are worse implementations of AsyncRead out there. But also, I get the feeling it could be better.

Like.. without any usage of unsafe?

Cool bear

There's a crate for that!

Nothing in the standard library so far (Rust 1.51), but that's exactly what the pin-project crate is about.

Let's see..

# in `Cargo.toml`

[dependencies]
pin-project = "1.0"
Cool bear

The idea behind pin-project is that it does pin projection for you. That part:

        // pin-project both fields
        let (mut sleep, reader) = unsafe {
            let this = self.get_unchecked_mut();
            (
                Pin::new_unchecked(&mut this.sleep),
                Pin::new_unchecked(&mut this.reader),
            )
        };
Cool bear

...and to ensure that you're using fields either always pinned or never pinned, you have to make that choice when designing the struct.

pin-project presents itself as a procedural attribute macro, so you can slap it on top of any struct, and then put the #[pin] attribute on any field that you're planning to use pinned.

In our case, that's both fields.

use pin_project::pin_project;

#[pin_project]
struct SlowRead<R> {
    #[pin]
    reader: R,
    #[pin]
    sleep: Sleep,
}
Cool bear

And then, instead of doing pin projection ourselves, we call the auto-generated method project, which takes a Pin<&mut SlowRead<R>> and returns a struct that looks like this:

struct SlowReadProjected<'a, R> {
    reader: Pin<&'a mut R>,
    sleep: Pin<&'a mut Sleep>,
}

impl<R> AsyncRead for SlowRead<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        //       👇            👇
        let mut this = self.project();

        match this.sleep.as_mut().poll(cx) {
            Poll::Ready(_) => {
                this.sleep.reset(Instant::now() + Duration::from_millis(25));
                this.reader.poll_read(cx, buf)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
Cool bear

And boom, no more unsafe.

And we can never accidentally use self.sleep or self.reader unpinned (without unsafe code).
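
For intuition, here's roughly what such a projection boils down to when written by hand, on a made-up `CountPolls` wrapper (not from this article) with one pinned field and one unpinned field. It's a std-only sketch, not pin-project's actual expansion; `YieldOnce`, `NoopWake` and `block_on` are hypothetical helpers that only exist to drive it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical wrapper: counts how many times `inner` gets polled.
struct CountPolls<F> {
    inner: F,   // always accessed pinned
    polls: u32, // never accessed pinned
}

// Hand-written equivalent of a generated projection struct.
struct CountPollsProjected<'a, F> {
    inner: Pin<&'a mut F>,
    polls: &'a mut u32,
}

impl<F> CountPolls<F> {
    fn project<'a>(self: Pin<&'a mut Self>) -> CountPollsProjected<'a, F> {
        // The one `unsafe` block: sound only because `inner` is never moved
        // or handed out unpinned anywhere else, and there is no `Drop` impl.
        unsafe {
            let this = self.get_unchecked_mut();
            CountPollsProjected {
                inner: Pin::new_unchecked(&mut this.inner),
                polls: &mut this.polls,
            }
        }
    }
}

impl<F: Future> Future for CountPolls<F> {
    type Output = (F::Output, u32);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        *this.polls += 1;
        match this.inner.poll(cx) {
            Poll::Ready(out) => Poll::Ready((out, *this.polls)),
            Poll::Pending => Poll::Pending,
        }
    }
}

// Test future: pending on the first poll, ready on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.yielded {
            return Poll::Ready("done");
        }
        self.yielded = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Waker that does nothing; fine for a loop that polls unconditionally.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny busy-polling executor, just enough to drive the sketch.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Driving `CountPolls { inner: YieldOnce { yielded: false }, polls: 0 }` through `block_on` gives `("done", 2)`: one pending poll, then one ready poll. The pinned-or-not choice is baked into the projection struct's field types, which is why the rest of the code can't get it wrong by accident.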

And it still works.

$ grep "unsafe" -Rn src
$ cargo run --quiet
Read 131072 bytes in 393.9761ms
Cool bear

Of course, that's a fairly simple case of pin-projection. It gets hairier. A lot hairier. Partially-pinned state machines are... not fun. For now!

Whoa. Thanks cool bear!

Cool bear

Don't mention it 😎

