Skip to main content

tokio/runtime/task/
join.rs

1use crate::runtime::task::{AbortHandle, Header, RawTask};
2
3use std::fmt;
4use std::future::Future;
5use std::marker::PhantomData;
6use std::panic::{RefUnwindSafe, UnwindSafe};
7use std::pin::Pin;
8use std::task::{ready, Context, Poll, Waker};
9
cfg_rt! {
    /// An owned permission to join on a task (await its termination).
    ///
    /// This can be thought of as the equivalent of [`std::thread::JoinHandle`]
    /// for a Tokio task rather than a thread. Note that the background task
    /// associated with this `JoinHandle` started running immediately when you
    /// called spawn, even if you have not yet awaited the `JoinHandle`.
    ///
    /// A `JoinHandle` *detaches* the associated task when it is dropped, which
    /// means that there is no longer any handle to the task, and no way to `join`
    /// on it.
    ///
    /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
    /// functions.
    ///
    /// It is guaranteed that the destructor of the spawned task has finished
    /// before task completion is observed via `JoinHandle` `await`,
    /// [`JoinHandle::is_finished`] or [`AbortHandle::is_finished`].
    ///
    /// # Cancel safety
    ///
    /// The `&mut JoinHandle<T>` type is cancel safe. If it is used as the event
    /// in a `tokio::select!` statement and some other branch completes first,
    /// then it is guaranteed that the output of the task is not lost.
    ///
    /// If a `JoinHandle` is dropped, then the task continues running in the
    /// background and its return value is lost.
    ///
    /// # Examples
    ///
    /// Creation from [`task::spawn`]:
    ///
    /// ```
    /// use tokio::task;
    ///
    /// # async fn doc() {
    /// let join_handle: task::JoinHandle<_> = task::spawn(async {
    ///     // some work here
    /// });
    /// # }
    /// ```
    ///
    /// Creation from [`task::spawn_blocking`]:
    ///
    /// ```
    /// use tokio::task;
    ///
    /// # async fn doc() {
    /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| {
    ///     // some blocking work here
    /// });
    /// # }
    /// ```
    ///
    /// The generic parameter `T` in `JoinHandle<T>` is the return type of the spawned task.
    /// If the return value is an `i32`, the join handle has type `JoinHandle<i32>`:
    ///
    /// ```
    /// use tokio::task;
    ///
    /// # async fn doc() {
    /// let join_handle: task::JoinHandle<i32> = task::spawn(async {
    ///     5 + 3
    /// });
    /// # }
    /// ```
    ///
    /// If the task does not have a return value, the join handle has type `JoinHandle<()>`:
    ///
    /// ```
    /// use tokio::task;
    ///
    /// # async fn doc() {
    /// let join_handle: task::JoinHandle<()> = task::spawn(async {
    ///     println!("I return nothing.");
    /// });
    /// # }
    /// ```
    ///
    /// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a
    /// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has
    /// to be double chained to extract the returned value:
    ///
    /// ```
    /// use tokio::task;
    /// use std::io;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() -> io::Result<()> {
    /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
    ///     Ok(5 + 3)
    /// });
    ///
    /// let result = join_handle.await??;
    /// assert_eq!(result, 8);
    /// Ok(())
    /// # }
    /// ```
    ///
    /// If the task panics, the error is a [`JoinError`] that contains the panic:
    ///
    /// ```
    /// # #[cfg(not(target_family = "wasm"))]
    /// # {
    /// use tokio::task;
    /// use std::io;
    /// use std::panic;
    ///
    /// #[tokio::main]
    /// async fn main() -> io::Result<()> {
    ///     let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
    ///         panic!("boom");
    ///     });
    ///
    ///     let err = join_handle.await.unwrap_err();
    ///     assert!(err.is_panic());
    ///     Ok(())
    /// }
    /// # }
    /// ```
    ///
    /// Child being detached and outliving its parent:
    ///
    /// ```no_run
    /// use tokio::task;
    /// use tokio::time;
    /// use std::time::Duration;
    ///
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    /// let original_task = task::spawn(async {
    ///     let _detached_task = task::spawn(async {
    ///         // Here we sleep to make sure that the first task returns before.
    ///         time::sleep(Duration::from_millis(10)).await;
    ///         // This will be called, even though the JoinHandle is dropped.
    ///         println!("♫ Still alive ♫");
    ///     });
    /// });
    ///
    /// original_task.await.expect("The task being joined has panicked");
    /// println!("Original task is joined.");
    ///
    /// // We make sure that the new task has time to run, before the main
    /// // task returns.
    ///
    /// time::sleep(Duration::from_millis(1000)).await;
    /// # }
    /// ```
    ///
    /// [`task::spawn`]: crate::task::spawn()
    /// [`task::spawn_blocking`]: crate::task::spawn_blocking
    /// [`std::thread::JoinHandle`]: std::thread::JoinHandle
    /// [`JoinError`]: crate::task::JoinError
    pub struct JoinHandle<T> {
        /// Raw handle to the underlying task; every operation on the
        /// `JoinHandle` is forwarded through it.
        raw: RawTask,
        /// Ties the handle to the task's output type `T` without storing a
        /// value of that type in the handle.
        _p: PhantomData<T>,
    }
}
168
// SAFETY: NOTE(review) — the full justification is not visible in this file.
// The handle stores only a `RawTask` and a `PhantomData<T>`; the `T: Send`
// bound is presumably required because the task's output of type `T` is handed
// to whichever thread awaits the handle. Confirm against the synchronization
// guarantees of `RawTask`.
unsafe impl<T: Send> Send for JoinHandle<T> {}
unsafe impl<T: Send> Sync for JoinHandle<T> {}

// The handle is declared unwind safe for every `T`, both by value and by
// shared reference.
impl<T> UnwindSafe for JoinHandle<T> {}
impl<T> RefUnwindSafe for JoinHandle<T> {}
174
175impl<T> JoinHandle<T> {
176    pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
177        JoinHandle {
178            raw,
179            _p: PhantomData,
180        }
181    }
182
183    /// Abort the task associated with the handle.
184    ///
185    /// Awaiting a cancelled task might complete as usual if the task was
186    /// already completed at the time it was cancelled, but most likely it
187    /// will fail with a [cancelled] `JoinError`.
188    ///
189    /// Be aware that tasks spawned using [`spawn_blocking`] cannot be aborted
190    /// because they are not async. If you call `abort` on a `spawn_blocking`
191    /// task, then this *will not have any effect*, and the task will continue
192    /// running normally. The exception is if the task has not started running
193    /// yet; in that case, calling `abort` may prevent the task from starting.
194    ///
195    /// See also [the module level docs] for more information on cancellation.
196    ///
197    /// ```rust
198    /// use tokio::time;
199    ///
200    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
201    /// # async fn main() {
202    /// let mut handles = Vec::new();
203    ///
204    /// handles.push(tokio::spawn(async {
205    ///    time::sleep(time::Duration::from_secs(10)).await;
206    ///    true
207    /// }));
208    ///
209    /// handles.push(tokio::spawn(async {
210    ///    time::sleep(time::Duration::from_secs(10)).await;
211    ///    false
212    /// }));
213    ///
214    /// for handle in &handles {
215    ///     handle.abort();
216    /// }
217    ///
218    /// for handle in handles {
219    ///     assert!(handle.await.unwrap_err().is_cancelled());
220    /// }
221    /// # }
222    /// ```
223    ///
224    /// [cancelled]: method@super::error::JoinError::is_cancelled
225    /// [the module level docs]: crate::task#cancellation
226    /// [`spawn_blocking`]: crate::task::spawn_blocking
227    pub fn abort(&self) {
228        self.raw.remote_abort();
229    }
230
231    /// Checks if the task associated with this `JoinHandle` has finished.
232    ///
233    /// Please note that this method can return `false` even if [`abort`] has been
234    /// called on the task. This is because the cancellation process may take
235    /// some time, and this method does not return `true` until it has
236    /// completed.
237    ///
238    /// ```rust
239    /// use tokio::time;
240    ///
241    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
242    /// # async fn main() {
243    /// let handle1 = tokio::spawn(async {
244    ///     // do some stuff here
245    /// });
246    /// let handle2 = tokio::spawn(async {
247    ///     // do some other stuff here
248    ///     time::sleep(time::Duration::from_secs(10)).await;
249    /// });
250    /// // Wait for the task to finish
251    /// handle2.abort();
252    /// time::sleep(time::Duration::from_secs(1)).await;
253    /// assert!(handle1.is_finished());
254    /// assert!(handle2.is_finished());
255    /// # }
256    /// ```
257    /// [`abort`]: method@JoinHandle::abort
258    pub fn is_finished(&self) -> bool {
259        let state = self.raw.header().state.load();
260        state.is_complete()
261    }
262
263    /// Set the waker that is notified when the task completes.
264    pub(crate) fn set_join_waker(&mut self, waker: &Waker) {
265        if self.raw.try_set_join_waker(waker) {
266            // In this case the task has already completed. We wake the waker immediately.
267            waker.wake_by_ref();
268        }
269    }
270
271    /// Returns a new `AbortHandle` that can be used to remotely abort this task.
272    ///
273    /// Awaiting a task cancelled by the `AbortHandle` might complete as usual if the task was
274    /// already completed at the time it was cancelled, but most likely it
275    /// will fail with a [cancelled] `JoinError`.
276    ///
277    /// ```rust
278    /// use tokio::{time, task};
279    ///
280    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
281    /// # async fn main() {
282    /// let mut handles = Vec::new();
283    ///
284    /// handles.push(tokio::spawn(async {
285    ///    time::sleep(time::Duration::from_secs(10)).await;
286    ///    true
287    /// }));
288    ///
289    /// handles.push(tokio::spawn(async {
290    ///    time::sleep(time::Duration::from_secs(10)).await;
291    ///    false
292    /// }));
293    ///
294    /// let abort_handles: Vec<task::AbortHandle> = handles.iter().map(|h| h.abort_handle()).collect();
295    ///
296    /// for handle in abort_handles {
297    ///     handle.abort();
298    /// }
299    ///
300    /// for handle in handles {
301    ///     assert!(handle.await.unwrap_err().is_cancelled());
302    /// }
303    /// # }
304    /// ```
305    /// [cancelled]: method@super::error::JoinError::is_cancelled
306    #[must_use = "abort handles do nothing unless `.abort` is called"]
307    pub fn abort_handle(&self) -> AbortHandle {
308        self.raw.ref_inc();
309        AbortHandle::new(self.raw)
310    }
311
312    /// Returns a [task ID] that uniquely identifies this task relative to other
313    /// currently spawned tasks.
314    ///
315    /// [task ID]: crate::task::Id
316    pub fn id(&self) -> super::Id {
317        // Safety: The header pointer is valid.
318        unsafe { Header::get_id(self.raw.header_ptr()) }
319    }
320}
321
// `JoinHandle` is `Unpin` for every `T`: its only fields are a `RawTask` and a
// `PhantomData<T>`, so no value of type `T` is stored inline in the handle.
impl<T> Unpin for JoinHandle<T> {}
323
324impl<T> Future for JoinHandle<T> {
325    type Output = super::Result<T>;
326
327    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
328        ready!(crate::trace::trace_leaf(cx));
329        let mut ret = Poll::Pending;
330
331        // Keep track of task budget
332        let coop = ready!(crate::task::coop::poll_proceed(cx));
333
334        // Try to read the task output. If the task is not yet complete, the
335        // waker is stored and is notified once the task does complete.
336        //
337        // The function must go via the vtable, which requires erasing generic
338        // types. To do this, the function "return" is placed on the stack
339        // **before** calling the function and is passed into the function using
340        // `*mut ()`.
341        //
342        // Safety:
343        //
344        // The type of `T` must match the task's output type.
345        unsafe {
346            self.raw.try_read_output(&mut ret, cx.waker());
347        }
348
349        if ret.is_ready() {
350            coop.made_progress();
351        }
352
353        ret
354    }
355}
356
357impl<T> Drop for JoinHandle<T> {
358    fn drop(&mut self) {
359        if self.raw.state().drop_join_handle_fast().is_ok() {
360            return;
361        }
362
363        self.raw.drop_join_handle_slow();
364    }
365}
366
367impl<T> fmt::Debug for JoinHandle<T>
368where
369    T: fmt::Debug,
370{
371    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
372        // Safety: The header pointer is valid.
373        let id_ptr = unsafe { Header::get_id_ptr(self.raw.header_ptr()) };
374        let id = unsafe { id_ptr.as_ref() };
375        fmt.debug_struct("JoinHandle").field("id", id).finish()
376    }
377}