Skip to content

Commit

Permalink
Add AsyncBufReadExt::copy_buf_into
Browse files Browse the repository at this point in the history
  • Loading branch information
Nemo157 committed Jun 14, 2019
1 parent b232653 commit dc975ec
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 0 deletions.
58 changes: 58 additions & 0 deletions futures-util/src/io/copy_buf_into.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncWrite};
use std::io;
use std::pin::Pin;

/// Future for the [`copy_buf_into`](super::AsyncBufReadExt::copy_buf_into) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CopyBufInto<R, W> {
reader: R,
writer: W,
amt: u64,
}

impl<R: Unpin, W: Unpin> Unpin for CopyBufInto<R, W> {}

impl<R, W> CopyBufInto<R, W> {
pub(super) fn new(reader: R, writer: W) -> Self {
CopyBufInto {
reader,
writer,
amt: 0,
}
}

fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut R>, Pin<&'a mut W>, &'a mut u64) {
unsafe {
let this = self.get_unchecked_mut();
(Pin::new_unchecked(&mut this.reader), Pin::new_unchecked(&mut this.writer), &mut this.amt)
}
}
}

impl<R, W> Future for CopyBufInto<R, W>
where R: AsyncBufRead,
W: AsyncWrite,
{
type Output = io::Result<u64>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut reader, mut writer, amt) = self.project();
loop {
let buffer = ready!(reader.as_mut().poll_fill_buf(cx))?;
if buffer.is_empty() {
ready!(writer.as_mut().poll_flush(cx))?;
return Poll::Ready(Ok(*amt));
}

let i = ready!(writer.as_mut().poll_write(cx, buffer))?;
if i == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
}
*amt += i as u64;
reader.as_mut().consume(i);
}
}
}
33 changes: 33 additions & 0 deletions futures-util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ pub use self::buf_writer::BufWriter;
mod copy_into;
pub use self::copy_into::CopyInto;

mod copy_buf_into;
pub use self::copy_buf_into::CopyBufInto;

mod flush;
pub use self::flush::Flush;

Expand Down Expand Up @@ -404,6 +407,36 @@ impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}

/// An extension trait which adds utility methods to `AsyncBufRead` types.
pub trait AsyncBufReadExt: AsyncBufRead {
/// Creates a future which copies all the bytes from one object to another.
///
/// The returned future will copy all the bytes read from this `AsyncBufRead` into the
/// `writer` specified. This future will only complete once the `reader` has hit
/// EOF and all bytes have been written to and flushed from the `writer`
/// provided.
///
/// On success the number of bytes is returned.
///
/// # Examples
///
/// ```
/// #![feature(async_await)]
/// # futures::executor::block_on(async {
/// use futures::io::AsyncBufReadExt;
/// use std::io::Cursor;
///
/// let mut reader = Cursor::new([1, 2, 3, 4]);
/// let mut writer = Cursor::new([0u8; 5]);
///
/// let bytes = reader.copy_buf_into(&mut writer).await?;
///
/// assert_eq!(bytes, 4);
/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
/// # Ok::<(), Box<std::error::Error>>(()) }).unwrap();
/// ```
fn copy_buf_into<W: AsyncWrite>(self, writer: W) -> CopyBufInto<Self, W> where Self: Sized {
CopyBufInto::new(self, writer)
}

/// Creates a future which will read all the bytes associated with this I/O
/// object into `buf` until the delimiter `byte` or EOF is reached.
/// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
Expand Down

0 comments on commit dc975ec

Please sign in to comment.