Async in Traits Just Saved Us

Developers who venture into crafting their own async/await implementations in Rust may encounter the intricate task of implementing the Future trait for their custom types. Rust’s approach to async/await is nuanced, offering a stark contrast to languages like Go, which employ preemptive scheduling. Instead, Rust embraces lazy evaluation and cooperative scheduling, allowing developers to control precisely where their code yields to the executor. This level of control, however, introduces complexity when implementing the Future trait for custom types. The intricacies arise because .await can’t be invoked within a non-async function, necessitating a hand-written state machine (or similar) for these custom types. This endeavor can be laborious, error-prone, and difficult to maintain, and may prompt developers to opt for BoxFuture<T>, a choice that could compromise performance.
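To make the pain concrete, here is a minimal, hypothetical hand-written future (not from OpenDAL): a two-step Countdown whose states and transitions must all be spelled out manually, along with a no-op waker just to drive it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A hypothetical hand-written future: every state and transition of the
// "async" logic has to be spelled out manually.
enum Countdown {
    Step(u8),
    Done,
}

impl Future for Countdown {
    type Output = u8;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u8> {
        match *self {
            Countdown::Step(0) => {
                *self = Countdown::Done;
                Poll::Ready(42)
            }
            Countdown::Step(n) => {
                *self = Countdown::Step(n - 1);
                // Ask the executor to poll us again soon.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Countdown::Done => panic!("polled after completion"),
        }
    }
}

// A no-op waker, just enough to drive the future by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Countdown::Step(1);
    let mut pinned = Pin::new(&mut fut);
    // First poll: still pending; second poll: ready.
    assert_eq!(pinned.as_mut().poll(&mut cx), Poll::Pending);
    assert_eq!(pinned.as_mut().poll(&mut cx), Poll::Ready(42));
    println!("done");
}
```

With async/await the same logic would be a couple of lines; written by hand, the state transitions, wake-ups, and pinning are all the author’s problem.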

Understanding the Role of BoxFuture<T>
BoxFuture<T> is an alias for Pin<Box<dyn Future<Output = T> + Send + 'a>>, a pointer to a heap-allocated object that implements the Future trait. This layer of abstraction spares developers the headache of moving stack-based objects or fields: because Box<T> owns its allocation, the smart pointer can be moved without disturbing the underlying data, so transferring a BoxFuture<T> is both safe and convenient. Nevertheless, the reliance on heap allocation and dynamic dispatch can introduce performance trade-offs. For developers craving finer-grained control over their asynchronous tasks, exploring alternatives like pin-project could prove advantageous.
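As a sketch of the pattern (with a hand-rolled alias standing in for the one from the futures crate, and hypothetical Fetch/Doubler names), a trait method in this style returns a boxed, pinned future; every call pays for a heap allocation, and every poll goes through dynamic dispatch:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hand-rolled stand-in for futures::future::BoxFuture, for illustration.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

trait Fetch {
    // Before Rust 1.75, async trait methods were commonly expressed by
    // returning a boxed, pinned future.
    fn fetch(&self, key: u32) -> BoxFuture<'static, u32>;
}

struct Doubler;

impl Fetch for Doubler {
    fn fetch(&self, key: u32) -> BoxFuture<'static, u32> {
        // One heap allocation per call to erase the concrete future type.
        Box::pin(async move { key * 2 })
    }
}

// A no-op waker, just enough to drive the future here.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Doubler.fetch(21);
    // The async block has no await points, so a single poll completes it.
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(42));
    println!("ok");
}
```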

In OpenDAL, our journey with asynchronous traits, akin to Rust’s Future, has led us to implement the Read trait as follows:

pub trait Read: Unpin + Send + Sync {
    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>>;
    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>>;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>>;
}

In OpenDAL, operators manage a FusedAccessor delegate, essentially a type-erased accessor that wraps an Arc<dyn Accessor>. The Accessor trait binds associated types to operational traits, such as type Reader: oio::Read, so each service backend must implement it to support specific functionality. This can be challenging, as it often involves manually crafting poll actions within a synchronous context to achieve asynchronous behavior.
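In shape, the associated-type pattern looks roughly like this; a simplified sketch in which everything except the Accessor/Read trait names (the methods, MemReader, MemBackend) is hypothetical and much smaller than the real OpenDAL traits:

```rust
// Simplified sketch of the associated-type pattern described above;
// the real OpenDAL traits carry many more methods and bounds.
trait Read {
    fn len_hint(&self) -> usize; // stand-in for the real poll_* methods
}

trait Accessor {
    // Each backend names its concrete reader type; the bound guarantees
    // that it supports the Read operations.
    type Reader: Read;
    fn reader(&self, path: &str) -> Self::Reader;
}

struct MemReader(Vec<u8>);

impl Read for MemReader {
    fn len_hint(&self) -> usize {
        self.0.len()
    }
}

struct MemBackend;

impl Accessor for MemBackend {
    type Reader = MemReader;
    fn reader(&self, _path: &str) -> MemReader {
        MemReader(vec![1, 2, 3])
    }
}

fn main() {
    // The operator only knows `Accessor`; the bound lets it use the
    // backend's reader through the `Read` trait.
    let r = MemBackend.reader("path/to/file");
    assert_eq!(r.len_hint(), 3);
    println!("ok");
}
```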

Thankfully, the commonalities in service operations have allowed us to employ utility structures like IncomingAsyncBody, which implements the Read trait. This strategy facilitates efficient HTTP response management across service backends, eliminating the need to reinvent the wheel for each service.

Nonetheless, challenges may surface during integrations, such as with the Databricks filesystem in OpenDAL. My experience with IncomingAsyncBody highlighted compatibility issues, as Databricks required a distinct approach to response handling.

This led to the development of a specialized Reader. The path to understanding the necessary state machine culminated in a complex implementation, which seems unnecessary for such a simple operation.

enum State {
    Reading(Option<Arc<DbfsCore>>),
    Finalize(BoxFuture<'static, (Arc<DbfsCore>, Result<Bytes>)>),
}

unsafe impl Sync for DbfsReader {}

#[async_trait]
impl oio::Read for DbfsReader {
    fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll<Result<usize>> {
        while self.has_filled as usize != buf.len() {
            match &mut self.state {
                State::Reading(core) => {
                    let core = core.take().expect("DbfsReader must be initialized");

                    let path = self.path.clone();
                    let offset = self.offset;
                    let len = cmp::min(buf.len(), DBFS_READ_LIMIT);

                    let fut = async move {
                        let resp = async { core.dbfs_read(&path, offset, len as u64).await }.await;
                        let body = match resp {
                            Ok(resp) => resp.into_body(),
                            Err(err) => {
                                return (core, Err(err));
                            }
                        };
                        let bs = async { body.bytes().await }.await;
                        (core, bs)
                    };
                    self.state = State::Finalize(Box::pin(fut));
                }
                State::Finalize(fut) => {
                    let (core, bs) = ready!(fut.as_mut().poll(cx));
                    let data = self.serde_json_decode(&bs?)?;

                    buf.put_slice(&data[..]);
                    self.set_offset(self.offset + data.len() as u64);
                    self.has_filled += data.len() as u64;
                    self.state = State::Reading(Some(core));
                }
            }
        }
        Poll::Ready(Ok(self.has_filled as usize))
    }

    // ...

With the advent of Rust 1.75, async fn and return-position impl Trait (RPIT) in traits have been stabilized. This enhancement means we can now use async directly in traits, albeit with some limitations. The update streamlines the implementation of traits such as Read, ushering in a more intuitive experience (hat tip to @Xuanwo):

pub trait Read: Unpin + Send + Sync {
    
    #[cfg(not(target_arch = "wasm32"))]
    fn read(&mut self, limit: usize) -> impl Future<Output = Result<Bytes>> + Send;
    #[cfg(target_arch = "wasm32")]
    fn read(&mut self, size: usize) -> impl Future<Output = Result<Bytes>>;

    #[cfg(not(target_arch = "wasm32"))]
    fn seek(&mut self, pos: io::SeekFrom) -> impl Future<Output = Result<u64>> + Send;
    #[cfg(target_arch = "wasm32")]
    fn seek(&mut self, pos: io::SeekFrom) -> impl Future<Output = Result<u64>>;
}

You might be pondering where the async keyword goes. In Rust, async fn is essentially syntactic sugar: the compiler translates it into a function returning impl Future<Output = T>. The trait spells out the bounds explicitly in order to separate compile targets for WASM support, since futures on wasm32 are not required to be Send. This has no bearing on how the methods are implemented. For example, what used to be the poll_read method in IncomingAsyncBody can now be written as follows:

impl oio::Read for IncomingAsyncBody {
    async fn read(&mut self, limit: usize) -> Result<Bytes> {
        if self.size == Some(0) {
            return Ok(Bytes::new());
        }

        if self.chunk.is_empty() {
            self.chunk = match self.inner.next().await.transpose()? {
                Some(bs) => bs,
                None => {
                    if let Some(size) = self.size {
                        Self::check(size, self.consumed)?
                    }

                    return Ok(Bytes::new());
                }
            };
        }

        let size = min(limit, self.chunk.len());
        self.consumed += size as u64;
        let bs = self.chunk.split_to(size);
        Ok(bs)
    }
    
    // ...

Compared to the previous approach:

impl oio::Read for IncomingAsyncBody {
    fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll<Result<usize>> {
        if buf.is_empty() || self.size == Some(0) {
            return Poll::Ready(Ok(0));
        }

        let mut bs = if let Some(chunk) = self.chunk.take() {
            chunk
        } else {
            loop {
                match ready!(self.inner.poll_next(cx)) {
                    Some(Ok(bs)) if bs.is_empty() => continue,
                    Some(Ok(bs)) => {
                        self.consumed += bs.len() as u64;
                        break bs;
                    }
                    Some(Err(err)) => return Poll::Ready(Err(err)),
                    None => {
                        if let Some(size) = self.size {
                            Self::check(size, self.consumed)?;
                        }
                        return Poll::Ready(Ok(0));
                    }
                }
            }
        };

        let amt = min(bs.len(), buf.len());
        buf.put_slice(&bs[..amt]);
        bs.advance(amt);
        if !bs.is_empty() {
            self.chunk = Some(bs);
        }

        Poll::Ready(Ok(amt))
    }
  
    // ...

Isn’t that much simpler to read and maintain? No more loops, &mut Context<'_>, or hand-rolled state machines 🎉.
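The payoff shows on both sides of the trait. The following self-contained sketch mimics the new trait shape on Rust 1.75+ (the method names, MemReader, and the Vec<u8> return type are placeholders, not OpenDAL’s real signatures): the implementation is plain async code, and a caller just .awaits it, here driven by a minimal single-poll executor since the future never actually suspends.

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Simplified sketch of the new trait shape (Rust 1.75+); names and the
// Vec<u8> return type are placeholders, not OpenDAL's real signatures.
trait Read {
    fn read(&mut self, limit: usize) -> impl Future<Output = Vec<u8>> + Send;
}

struct MemReader(Vec<u8>);

impl Read for MemReader {
    // Plain async code: no Context, no Poll, no hand-written state machine.
    async fn read(&mut self, limit: usize) -> Vec<u8> {
        let n = limit.min(self.0.len());
        self.0.drain(..n).collect()
    }
}

// A no-op waker plus a single-poll "executor": sufficient here because
// the future completes without suspending.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn block_on_ready<F: Future>(fut: F) -> F::Output {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let fut = pin!(fut);
    match fut.poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => unreachable!("future suspended"),
    }
}

fn main() {
    let mut reader = MemReader(vec![1, 2, 3, 4]);
    let chunk = block_on_ready(reader.read(2));
    assert_eq!(chunk, vec![1, 2]);
    println!("ok");
}
```

Note how the async fn in the impl satisfies the impl Future<Output = Vec<u8>> + Send signature declared in the trait; the compiler verifies the Send bound for us.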

I’m convinced that async in traits will make working with futures significantly easier, flattening the Rust learning curve and making the async/await paradigm more accessible to Rustaceans. Nevertheless, not every challenge related to async/await in Rust has been solved; some limitations remain. However, I believe the Rust team 🦀 will continue to refine the async/await experience moving forward. ✌️
