tokio/runtime/task/join.rs
1use crate::runtime::task::{AbortHandle, Header, RawTask};
2
3use std::fmt;
4use std::future::Future;
5use std::marker::PhantomData;
6use std::panic::{RefUnwindSafe, UnwindSafe};
7use std::pin::Pin;
8use std::task::{ready, Context, Poll, Waker};
9
10cfg_rt! {
11 /// An owned permission to join on a task (await its termination).
12 ///
13 /// This can be thought of as the equivalent of [`std::thread::JoinHandle`]
14 /// for a Tokio task rather than a thread. Note that the background task
15 /// associated with this `JoinHandle` started running immediately when you
16 /// called spawn, even if you have not yet awaited the `JoinHandle`.
17 ///
18 /// A `JoinHandle` *detaches* the associated task when it is dropped, which
19 /// means that there is no longer any handle to the task, and no way to `join`
20 /// on it.
21 ///
22 /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
23 /// functions.
24 ///
25 /// It is guaranteed that the destructor of the spawned task has finished
26 /// before task completion is observed via `JoinHandle` `await`,
27 /// [`JoinHandle::is_finished`] or [`AbortHandle::is_finished`].
28 ///
29 /// # Cancel safety
30 ///
31 /// The `&mut JoinHandle<T>` type is cancel safe. If it is used as the event
32 /// in a `tokio::select!` statement and some other branch completes first,
33 /// then it is guaranteed that the output of the task is not lost.
34 ///
35 /// If a `JoinHandle` is dropped, then the task continues running in the
36 /// background and its return value is lost.
37 ///
38 /// # Examples
39 ///
40 /// Creation from [`task::spawn`]:
41 ///
42 /// ```
43 /// use tokio::task;
44 ///
45 /// # async fn doc() {
46 /// let join_handle: task::JoinHandle<_> = task::spawn(async {
47 /// // some work here
48 /// });
49 /// # }
50 /// ```
51 ///
52 /// Creation from [`task::spawn_blocking`]:
53 ///
54 /// ```
55 /// use tokio::task;
56 ///
57 /// # async fn doc() {
58 /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| {
59 /// // some blocking work here
60 /// });
61 /// # }
62 /// ```
63 ///
64 /// The generic parameter `T` in `JoinHandle<T>` is the return type of the spawned task.
65 /// If the return value is an `i32`, the join handle has type `JoinHandle<i32>`:
66 ///
67 /// ```
68 /// use tokio::task;
69 ///
70 /// # async fn doc() {
71 /// let join_handle: task::JoinHandle<i32> = task::spawn(async {
72 /// 5 + 3
73 /// });
74 /// # }
75 ///
76 /// ```
77 ///
78 /// If the task does not have a return value, the join handle has type `JoinHandle<()>`:
79 ///
80 /// ```
81 /// use tokio::task;
82 ///
83 /// # async fn doc() {
84 /// let join_handle: task::JoinHandle<()> = task::spawn(async {
85 /// println!("I return nothing.");
86 /// });
87 /// # }
88 /// ```
89 ///
90 /// Note that `handle.await` doesn't give you the return type directly. It is wrapped in a
91 /// `Result` because panics in the spawned task are caught by Tokio. The `?` operator has
92 /// to be double chained to extract the returned value:
93 ///
94 /// ```
95 /// use tokio::task;
96 /// use std::io;
97 ///
98 /// # #[tokio::main(flavor = "current_thread")]
99 /// # async fn main() -> io::Result<()> {
100 /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
101 /// Ok(5 + 3)
102 /// });
103 ///
104 /// let result = join_handle.await??;
105 /// assert_eq!(result, 8);
106 /// Ok(())
107 /// # }
108 /// ```
109 ///
110 /// If the task panics, the error is a [`JoinError`] that contains the panic:
111 ///
112 /// ```
113 /// # #[cfg(not(target_family = "wasm"))]
114 /// # {
115 /// use tokio::task;
116 /// use std::io;
117 /// use std::panic;
118 ///
119 /// #[tokio::main]
120 /// async fn main() -> io::Result<()> {
121 /// let join_handle: task::JoinHandle<Result<i32, io::Error>> = tokio::spawn(async {
122 /// panic!("boom");
123 /// });
124 ///
125 /// let err = join_handle.await.unwrap_err();
126 /// assert!(err.is_panic());
127 /// Ok(())
128 /// }
129 /// # }
130 /// ```
131 /// Child being detached and outliving its parent:
132 ///
133 /// ```no_run
134 /// use tokio::task;
135 /// use tokio::time;
136 /// use std::time::Duration;
137 ///
138 /// # #[tokio::main(flavor = "current_thread")]
139 /// # async fn main() {
140 /// let original_task = task::spawn(async {
141 /// let _detached_task = task::spawn(async {
142 /// // Here we sleep to make sure that the first task returns before.
143 /// time::sleep(Duration::from_millis(10)).await;
144 /// // This will be called, even though the JoinHandle is dropped.
145 /// println!("♫ Still alive ♫");
146 /// });
147 /// });
148 ///
149 /// original_task.await.expect("The task being joined has panicked");
150 /// println!("Original task is joined.");
151 ///
152 /// // We make sure that the new task has time to run, before the main
153 /// // task returns.
154 ///
155 /// time::sleep(Duration::from_millis(1000)).await;
156 /// # }
157 /// ```
158 ///
159 /// [`task::spawn`]: crate::task::spawn()
160 /// [`task::spawn_blocking`]: crate::task::spawn_blocking
161 /// [`std::thread::JoinHandle`]: std::thread::JoinHandle
162 /// [`JoinError`]: crate::task::JoinError
163 pub struct JoinHandle<T> {
164 raw: RawTask,
165 _p: PhantomData<T>,
166 }
167}
168
169unsafe impl<T: Send> Send for JoinHandle<T> {}
170unsafe impl<T: Send> Sync for JoinHandle<T> {}
171
172impl<T> UnwindSafe for JoinHandle<T> {}
173impl<T> RefUnwindSafe for JoinHandle<T> {}
174
175impl<T> JoinHandle<T> {
176 pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
177 JoinHandle {
178 raw,
179 _p: PhantomData,
180 }
181 }
182
183 /// Abort the task associated with the handle.
184 ///
185 /// Awaiting a cancelled task might complete as usual if the task was
186 /// already completed at the time it was cancelled, but most likely it
187 /// will fail with a [cancelled] `JoinError`.
188 ///
189 /// Be aware that tasks spawned using [`spawn_blocking`] cannot be aborted
190 /// because they are not async. If you call `abort` on a `spawn_blocking`
191 /// task, then this *will not have any effect*, and the task will continue
192 /// running normally. The exception is if the task has not started running
193 /// yet; in that case, calling `abort` may prevent the task from starting.
194 ///
195 /// See also [the module level docs] for more information on cancellation.
196 ///
197 /// ```rust
198 /// use tokio::time;
199 ///
200 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
201 /// # async fn main() {
202 /// let mut handles = Vec::new();
203 ///
204 /// handles.push(tokio::spawn(async {
205 /// time::sleep(time::Duration::from_secs(10)).await;
206 /// true
207 /// }));
208 ///
209 /// handles.push(tokio::spawn(async {
210 /// time::sleep(time::Duration::from_secs(10)).await;
211 /// false
212 /// }));
213 ///
214 /// for handle in &handles {
215 /// handle.abort();
216 /// }
217 ///
218 /// for handle in handles {
219 /// assert!(handle.await.unwrap_err().is_cancelled());
220 /// }
221 /// # }
222 /// ```
223 ///
224 /// [cancelled]: method@super::error::JoinError::is_cancelled
225 /// [the module level docs]: crate::task#cancellation
226 /// [`spawn_blocking`]: crate::task::spawn_blocking
227 pub fn abort(&self) {
228 self.raw.remote_abort();
229 }
230
231 /// Checks if the task associated with this `JoinHandle` has finished.
232 ///
233 /// Please note that this method can return `false` even if [`abort`] has been
234 /// called on the task. This is because the cancellation process may take
235 /// some time, and this method does not return `true` until it has
236 /// completed.
237 ///
238 /// ```rust
239 /// use tokio::time;
240 ///
241 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
242 /// # async fn main() {
243 /// let handle1 = tokio::spawn(async {
244 /// // do some stuff here
245 /// });
246 /// let handle2 = tokio::spawn(async {
247 /// // do some other stuff here
248 /// time::sleep(time::Duration::from_secs(10)).await;
249 /// });
250 /// // Wait for the task to finish
251 /// handle2.abort();
252 /// time::sleep(time::Duration::from_secs(1)).await;
253 /// assert!(handle1.is_finished());
254 /// assert!(handle2.is_finished());
255 /// # }
256 /// ```
257 /// [`abort`]: method@JoinHandle::abort
258 pub fn is_finished(&self) -> bool {
259 let state = self.raw.header().state.load();
260 state.is_complete()
261 }
262
263 /// Set the waker that is notified when the task completes.
264 pub(crate) fn set_join_waker(&mut self, waker: &Waker) {
265 if self.raw.try_set_join_waker(waker) {
266 // In this case the task has already completed. We wake the waker immediately.
267 waker.wake_by_ref();
268 }
269 }
270
271 /// Returns a new `AbortHandle` that can be used to remotely abort this task.
272 ///
273 /// Awaiting a task cancelled by the `AbortHandle` might complete as usual if the task was
274 /// already completed at the time it was cancelled, but most likely it
275 /// will fail with a [cancelled] `JoinError`.
276 ///
277 /// ```rust
278 /// use tokio::{time, task};
279 ///
280 /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
281 /// # async fn main() {
282 /// let mut handles = Vec::new();
283 ///
284 /// handles.push(tokio::spawn(async {
285 /// time::sleep(time::Duration::from_secs(10)).await;
286 /// true
287 /// }));
288 ///
289 /// handles.push(tokio::spawn(async {
290 /// time::sleep(time::Duration::from_secs(10)).await;
291 /// false
292 /// }));
293 ///
294 /// let abort_handles: Vec<task::AbortHandle> = handles.iter().map(|h| h.abort_handle()).collect();
295 ///
296 /// for handle in abort_handles {
297 /// handle.abort();
298 /// }
299 ///
300 /// for handle in handles {
301 /// assert!(handle.await.unwrap_err().is_cancelled());
302 /// }
303 /// # }
304 /// ```
305 /// [cancelled]: method@super::error::JoinError::is_cancelled
306 #[must_use = "abort handles do nothing unless `.abort` is called"]
307 pub fn abort_handle(&self) -> AbortHandle {
308 self.raw.ref_inc();
309 AbortHandle::new(self.raw)
310 }
311
312 /// Returns a [task ID] that uniquely identifies this task relative to other
313 /// currently spawned tasks.
314 ///
315 /// [task ID]: crate::task::Id
316 pub fn id(&self) -> super::Id {
317 // Safety: The header pointer is valid.
318 unsafe { Header::get_id(self.raw.header_ptr()) }
319 }
320}
321
322impl<T> Unpin for JoinHandle<T> {}
323
324impl<T> Future for JoinHandle<T> {
325 type Output = super::Result<T>;
326
327 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
328 ready!(crate::trace::trace_leaf(cx));
329 let mut ret = Poll::Pending;
330
331 // Keep track of task budget
332 let coop = ready!(crate::task::coop::poll_proceed(cx));
333
334 // Try to read the task output. If the task is not yet complete, the
335 // waker is stored and is notified once the task does complete.
336 //
337 // The function must go via the vtable, which requires erasing generic
338 // types. To do this, the function "return" is placed on the stack
339 // **before** calling the function and is passed into the function using
340 // `*mut ()`.
341 //
342 // Safety:
343 //
344 // The type of `T` must match the task's output type.
345 unsafe {
346 self.raw.try_read_output(&mut ret, cx.waker());
347 }
348
349 if ret.is_ready() {
350 coop.made_progress();
351 }
352
353 ret
354 }
355}
356
357impl<T> Drop for JoinHandle<T> {
358 fn drop(&mut self) {
359 if self.raw.state().drop_join_handle_fast().is_ok() {
360 return;
361 }
362
363 self.raw.drop_join_handle_slow();
364 }
365}
366
367impl<T> fmt::Debug for JoinHandle<T>
368where
369 T: fmt::Debug,
370{
371 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
372 // Safety: The header pointer is valid.
373 let id_ptr = unsafe { Header::get_id_ptr(self.raw.header_ptr()) };
374 let id = unsafe { id_ptr.as_ref() };
375 fmt.debug_struct("JoinHandle").field("id", id).finish()
376 }
377}