diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index a0b6db3865422b6b8a8cd2400ac1f5bf5869ea04..afca8c524dd97109fc30843ed4e61c8f870fc14e 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -196,87 +196,62 @@ impl<T> Sender<T> { Sender { chan } } - /// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item. - /// - /// If the channel is full, then `Poll::Pending` is returned and the task is notified when a - /// slot becomes available. - /// - /// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a call to `try_send` will succeed unless - /// the channel has since been closed. To provide this guarantee, the channel reserves one slot - /// in the channel for the coming send. This reserved slot is not available to other `Sender` - /// instances, so you need to be careful to not end up with deadlocks by blocking after calling - /// `poll_ready` but before sending an element. + /// Sends a value, waiting until there is capacity. /// - /// If, after `poll_ready` succeeds, you decide you do not wish to send an item after all, you - /// can use [`disarm`](Sender::disarm) to release the reserved slot. + /// A successful send occurs when it is determined that the other end of the + /// channel has not hung up already. An unsuccessful send would be one where + /// the corresponding receiver has already been closed. Note that a return + /// value of `Err` means that the data will never be received, but a return + /// value of `Ok` does not mean that the data will be received. It is + /// possible for the corresponding receiver to hang up immediately after + /// this function returns `Ok`. /// - /// Until an item is sent or [`disarm`](Sender::disarm) is called, repeated calls to - /// `poll_ready` will return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))` if channel - /// is closed. - pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> { - self.chan.poll_ready(cx).map_err(|_| ClosedError::new()) - } - - /// Undo a successful call to `poll_ready`. + /// # Errors /// - /// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, it holds up one slot in the - /// channel to make room for the coming send. `disarm` allows you to give up that slot if you - /// decide you do not wish to send an item after all. After calling `disarm`, you must call - /// `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to send again. + /// If the receive half of the channel is closed, either due to [`close`] + /// being called or the [`Receiver`] handle dropping, the function returns + /// an error. The error includes the value passed to `send`. /// - /// Returns `false` if no slot is reserved for this sender (usually because `poll_ready` was - /// not previously called, or did not succeed). + /// [`close`]: Receiver::close + /// [`Receiver`]: Receiver /// - /// # Motivation + /// # Examples /// - /// Since `poll_ready` takes up one of the finite number of slots in a bounded channel, callers - /// need to send an item shortly after `poll_ready` succeeds. If they do not, idle senders may - /// take up all the slots of the channel, and prevent active senders from getting any requests - /// through. Consider this code that forwards from one channel to another: + /// In the following example, each call to `send` will block until the + /// previously sent value was received. /// - /// ```rust,ignore - /// loop { - /// ready!(tx.poll_ready(cx))?; - /// if let Some(item) = ready!(rx.poll_recv(cx)) { - /// tx.try_send(item)?; - /// } else { - /// break; - /// } - /// } - /// ``` + /// ```rust + /// use tokio::sync::mpsc; /// - /// If many such forwarders exist, and they all forward into a single (cloned) `Sender`, then - /// any number of forwarders may be waiting for `rx.poll_recv` at the same time. While they do, - /// they are effectively each reducing the channel's capacity by 1. If enough of these - /// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot - /// for them through `poll_ready`, and the system will deadlock. + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(1); /// - /// `disarm` solves this problem by allowing you to give up the reserved slot if you find that - /// you have to block. We can then fix the code above by writing: + /// tokio::spawn(async move { + /// for i in 0..10 { + /// if let Err(_) = tx.send(i).await { + /// println!("receiver dropped"); + /// return; + /// } + /// } + /// }); /// - /// ```rust,ignore - /// loop { - /// ready!(tx.poll_ready(cx))?; - /// let item = rx.poll_recv(cx); - /// if let Poll::Ready(Ok(_)) = item { - /// // we're going to send the item below, so don't disarm - /// } else { - /// // give up our send slot, we won't need it for a while - /// tx.disarm(); - /// } - /// if let Some(item) = ready!(item) { - /// tx.try_send(item)?; - /// } else { - /// break; - /// } + /// while let Some(i) = rx.recv().await { + /// println!("got = {}", i); + /// } /// } /// ``` - pub fn disarm(&mut self) -> bool { - if self.chan.is_ready() { - self.chan.disarm(); - true - } else { - false + pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> { + use crate::future::poll_fn; + + if poll_fn(|cx| self.poll_ready(cx)).await.is_err() { + return Err(SendError(value)); + } + + match self.try_send(value) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => unreachable!(), + Err(TrySendError::Closed(value)) => Err(SendError(value)), } } @@ -347,15 +322,13 @@ impl<T> Sender<T> { Ok(()) } - /// Sends a value, waiting until there is capacity. + /// Sends a value, waiting until there is capacity, but only for a limited time. /// - /// A successful send occurs when it is determined that the other end of the - /// channel has not hung up already. An unsuccessful send would be one where - /// the corresponding receiver has already been closed. Note that a return - /// value of `Err` means that the data will never be received, but a return - /// value of `Ok` does not mean that the data will be received. It is - /// possible for the corresponding receiver to hang up immediately after - /// this function returns `Ok`. + /// Shares the same success and error conditions as [`send`], adding one more + /// condition for an unsuccessful send, which is when the provided timeout has + /// elapsed, and there is no capacity available. + /// + /// [`send`]: Sender::send /// /// # Errors /// @@ -368,11 +341,12 @@ impl<T> Sender<T> { /// /// # Examples /// - /// In the following example, each call to `send` will block until the - /// previously sent value was received. + /// In the following example, each call to `send_timeout` will block until the + /// previously sent value was received, unless the timeout has elapsed. /// /// ```rust /// use tokio::sync::mpsc; + /// use tokio::time::{delay_for, Duration}; /// /// #[tokio::main] /// async fn main() { @@ -380,8 +354,8 @@ impl<T> Sender<T> { /// /// tokio::spawn(async move { /// for i in 0..10 { - /// if let Err(_) = tx.send(i).await { - /// println!("receiver dropped"); + /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await { + /// println!("send error: #{:?}", e); /// return; /// } /// } @@ -389,94 +363,117 @@ impl<T> Sender<T> { /// /// while let Some(i) = rx.recv().await { /// println!("got = {}", i); + /// delay_for(Duration::from_millis(200)).await; /// } /// } /// ``` - pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> { + #[cfg(feature = "time")] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + pub async fn send_timeout( + &mut self, + value: T, + timeout: Duration, + ) -> Result<(), SendTimeoutError<T>> { use crate::future::poll_fn; - if poll_fn(|cx| self.poll_ready(cx)).await.is_err() { - return Err(SendError(value)); + match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await { + Err(_) => { + return Err(SendTimeoutError::Timeout(value)); + } + Ok(Err(_)) => { + return Err(SendTimeoutError::Closed(value)); + } + Ok(_) => {} } match self.try_send(value) { Ok(()) => Ok(()), Err(TrySendError::Full(_)) => unreachable!(), - Err(TrySendError::Closed(value)) => Err(SendError(value)), + Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)), } } -} -cfg_time! { - impl<T> Sender<T> { - /// Sends a value, waiting until there is capacity, but only for a limited time. - /// - /// Shares the same success and error conditions as [`send`], adding one more - /// condition for an unsuccessful send, which is when the provided timeout has - /// elapsed, and there is no capacity available. - /// - /// [`send`]: Sender::send - /// - /// # Errors - /// - /// If the receive half of the channel is closed, either due to [`close`] being - /// called or the [`Receiver`] handle dropping, or if the timeout specified - /// elapses before the capacity is available the function returns an error. - /// The error includes the value passed to `send_timeout`. - /// - /// [`close`]: Receiver::close - /// [`Receiver`]: Receiver - /// - /// # Examples - /// - /// In the following example, each call to `send_timeout` will block until the - /// previously sent value was received, unless the timeout has elapsed. - /// - /// ```rust - /// use tokio::sync::mpsc; - /// use tokio::time::{delay_for, Duration}; - /// - /// #[tokio::main] - /// async fn main() { - /// let (mut tx, mut rx) = mpsc::channel(1); - /// - /// tokio::spawn(async move { - /// for i in 0..10 { - /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await { - /// println!("send error: #{:?}", e); - /// return; - /// } - /// } - /// }); - /// - /// while let Some(i) = rx.recv().await { - /// println!("got = {}", i); - /// delay_for(Duration::from_millis(200)).await; - /// } - /// } - /// ``` - pub async fn send_timeout( - &mut self, - value: T, - timeout: Duration, - ) -> Result<(), SendTimeoutError<T>> { - use crate::future::poll_fn; - - match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await { - Err(_) => { - return Err(SendTimeoutError::Timeout(value)); - } - Ok(Err(_)) => { - return Err(SendTimeoutError::Closed(value)); - } - Ok(_) => {} - } + /// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item. + /// + /// If the channel is full, then `Poll::Pending` is returned and the task is notified when a + /// slot becomes available. + /// + /// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a call to `try_send` will succeed unless + /// the channel has since been closed. To provide this guarantee, the channel reserves one slot + /// in the channel for the coming send. This reserved slot is not available to other `Sender` + /// instances, so you need to be careful to not end up with deadlocks by blocking after calling + /// `poll_ready` but before sending an element. + /// + /// If, after `poll_ready` succeeds, you decide you do not wish to send an item after all, you + /// can use [`disarm`](Sender::disarm) to release the reserved slot. + /// + /// Until an item is sent or [`disarm`](Sender::disarm) is called, repeated calls to + /// `poll_ready` will return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))` if channel + /// is closed. + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> { + self.chan.poll_ready(cx).map_err(|_| ClosedError::new()) + } - match self.try_send(value) { - Ok(()) => Ok(()), - Err(TrySendError::Full(_)) => unreachable!(), - Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)), - } + /// Undo a successful call to `poll_ready`. + /// + /// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, it holds up one slot in the + /// channel to make room for the coming send. `disarm` allows you to give up that slot if you + /// decide you do not wish to send an item after all. After calling `disarm`, you must call + /// `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to send again. + /// + /// Returns `false` if no slot is reserved for this sender (usually because `poll_ready` was + /// not previously called, or did not succeed). + /// + /// # Motivation + /// + /// Since `poll_ready` takes up one of the finite number of slots in a bounded channel, callers + /// need to send an item shortly after `poll_ready` succeeds. If they do not, idle senders may + /// take up all the slots of the channel, and prevent active senders from getting any requests + /// through. Consider this code that forwards from one channel to another: + /// + /// ```rust,ignore + /// loop { + /// ready!(tx.poll_ready(cx))?; + /// if let Some(item) = ready!(rx.poll_recv(cx)) { + /// tx.try_send(item)?; + /// } else { + /// break; + /// } + /// } + /// ``` + /// + /// If many such forwarders exist, and they all forward into a single (cloned) `Sender`, then + /// any number of forwarders may be waiting for `rx.poll_recv` at the same time. While they do, + /// they are effectively each reducing the channel's capacity by 1. If enough of these + /// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot + /// for them through `poll_ready`, and the system will deadlock. + /// + /// `disarm` solves this problem by allowing you to give up the reserved slot if you find that + /// you have to block. We can then fix the code above by writing: + /// + /// ```rust,ignore + /// loop { + /// ready!(tx.poll_ready(cx))?; + /// let item = rx.poll_recv(cx); + /// if let Poll::Ready(Ok(_)) = item { + /// // we're going to send the item below, so don't disarm + /// } else { + /// // give up our send slot, we won't need it for a while + /// tx.disarm(); + /// } + /// if let Some(item) = ready!(item) { + /// tx.try_send(item)?; + /// } else { + /// break; + /// } + /// } + /// ``` + pub fn disarm(&mut self) -> bool { + if self.chan.is_ready() { + self.chan.disarm(); + true + } else { + false } } }