Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
T
tokio
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Issue analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
felixmoebius
tokio
Commits
de8326a5
Unverified
Commit
de8326a5
authored
4 years ago
by
nasa
Committed by
GitHub
4 years ago
Browse files
Options
Downloads
Patches
Plain Diff
doc: Sort methods on mpsc::Sender in doc (#2379)
parent
d65bf380
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
tokio/src/sync/mpsc/bounded.rs
+152
-155
152 additions, 155 deletions
tokio/src/sync/mpsc/bounded.rs
with
152 additions
and
155 deletions
tokio/src/sync/mpsc/bounded.rs
+
152
−
155
View file @
de8326a5
...
...
@@ -196,87 +196,62 @@ impl<T> Sender<T> {
Sender
{
chan
}
}
/// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item.
///
/// If the channel is full, then `Poll::Pending` is returned and the task is notified when a
/// slot becomes available.
///
/// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a call to `try_send` will succeed unless
/// the channel has since been closed. To provide this guarantee, the channel reserves one slot
/// in the channel for the coming send. This reserved slot is not available to other `Sender`
/// instances, so you need to be careful to not end up with deadlocks by blocking after calling
/// `poll_ready` but before sending an element.
/// Sends a value, waiting until there is capacity.
///
/// If, after `poll_ready` succeeds, you decide you do not wish to send an item after all, you
/// can use [`disarm`](Sender::disarm) to release the reserved slot.
/// A successful send occurs when it is determined that the other end of the
/// channel has not hung up already. An unsuccessful send would be one where
/// the corresponding receiver has already been closed. Note that a return
/// value of `Err` means that the data will never be received, but a return
/// value of `Ok` does not mean that the data will be received. It is
/// possible for the corresponding receiver to hang up immediately after
/// this function returns `Ok`.
///
/// Until an item is sent or [`disarm`](Sender::disarm) is called, repeated calls to
/// `poll_ready` will return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))` if channel
/// is closed.
pub
fn
poll_ready
(
&
mut
self
,
cx
:
&
mut
Context
<
'_
>
)
->
Poll
<
Result
<
(),
ClosedError
>>
{
self
.chan
.poll_ready
(
cx
)
.map_err
(|
_
|
ClosedError
::
new
())
}
/// Undo a successful call to `poll_ready`.
/// # Errors
///
/// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, it holds up one slot in the
/// channel to make room for the coming send. `disarm` allows you to give up that slot if you
/// decide you do not wish to send an item after all. After calling `disarm`, you must call
/// `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to send again.
/// If the receive half of the channel is closed, either due to [`close`]
/// being called or the [`Receiver`] handle dropping, the function returns
/// an error. The error includes the value passed to `send`.
///
///
Returns `false` if no slot is reserved for this sender (usually because `poll_ready` was
///
not previously called, or did not succeed).
///
[`close`]: Receiver::close
///
[`Receiver`]: Receiver
///
/// #
Motivation
/// #
Examples
///
/// Since `poll_ready` takes up one of the finite number of slots in a bounded channel, callers
/// need to send an item shortly after `poll_ready` succeeds. If they do not, idle senders may
/// take up all the slots of the channel, and prevent active senders from getting any requests
/// through. Consider this code that forwards from one channel to another:
/// In the following example, each call to `send` will block until the
/// previously sent value was received.
///
/// ```rust,ignore
/// loop {
/// ready!(tx.poll_ready(cx))?;
/// if let Some(item) = ready!(rx.poll_recv(cx)) {
/// tx.try_send(item)?;
/// } else {
/// break;
/// }
/// }
/// ```
/// ```rust
/// use tokio::sync::mpsc;
///
/// If many such forwarders exist, and they all forward into a single (cloned) `Sender`, then
/// any number of forwarders may be waiting for `rx.poll_recv` at the same time. While they do,
/// they are effectively each reducing the channel's capacity by 1. If enough of these
/// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot
/// for them through `poll_ready`, and the system will deadlock.
/// #[tokio::main]
/// async fn main() {
/// let (mut tx, mut rx) = mpsc::channel(1);
///
/// `disarm` solves this problem by allowing you to give up the reserved slot if you find that
/// you have to block. We can then fix the code above by writing:
/// tokio::spawn(async move {
/// for i in 0..10 {
/// if let Err(_) = tx.send(i).await {
/// println!("receiver dropped");
/// return;
/// }
/// }
/// });
///
/// ```rust,ignore
/// loop {
/// ready!(tx.poll_ready(cx))?;
/// let item = rx.poll_recv(cx);
/// if let Poll::Ready(Ok(_)) = item {
/// // we're going to send the item below, so don't disarm
/// } else {
/// // give up our send slot, we won't need it for a while
/// tx.disarm();
/// }
/// if let Some(item) = ready!(item) {
/// tx.try_send(item)?;
/// } else {
/// break;
/// }
/// while let Some(i) = rx.recv().await {
/// println!("got = {}", i);
/// }
/// }
/// ```
pub
fn
disarm
(
&
mut
self
)
->
bool
{
if
self
.chan
.is_ready
()
{
self
.chan
.disarm
();
true
}
else
{
false
pub
async
fn
send
(
&
mut
self
,
value
:
T
)
->
Result
<
(),
SendError
<
T
>>
{
use
crate
::
future
::
poll_fn
;
if
poll_fn
(|
cx
|
self
.poll_ready
(
cx
))
.await
.is_err
()
{
return
Err
(
SendError
(
value
));
}
match
self
.try_send
(
value
)
{
Ok
(())
=>
Ok
(()),
Err
(
TrySendError
::
Full
(
_
))
=>
unreachable!
(),
Err
(
TrySendError
::
Closed
(
value
))
=>
Err
(
SendError
(
value
)),
}
}
...
...
@@ -347,15 +322,13 @@ impl<T> Sender<T> {
Ok
(())
}
/// Sends a value, waiting until there is capacity.
/// Sends a value, waiting until there is capacity
, but only for a limited time
.
///
/// A successful send occurs when it is determined that the other end of the
/// channel has not hung up already. An unsuccessful send would be one where
/// the corresponding receiver has already been closed. Note that a return
/// value of `Err` means that the data will never be received, but a return
/// value of `Ok` does not mean that the data will be received. It is
/// possible for the corresponding receiver to hang up immediately after
/// this function returns `Ok`.
/// Shares the same success and error conditions as [`send`], adding one more
/// condition for an unsuccessful send, which is when the provided timeout has
/// elapsed, and there is no capacity available.
///
/// [`send`]: Sender::send
///
/// # Errors
///
...
...
@@ -368,11 +341,12 @@ impl<T> Sender<T> {
///
/// # Examples
///
/// In the following example, each call to `send` will block until the
/// previously sent value was received.
/// In the following example, each call to `send
_timeout
` will block until the
/// previously sent value was received
, unless the timeout has elapsed
.
///
/// ```rust
/// use tokio::sync::mpsc;
/// use tokio::time::{delay_for, Duration};
///
/// #[tokio::main]
/// async fn main() {
...
...
@@ -380,8 +354,8 @@ impl<T> Sender<T> {
///
/// tokio::spawn(async move {
/// for i in 0..10 {
/// if let Err(
_
) = tx.send
(i
).await {
/// println!("
receiver dropped"
);
/// if let Err(
e
) = tx.send
_timeout(i, Duration::from_millis(100)
).await {
/// println!("
send error: #{:?}", e
);
/// return;
/// }
/// }
...
...
@@ -389,94 +363,117 @@ impl<T> Sender<T> {
///
/// while let Some(i) = rx.recv().await {
/// println!("got = {}", i);
/// delay_for(Duration::from_millis(200)).await;
/// }
/// }
/// ```
pub
async
fn
send
(
&
mut
self
,
value
:
T
)
->
Result
<
(),
SendError
<
T
>>
{
#[cfg(feature
=
"time"
)]
#[cfg_attr(docsrs,
doc(cfg(feature
=
"time"
)))]
pub
async
fn
send_timeout
(
&
mut
self
,
value
:
T
,
timeout
:
Duration
,
)
->
Result
<
(),
SendTimeoutError
<
T
>>
{
use
crate
::
future
::
poll_fn
;
if
poll_fn
(|
cx
|
self
.poll_ready
(
cx
))
.await
.is_err
()
{
return
Err
(
SendError
(
value
));
match
crate
::
time
::
timeout
(
timeout
,
poll_fn
(|
cx
|
self
.poll_ready
(
cx
)))
.await
{
Err
(
_
)
=>
{
return
Err
(
SendTimeoutError
::
Timeout
(
value
));
}
Ok
(
Err
(
_
))
=>
{
return
Err
(
SendTimeoutError
::
Closed
(
value
));
}
Ok
(
_
)
=>
{}
}
match
self
.try_send
(
value
)
{
Ok
(())
=>
Ok
(()),
Err
(
TrySendError
::
Full
(
_
))
=>
unreachable!
(),
Err
(
TrySendError
::
Closed
(
value
))
=>
Err
(
Send
Error
(
value
)),
Err
(
TrySendError
::
Closed
(
value
))
=>
Err
(
Send
TimeoutError
::
Closed
(
value
)),
}
}
}
cfg_time!
{
impl
<
T
>
Sender
<
T
>
{
/// Sends a value, waiting until there is capacity, but only for a limited time.
///
/// Shares the same success and error conditions as [`send`], adding one more
/// condition for an unsuccessful send, which is when the provided timeout has
/// elapsed, and there is no capacity available.
///
/// [`send`]: Sender::send
///
/// # Errors
///
/// If the receive half of the channel is closed, either due to [`close`] being
/// called or the [`Receiver`] handle dropping, or if the timeout specified
/// elapses before the capacity is available the function returns an error.
/// The error includes the value passed to `send_timeout`.
///
/// [`close`]: Receiver::close
/// [`Receiver`]: Receiver
///
/// # Examples
///
/// In the following example, each call to `send_timeout` will block until the
/// previously sent value was received, unless the timeout has elapsed.
///
/// ```rust
/// use tokio::sync::mpsc;
/// use tokio::time::{delay_for, Duration};
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx, mut rx) = mpsc::channel(1);
///
/// tokio::spawn(async move {
/// for i in 0..10 {
/// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
/// println!("send error: #{:?}", e);
/// return;
/// }
/// }
/// });
///
/// while let Some(i) = rx.recv().await {
/// println!("got = {}", i);
/// delay_for(Duration::from_millis(200)).await;
/// }
/// }
/// ```
pub
async
fn
send_timeout
(
&
mut
self
,
value
:
T
,
timeout
:
Duration
,
)
->
Result
<
(),
SendTimeoutError
<
T
>>
{
use
crate
::
future
::
poll_fn
;
match
crate
::
time
::
timeout
(
timeout
,
poll_fn
(|
cx
|
self
.poll_ready
(
cx
)))
.await
{
Err
(
_
)
=>
{
return
Err
(
SendTimeoutError
::
Timeout
(
value
));
}
Ok
(
Err
(
_
))
=>
{
return
Err
(
SendTimeoutError
::
Closed
(
value
));
}
Ok
(
_
)
=>
{}
}
/// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item.
///
/// If the channel is full, then `Poll::Pending` is returned and the task is notified when a
/// slot becomes available.
///
/// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a call to `try_send` will succeed unless
/// the channel has since been closed. To provide this guarantee, the channel reserves one slot
/// in the channel for the coming send. This reserved slot is not available to other `Sender`
/// instances, so you need to be careful to not end up with deadlocks by blocking after calling
/// `poll_ready` but before sending an element.
///
/// If, after `poll_ready` succeeds, you decide you do not wish to send an item after all, you
/// can use [`disarm`](Sender::disarm) to release the reserved slot.
///
/// Until an item is sent or [`disarm`](Sender::disarm) is called, repeated calls to
/// `poll_ready` will return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))` if channel
/// is closed.
pub
fn
poll_ready
(
&
mut
self
,
cx
:
&
mut
Context
<
'_
>
)
->
Poll
<
Result
<
(),
ClosedError
>>
{
self
.chan
.poll_ready
(
cx
)
.map_err
(|
_
|
ClosedError
::
new
())
}
match
self
.try_send
(
value
)
{
Ok
(())
=>
Ok
(()),
Err
(
TrySendError
::
Full
(
_
))
=>
unreachable!
(),
Err
(
TrySendError
::
Closed
(
value
))
=>
Err
(
SendTimeoutError
::
Closed
(
value
)),
}
/// Undo a successful call to `poll_ready`.
///
/// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, it holds up one slot in the
/// channel to make room for the coming send. `disarm` allows you to give up that slot if you
/// decide you do not wish to send an item after all. After calling `disarm`, you must call
/// `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to send again.
///
/// Returns `false` if no slot is reserved for this sender (usually because `poll_ready` was
/// not previously called, or did not succeed).
///
/// # Motivation
///
/// Since `poll_ready` takes up one of the finite number of slots in a bounded channel, callers
/// need to send an item shortly after `poll_ready` succeeds. If they do not, idle senders may
/// take up all the slots of the channel, and prevent active senders from getting any requests
/// through. Consider this code that forwards from one channel to another:
///
/// ```rust,ignore
/// loop {
/// ready!(tx.poll_ready(cx))?;
/// if let Some(item) = ready!(rx.poll_recv(cx)) {
/// tx.try_send(item)?;
/// } else {
/// break;
/// }
/// }
/// ```
///
/// If many such forwarders exist, and they all forward into a single (cloned) `Sender`, then
/// any number of forwarders may be waiting for `rx.poll_recv` at the same time. While they do,
/// they are effectively each reducing the channel's capacity by 1. If enough of these
/// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot
/// for them through `poll_ready`, and the system will deadlock.
///
/// `disarm` solves this problem by allowing you to give up the reserved slot if you find that
/// you have to block. We can then fix the code above by writing:
///
/// ```rust,ignore
/// loop {
/// ready!(tx.poll_ready(cx))?;
/// let item = rx.poll_recv(cx);
/// if let Poll::Ready(Ok(_)) = item {
/// // we're going to send the item below, so don't disarm
/// } else {
/// // give up our send slot, we won't need it for a while
/// tx.disarm();
/// }
/// if let Some(item) = ready!(item) {
/// tx.try_send(item)?;
/// } else {
/// break;
/// }
/// }
/// ```
pub
fn
disarm
(
&
mut
self
)
->
bool
{
if
self
.chan
.is_ready
()
{
self
.chan
.disarm
();
true
}
else
{
false
}
}
}
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment