From dc975eca907fabb62fd48cee82606b39900b47b8 Mon Sep 17 00:00:00 2001 From: Wim Looman Date: Fri, 14 Jun 2019 20:08:10 +0200 Subject: [PATCH] Add AsyncBufReadExt::copy_buf_into --- futures-util/src/io/copy_buf_into.rs | 58 ++++++++++++++++++++++++++++ futures-util/src/io/mod.rs | 33 ++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 futures-util/src/io/copy_buf_into.rs diff --git a/futures-util/src/io/copy_buf_into.rs b/futures-util/src/io/copy_buf_into.rs new file mode 100644 index 0000000000..0ceaec8e0e --- /dev/null +++ b/futures-util/src/io/copy_buf_into.rs @@ -0,0 +1,58 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncBufRead, AsyncWrite}; +use std::io; +use std::pin::Pin; + +/// Future for the [`copy_buf_into`](super::AsyncBufReadExt::copy_buf_into) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct CopyBufInto { + reader: R, + writer: W, + amt: u64, +} + +impl Unpin for CopyBufInto {} + +impl CopyBufInto { + pub(super) fn new(reader: R, writer: W) -> Self { + CopyBufInto { + reader, + writer, + amt: 0, + } + } + + fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut R>, Pin<&'a mut W>, &'a mut u64) { + unsafe { + let this = self.get_unchecked_mut(); + (Pin::new_unchecked(&mut this.reader), Pin::new_unchecked(&mut this.writer), &mut this.amt) + } + } +} + +impl Future for CopyBufInto + where R: AsyncBufRead, + W: AsyncWrite, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (mut reader, mut writer, amt) = self.project(); + loop { + let buffer = ready!(reader.as_mut().poll_fill_buf(cx))?; + if buffer.is_empty() { + ready!(writer.as_mut().poll_flush(cx))?; + return Poll::Ready(Ok(*amt)); + } + + let i = ready!(writer.as_mut().poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) + } + *amt += i as u64; + reader.as_mut().consume(i); + } + } +} diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index a08b74ea9d..43bb4f4da9 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -31,6 +31,9 @@ pub use self::buf_writer::BufWriter; mod copy_into; pub use self::copy_into::CopyInto; +mod copy_buf_into; +pub use self::copy_buf_into::CopyBufInto; + mod flush; pub use self::flush::Flush; @@ -404,6 +407,36 @@ impl AsyncSeekExt for S {} /// An extension trait which adds utility methods to `AsyncBufRead` types. pub trait AsyncBufReadExt: AsyncBufRead { + /// Creates a future which copies all the bytes from one object to another. + /// + /// The returned future will copy all the bytes read from this `AsyncBufRead` into the + /// `writer` specified. This future will only complete once the `reader` has hit + /// EOF and all bytes have been written to and flushed from the `writer` + /// provided. + /// + /// On success the number of bytes is returned. + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await)] + /// # futures::executor::block_on(async { + /// use futures::io::AsyncBufReadExt; + /// use std::io::Cursor; + /// + /// let mut reader = Cursor::new([1, 2, 3, 4]); + /// let mut writer = Cursor::new([0u8; 5]); + /// + /// let bytes = reader.copy_buf_into(&mut writer).await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + fn copy_buf_into(self, writer: W) -> CopyBufInto where Self: Sized { + CopyBufInto::new(self, writer) + } + /// Creates a future which will read all the bytes associated with this I/O /// object into `buf` until the delimiter `byte` or EOF is reached. /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).