// tokio/runtime/runtime.rs

use super::BOX_FUTURE_THRESHOLD;
use crate::runtime::blocking::BlockingPool;
use crate::runtime::scheduler::CurrentThread;
use crate::runtime::{context, EnterGuard, Handle};
use crate::task::JoinHandle;
use crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR;
use crate::util::trace::SpawnMeta;

use std::future::Future;
use std::io;
use std::mem;
use std::time::Duration;

cfg_rt_multi_thread! {
    use crate::runtime::Builder;
    use crate::runtime::scheduler::MultiThread;
}

19/// The Tokio runtime.
20///
21/// The runtime provides an I/O driver, task scheduler, [timer], and
22/// blocking pool, necessary for running asynchronous tasks.
23///
24/// Instances of `Runtime` can be created using [`new`], or [`Builder`].
25/// However, most users will use the [`#[tokio::main]`][main] annotation on
26/// their entry point instead.
27///
28/// See [module level][mod] documentation for more details.
29///
30/// # Shutdown
31///
32/// Shutting down the runtime is done by dropping the value, or calling
33/// [`shutdown_background`] or [`shutdown_timeout`].
34///
35/// Tasks spawned through [`Runtime::spawn`] keep running until they yield.
36/// Then they are dropped. They are not *guaranteed* to run to completion, but
37/// *might* do so if they do not yield until completion.
38///
39/// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running
40/// until they return.
41///
42/// The thread initiating the shutdown blocks until all spawned work has been
43/// stopped. This can take an indefinite amount of time. The `Drop`
44/// implementation waits forever for this.
45///
46/// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if
47/// waiting forever is undesired. When the timeout is reached, spawned work that
48/// did not stop in time and threads running it are leaked. The work continues
49/// to run until one of the stopping conditions is fulfilled, but the thread
50/// initiating the shutdown is unblocked.
51///
52/// Once the runtime has been dropped, any outstanding I/O resources bound to
53/// it will no longer function. Calling any method on them will result in an
54/// error.
55///
56/// # Sharing
57///
58/// There are several ways to establish shared access to a Tokio runtime:
59///
60///  * Using an <code>[Arc]\<Runtime></code>.
61///  * Using a [`Handle`].
62///  * Entering the runtime context.
63///
64/// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various
65/// things with the runtime such as spawning new tasks or entering the runtime
66/// context. Both types can be cloned to create a new handle that allows access
67/// to the same runtime. By passing clones into different tasks or threads, you
68/// will be able to access the runtime from those tasks or threads.
69///
70/// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that
71/// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down,
72/// whereas a [`Handle`] does not prevent that. This is because shutdown of the
73/// runtime happens when the destructor of the `Runtime` object runs.
74///
75/// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive
76/// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>,
77/// this can be achieved via [`Arc::try_unwrap`] when only one strong count
78/// reference is left over.
79///
80/// The runtime context is entered using the [`Runtime::enter`] or
81/// [`Handle::enter`] methods, which use a thread-local variable to store the
82/// current runtime. Whenever you are inside the runtime context, methods such
83/// as [`tokio::spawn`] will use the runtime whose context you are inside.
84///
85/// [timer]: crate::time
86/// [mod]: index.html
87/// [`new`]: method@Self::new
88/// [`Builder`]: struct@Builder
89/// [`Handle`]: struct@Handle
90/// [main]: macro@crate::main
91/// [`tokio::spawn`]: crate::spawn
92/// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap
93/// [Arc]: std::sync::Arc
94/// [`shutdown_background`]: method@Runtime::shutdown_background
95/// [`shutdown_timeout`]: method@Runtime::shutdown_timeout
96#[derive(Debug)]
97pub struct Runtime {
98    /// Task scheduler
99    scheduler: Scheduler,
100
101    /// Handle to runtime, also contains driver handles
102    handle: Handle,
103
104    /// Blocking pool handle, used to signal shutdown
105    blocking_pool: BlockingPool,
106}
107
/// The flavor of a `Runtime`.
///
/// This is the return type for [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()).
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum RuntimeFlavor {
    /// The flavor that executes all tasks on the current thread.
    CurrentThread,
    /// The flavor that executes tasks across multiple threads.
    MultiThread,
}

120/// The runtime scheduler is either a multi-thread or a current-thread executor.
121#[derive(Debug)]
122pub(super) enum Scheduler {
123    /// Execute all tasks on the current-thread.
124    CurrentThread(CurrentThread),
125
126    /// Execute tasks across multiple threads.
127    #[cfg(feature = "rt-multi-thread")]
128    MultiThread(MultiThread),
129}
130
131impl Runtime {
132    pub(super) fn from_parts(
133        scheduler: Scheduler,
134        handle: Handle,
135        blocking_pool: BlockingPool,
136    ) -> Runtime {
137        Runtime {
138            scheduler,
139            handle,
140            blocking_pool,
141        }
142    }
143
144    /// Creates a new runtime instance with default configuration values.
145    ///
146    /// This results in the multi threaded scheduler, I/O driver, and time driver being
147    /// initialized.
148    ///
149    /// Most applications will not need to call this function directly. Instead,
150    /// they will use the  [`#[tokio::main]` attribute][main]. When a more complex
151    /// configuration is necessary, the [runtime builder] may be used.
152    ///
153    /// See [module level][mod] documentation for more details.
154    ///
155    /// # Examples
156    ///
157    /// Creating a new `Runtime` with default configuration values.
158    ///
159    /// ```
160    /// use tokio::runtime::Runtime;
161    ///
162    /// let rt = Runtime::new()
163    ///     .unwrap();
164    ///
165    /// // Use the runtime...
166    /// ```
167    ///
168    /// [mod]: index.html
169    /// [main]: ../attr.main.html
170    /// [threaded scheduler]: index.html#threaded-scheduler
171    /// [runtime builder]: crate::runtime::Builder
172    #[cfg(feature = "rt-multi-thread")]
173    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
174    pub fn new() -> std::io::Result<Runtime> {
175        Builder::new_multi_thread().enable_all().build()
176    }
177
178    /// Returns a handle to the runtime's spawner.
179    ///
180    /// The returned handle can be used to spawn tasks that run on this runtime, and can
181    /// be cloned to allow moving the `Handle` to other threads.
182    ///
183    /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone.
184    /// Refer to the documentation of [`Handle::block_on`] for more.
185    ///
186    /// # Examples
187    ///
188    /// ```
189    /// # #[cfg(not(target_family = "wasm"))]
190    /// # {
191    /// use tokio::runtime::Runtime;
192    ///
193    /// let rt = Runtime::new()
194    ///     .unwrap();
195    ///
196    /// let handle = rt.handle();
197    ///
198    /// // Use the handle...
199    /// # }
200    /// ```
201    pub fn handle(&self) -> &Handle {
202        &self.handle
203    }
204
205    /// Spawns a future onto the Tokio runtime.
206    ///
207    /// This spawns the given future onto the runtime's executor, usually a
208    /// thread pool. The thread pool is then responsible for polling the future
209    /// until it completes.
210    ///
211    /// The provided future will start running in the background immediately
212    /// when `spawn` is called, even if you don't await the returned
213    /// `JoinHandle` (assuming that the runtime [is running][running-runtime]).
214    ///
215    /// See [module level][mod] documentation for more details.
216    ///
217    /// [mod]: index.html
218    /// [running-runtime]: index.html#driving-the-runtime
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// # #[cfg(not(target_family = "wasm"))]
224    /// # {
225    /// use tokio::runtime::Runtime;
226    ///
227    /// # fn dox() {
228    /// // Create the runtime
229    /// let rt = Runtime::new().unwrap();
230    ///
231    /// // Spawn a future onto the runtime
232    /// rt.spawn(async {
233    ///     println!("now running on a worker thread");
234    /// });
235    /// # }
236    /// # }
237    /// ```
238    #[track_caller]
239    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
240    where
241        F: Future + Send + 'static,
242        F::Output: Send + 'static,
243    {
244        let fut_size = mem::size_of::<F>();
245        if fut_size > BOX_FUTURE_THRESHOLD {
246            self.handle
247                .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
248        } else {
249            self.handle
250                .spawn_named(future, SpawnMeta::new_unnamed(fut_size))
251        }
252    }
253
254    /// Runs the provided function on an executor dedicated to blocking operations.
255    ///
256    /// # Examples
257    ///
258    /// ```
259    /// # #[cfg(not(target_family = "wasm"))]
260    /// # {
261    /// use tokio::runtime::Runtime;
262    ///
263    /// # fn dox() {
264    /// // Create the runtime
265    /// let rt = Runtime::new().unwrap();
266    ///
267    /// // Spawn a blocking function onto the runtime
268    /// rt.spawn_blocking(|| {
269    ///     println!("now running on a worker thread");
270    /// });
271    /// # }
272    /// # }
273    /// ```
274    #[track_caller]
275    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
276    where
277        F: FnOnce() -> R + Send + 'static,
278        R: Send + 'static,
279    {
280        self.handle.spawn_blocking(func)
281    }
282
283    /// Runs a future to completion on the Tokio runtime. This is the
284    /// runtime's entry point.
285    ///
286    /// This runs the given future on the current thread, blocking until it is
287    /// complete, and yielding its resolved result. Any tasks or timers
288    /// which the future spawns internally will be executed on the runtime.
289    ///
290    /// # Non-worker future
291    ///
292    /// Note that the future required by this function does not run as a
293    /// worker. The expectation is that other tasks are spawned by the future here.
294    /// Awaiting on other futures from the future provided here will not
295    /// perform as fast as those spawned as workers.
296    ///
297    /// # Multi thread scheduler
298    ///
299    /// When the multi thread scheduler is used this will allow futures
300    /// to run within the io driver and timer context of the overall runtime.
301    ///
302    /// Any spawned tasks will continue running after `block_on` returns.
303    ///
304    /// # Current thread scheduler
305    ///
306    /// When the current thread scheduler is enabled `block_on`
307    /// can be called concurrently from multiple threads. The first call
308    /// will take ownership of the io and timer drivers. This means
309    /// other threads which do not own the drivers will hook into that one.
310    /// When the first `block_on` completes, other threads will be able to
311    /// "steal" the driver to allow continued execution of their futures.
312    ///
313    /// Any spawned tasks will be suspended after `block_on` returns. Calling
314    /// `block_on` again will resume previously spawned tasks.
315    ///
316    /// # Panics
317    ///
318    /// This function panics if the provided future panics, or if called within an
319    /// asynchronous execution context.
320    ///
321    /// # Examples
322    ///
323    /// ```no_run
324    /// # #[cfg(not(target_family = "wasm"))]
325    /// # {
326    /// use tokio::runtime::Runtime;
327    ///
328    /// // Create the runtime
329    /// let rt  = Runtime::new().unwrap();
330    ///
331    /// // Execute the future, blocking the current thread until completion
332    /// rt.block_on(async {
333    ///     println!("hello");
334    /// });
335    /// # }
336    /// ```
337    ///
338    /// [handle]: fn@Handle::block_on
339    #[track_caller]
340    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
341        let fut_size = mem::size_of::<F>();
342        if fut_size > BOX_FUTURE_THRESHOLD {
343            self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
344        } else {
345            self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
346        }
347    }
348
349    #[track_caller]
350    fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
351        #[cfg(all(
352            tokio_unstable,
353            feature = "taskdump",
354            feature = "rt",
355            target_os = "linux",
356            any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
357        ))]
358        let future = super::task::trace::Trace::root(future);
359
360        #[cfg(all(tokio_unstable, feature = "tracing"))]
361        let future = crate::util::trace::task(
362            future,
363            "block_on",
364            _meta,
365            crate::runtime::task::Id::next().as_u64(),
366        );
367
368        let _enter = self.enter();
369
370        match &self.scheduler {
371            Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
372            #[cfg(feature = "rt-multi-thread")]
373            Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
374        }
375    }
376
377    /// Enters the runtime context.
378    ///
379    /// This allows you to construct types that must have an executor
380    /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
381    /// also allow you to call methods such as [`tokio::spawn`].
382    ///
383    /// [`Sleep`]: struct@crate::time::Sleep
384    /// [`TcpStream`]: struct@crate::net::TcpStream
385    /// [`tokio::spawn`]: fn@crate::spawn
386    ///
387    /// # Example
388    ///
389    /// ```
390    /// # #[cfg(not(target_family = "wasm"))]
391    /// # {
392    /// use tokio::runtime::Runtime;
393    /// use tokio::task::JoinHandle;
394    ///
395    /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
396    ///     // Had we not used `rt.enter` below, this would panic.
397    ///     tokio::spawn(async move {
398    ///         println!("{}", msg);
399    ///     })
400    /// }
401    ///
402    /// fn main() {
403    ///     let rt = Runtime::new().unwrap();
404    ///
405    ///     let s = "Hello World!".to_string();
406    ///
407    ///     // By entering the context, we tie `tokio::spawn` to this executor.
408    ///     let _guard = rt.enter();
409    ///     let handle = function_that_spawns(s);
410    ///
411    ///     // Wait for the task before we end the test.
412    ///     rt.block_on(handle).unwrap();
413    /// }
414    /// # }
415    /// ```
416    pub fn enter(&self) -> EnterGuard<'_> {
417        self.handle.enter()
418    }
419
420    /// Shuts down the runtime, waiting for at most `duration` for all spawned
421    /// work to stop.
422    ///
423    /// See the [struct level documentation](Runtime#shutdown) for more details.
424    ///
425    /// # Examples
426    ///
427    /// ```
428    /// # #[cfg(not(target_family = "wasm"))]
429    /// # {
430    /// use tokio::runtime::Runtime;
431    /// use tokio::task;
432    ///
433    /// use std::thread;
434    /// use std::time::Duration;
435    ///
436    /// fn main() {
437    ///    let runtime = Runtime::new().unwrap();
438    ///
439    ///    runtime.block_on(async move {
440    ///        task::spawn_blocking(move || {
441    ///            thread::sleep(Duration::from_secs(10_000));
442    ///        });
443    ///    });
444    ///
445    ///    runtime.shutdown_timeout(Duration::from_millis(100));
446    /// }
447    /// # }
448    /// ```
449    pub fn shutdown_timeout(mut self, duration: Duration) {
450        // Wakeup and shutdown all the worker threads
451        self.handle.inner.shutdown();
452        self.blocking_pool.shutdown(Some(duration));
453    }
454
455    /// Shuts down the runtime, without waiting for any spawned work to stop.
456    ///
457    /// This can be useful if you want to drop a runtime from within another runtime.
458    /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
459    /// to complete, which would normally not be permitted within an asynchronous context.
460    /// By calling `shutdown_background()`, you can drop the runtime from such a context.
461    ///
462    /// Note however, that because we do not wait for any blocking tasks to complete, this
463    /// may result in a resource leak (in that any blocking tasks are still running until they
464    /// return.
465    ///
466    /// See the [struct level documentation](Runtime#shutdown) for more details.
467    ///
468    /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
469    ///
470    /// ```
471    /// # #[cfg(not(target_family = "wasm"))]
472    /// # {
473    /// use tokio::runtime::Runtime;
474    ///
475    /// fn main() {
476    ///    let runtime = Runtime::new().unwrap();
477    ///
478    ///    runtime.block_on(async move {
479    ///        let inner_runtime = Runtime::new().unwrap();
480    ///        // ...
481    ///        inner_runtime.shutdown_background();
482    ///    });
483    /// }
484    /// # }
485    /// ```
486    pub fn shutdown_background(self) {
487        self.shutdown_timeout(Duration::from_nanos(0));
488    }
489
490    /// Returns a view that lets you get information about how the runtime
491    /// is performing.
492    pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
493        self.handle.metrics()
494    }
495}
496
497impl Drop for Runtime {
498    fn drop(&mut self) {
499        match &mut self.scheduler {
500            Scheduler::CurrentThread(current_thread) => {
501                // This ensures that tasks spawned on the current-thread
502                // runtime are dropped inside the runtime's context.
503                let _guard = context::try_set_current(&self.handle.inner);
504                current_thread.shutdown(&self.handle.inner);
505            }
506            #[cfg(feature = "rt-multi-thread")]
507            Scheduler::MultiThread(multi_thread) => {
508                // The threaded scheduler drops its tasks on its worker threads, which is
509                // already in the runtime's context.
510                multi_thread.shutdown(&self.handle.inner);
511            }
512        }
513    }
514}
515
516impl std::panic::UnwindSafe for Runtime {}
517
518impl std::panic::RefUnwindSafe for Runtime {}
519
/// Returns `true` when the `Display` output of `d` is exactly equal to `s`.
///
/// The comparison is done incrementally as the formatter emits chunks, so the
/// formatted text is never collected into a `String`.
fn display_eq(d: impl std::fmt::Display, s: &str) -> bool {
    use std::fmt::Write;

    /// A `fmt::Write` sink that checks incoming chunks against a reference
    /// string instead of storing them.
    struct FormatEq<'r> {
        /// Part of the reference string that has not been matched yet.
        remainder: &'r str,
        /// Set once any chunk fails to match; later chunks are then ignored.
        unequal: bool,
    }

    impl<'r> Write for FormatEq<'r> {
        fn write_str(&mut self, s: &str) -> std::fmt::Result {
            if !self.unequal {
                if let Some(new_remainder) = self.remainder.strip_prefix(s) {
                    // Chunk matched: consume it from the front of the reference.
                    self.remainder = new_remainder;
                } else {
                    self.unequal = true;
                }
            }
            // Always report success; a mismatch is tracked via `unequal`
            // rather than surfaced to the `Display` impl as a write error.
            Ok(())
        }
    }

    let mut fmt_eq = FormatEq {
        remainder: s,
        unequal: false,
    };
    // The sink itself never fails; any error returned by the `Display` impl
    // is deliberately ignored and only the comparison state is consulted.
    let _ = write!(fmt_eq, "{d}");
    // Equal only if every chunk matched AND the whole reference was consumed
    // (otherwise the output was a strict prefix of `s`).
    fmt_eq.remainder.is_empty() && !fmt_eq.unequal
}

549/// Checks whether the given error was emitted by Tokio when shutting down its runtime.
550///
551/// # Examples
552///
553/// ```
554/// # #[cfg(not(target_family = "wasm"))]
555/// # {
556/// use tokio::runtime::Runtime;
557/// use tokio::net::TcpListener;
558///
559/// fn main() {
560///     let rt1 = Runtime::new().unwrap();
561///     let rt2 = Runtime::new().unwrap();
562///
563///     let listener = rt1.block_on(async {
564///         TcpListener::bind("127.0.0.1:0").await.unwrap()
565///     });
566///
567///     drop(rt1);
568///
569///     rt2.block_on(async {
570///         let res = listener.accept().await;
571///         assert!(res.is_err());
572///         assert!(tokio::runtime::is_rt_shutdown_err(res.as_ref().unwrap_err()));
573///     });
574/// }
575/// # }
576/// ```
577pub fn is_rt_shutdown_err(err: &io::Error) -> bool {
578    if let Some(inner) = err.get_ref() {
579        err.kind() == io::ErrorKind::Other
580            && inner.source().is_none()
581            && display_eq(inner, RUNTIME_SHUTTING_DOWN_ERROR)
582    } else {
583        false
584    }
585}