Skip to main content

tokio/runtime/
builder.rs

1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{
5    blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback, TimerFlavor,
6};
7#[cfg(tokio_unstable)]
8use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
9use crate::util::rand::{RngSeed, RngSeedGenerator};
10
11use crate::runtime::blocking::BlockingPool;
12use crate::runtime::scheduler::CurrentThread;
13use std::fmt;
14use std::io;
15use std::thread::ThreadId;
16use std::time::Duration;
17
18/// Builds Tokio Runtime with custom configuration values.
19///
20/// Methods can be chained in order to set the configuration values. The
21/// Runtime is constructed by calling [`build`].
22///
23/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
24/// or [`Builder::new_current_thread`].
25///
26/// See function level documentation for details on the various configuration
27/// settings.
28///
29/// [`build`]: method@Self::build
30/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
31/// [`Builder::new_current_thread`]: method@Self::new_current_thread
32///
33/// # Examples
34///
35/// ```
36/// # #[cfg(not(target_family = "wasm"))]
37/// # {
38/// use tokio::runtime::Builder;
39///
40/// fn main() {
41///     // build runtime
42///     let runtime = Builder::new_multi_thread()
43///         .worker_threads(4)
44///         .thread_name("my-custom-name")
45///         .thread_stack_size(3 * 1024 * 1024)
46///         .build()
47///         .unwrap();
48///
49///     // use runtime ...
50/// }
51/// # }
52/// ```
53pub struct Builder {
54    /// Runtime type
55    kind: Kind,
56
57    /// Whether or not to enable the I/O driver
58    enable_io: bool,
59    nevents: usize,
60
61    /// Whether or not to enable the time driver
62    enable_time: bool,
63
64    /// Whether or not the clock should start paused.
65    start_paused: bool,
66
67    /// The number of worker threads, used by Runtime.
68    ///
69    /// Only used when not using the current-thread executor.
70    worker_threads: Option<usize>,
71
72    /// Cap on thread usage.
73    max_blocking_threads: usize,
74
75    /// Name fn used for threads spawned by the runtime.
76    pub(super) thread_name: ThreadNameFn,
77
78    /// Stack size used for threads spawned by the runtime.
79    pub(super) thread_stack_size: Option<usize>,
80
81    /// Callback to run after each thread starts.
82    pub(super) after_start: Option<Callback>,
83
84    /// To run before each worker thread stops
85    pub(super) before_stop: Option<Callback>,
86
87    /// To run before each worker thread is parked.
88    pub(super) before_park: Option<Callback>,
89
90    /// To run after each thread is unparked.
91    pub(super) after_unpark: Option<Callback>,
92
93    /// To run before each task is spawned.
94    pub(super) before_spawn: Option<TaskCallback>,
95
96    /// To run before each poll
97    #[cfg(tokio_unstable)]
98    pub(super) before_poll: Option<TaskCallback>,
99
100    /// To run after each poll
101    #[cfg(tokio_unstable)]
102    pub(super) after_poll: Option<TaskCallback>,
103
104    /// To run after each task is terminated.
105    pub(super) after_termination: Option<TaskCallback>,
106
107    /// Customizable keep alive timeout for `BlockingPool`
108    pub(super) keep_alive: Option<Duration>,
109
110    /// How many ticks before pulling a task from the global/remote queue?
111    ///
112    /// When `None`, the value is unspecified and behavior details are left to
113    /// the scheduler. Each scheduler flavor could choose to either pick its own
114    /// default value or use some other strategy to decide when to poll from the
115    /// global queue. For example, the multi-threaded scheduler uses a
116    /// self-tuning strategy based on mean task poll times.
117    pub(super) global_queue_interval: Option<u32>,
118
119    /// How many ticks before yielding to the driver for timer and I/O events?
120    pub(super) event_interval: u32,
121
122    /// When true, the multi-threade scheduler LIFO slot should not be used.
123    ///
124    /// This option should only be exposed as unstable.
125    pub(super) disable_lifo_slot: bool,
126
127    /// Specify a random number generator seed to provide deterministic results
128    pub(super) seed_generator: RngSeedGenerator,
129
130    /// When true, enables task poll count histogram instrumentation.
131    pub(super) metrics_poll_count_histogram_enable: bool,
132
133    /// Configures the task poll count histogram
134    pub(super) metrics_poll_count_histogram: HistogramBuilder,
135
136    #[cfg(tokio_unstable)]
137    pub(super) unhandled_panic: UnhandledPanic,
138
139    timer_flavor: TimerFlavor,
140}
141
142cfg_unstable! {
143    /// How the runtime should respond to unhandled panics.
144    ///
145    /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
146    /// to configure the runtime behavior when a spawned task panics.
147    ///
148    /// See [`Builder::unhandled_panic`] for more details.
149    #[derive(Debug, Clone)]
150    #[non_exhaustive]
151    pub enum UnhandledPanic {
152        /// The runtime should ignore panics on spawned tasks.
153        ///
154        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
155        /// tasks continue running normally.
156        ///
157        /// This is the default behavior.
158        ///
159        /// # Examples
160        ///
161        /// ```
162        /// # #[cfg(not(target_family = "wasm"))]
163        /// # {
164        /// use tokio::runtime::{self, UnhandledPanic};
165        ///
166        /// # pub fn main() {
167        /// let rt = runtime::Builder::new_current_thread()
168        ///     .unhandled_panic(UnhandledPanic::Ignore)
169        ///     .build()
170        ///     .unwrap();
171        ///
172        /// let task1 = rt.spawn(async { panic!("boom"); });
173        /// let task2 = rt.spawn(async {
174        ///     // This task completes normally
175        ///     "done"
176        /// });
177        ///
178        /// rt.block_on(async {
179        ///     // The panic on the first task is forwarded to the `JoinHandle`
180        ///     assert!(task1.await.is_err());
181        ///
182        ///     // The second task completes normally
183        ///     assert!(task2.await.is_ok());
184        /// })
185        /// # }
186        /// # }
187        /// ```
188        ///
189        /// [`JoinHandle`]: struct@crate::task::JoinHandle
190        Ignore,
191
192        /// The runtime should immediately shutdown if a spawned task panics.
193        ///
194        /// The runtime will immediately shutdown even if the panicked task's
195        /// [`JoinHandle`] is still available. All further spawned tasks will be
196        /// immediately dropped and call to [`Runtime::block_on`] will panic.
197        ///
198        /// # Examples
199        ///
200        /// ```should_panic
201        /// use tokio::runtime::{self, UnhandledPanic};
202        ///
203        /// # pub fn main() {
204        /// let rt = runtime::Builder::new_current_thread()
205        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
206        ///     .build()
207        ///     .unwrap();
208        ///
209        /// rt.spawn(async { panic!("boom"); });
210        /// rt.spawn(async {
211        ///     // This task never completes.
212        /// });
213        ///
214        /// rt.block_on(async {
215        ///     // Do some work
216        /// # loop { tokio::task::yield_now().await; }
217        /// })
218        /// # }
219        /// ```
220        ///
221        /// [`JoinHandle`]: struct@crate::task::JoinHandle
222        ShutdownRuntime,
223    }
224}
225
226pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
227
228#[derive(Clone, Copy)]
229pub(crate) enum Kind {
230    CurrentThread,
231    #[cfg(feature = "rt-multi-thread")]
232    MultiThread,
233}
234
235impl Builder {
236    /// Returns a new builder with the current thread scheduler selected.
237    ///
238    /// Configuration methods can be chained on the return value.
239    ///
240    /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
241    /// [`LocalSet`], or call [`build_local`] to create a [`LocalRuntime`] (unstable).
242    ///
243    /// [`LocalSet`]: crate::task::LocalSet
244    /// [`LocalRuntime`]: crate::runtime::LocalRuntime
245    /// [`build_local`]: crate::runtime::Builder::build_local
246    pub fn new_current_thread() -> Builder {
247        #[cfg(loom)]
248        const EVENT_INTERVAL: u32 = 4;
249        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
250        #[cfg(not(loom))]
251        const EVENT_INTERVAL: u32 = 61;
252
253        Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
254    }
255
256    /// Returns a new builder with the multi thread scheduler selected.
257    ///
258    /// Configuration methods can be chained on the return value.
259    #[cfg(feature = "rt-multi-thread")]
260    #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
261    pub fn new_multi_thread() -> Builder {
262        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
263        Builder::new(Kind::MultiThread, 61)
264    }
265
266    /// Returns a new runtime builder initialized with default configuration
267    /// values.
268    ///
269    /// Configuration methods can be chained on the return value.
270    pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
271        Builder {
272            kind,
273
274            // I/O defaults to "off"
275            enable_io: false,
276            nevents: 1024,
277
278            // Time defaults to "off"
279            enable_time: false,
280
281            // The clock starts not-paused
282            start_paused: false,
283
284            // Read from environment variable first in multi-threaded mode.
285            // Default to lazy auto-detection (one thread per CPU core)
286            worker_threads: None,
287
288            max_blocking_threads: 512,
289
290            // Default thread name
291            thread_name: std::sync::Arc::new(|| "tokio-rt-worker".into()),
292
293            // Do not set a stack size by default
294            thread_stack_size: None,
295
296            // No worker thread callbacks
297            after_start: None,
298            before_stop: None,
299            before_park: None,
300            after_unpark: None,
301
302            before_spawn: None,
303            after_termination: None,
304
305            #[cfg(tokio_unstable)]
306            before_poll: None,
307            #[cfg(tokio_unstable)]
308            after_poll: None,
309
310            keep_alive: None,
311
312            // Defaults for these values depend on the scheduler kind, so we get them
313            // as parameters.
314            global_queue_interval: None,
315            event_interval,
316
317            seed_generator: RngSeedGenerator::new(RngSeed::new()),
318
319            #[cfg(tokio_unstable)]
320            unhandled_panic: UnhandledPanic::Ignore,
321
322            metrics_poll_count_histogram_enable: false,
323
324            metrics_poll_count_histogram: HistogramBuilder::default(),
325
326            disable_lifo_slot: false,
327
328            timer_flavor: TimerFlavor::Traditional,
329        }
330    }
331
332    /// Enables both I/O and time drivers.
333    ///
334    /// Doing this is a shorthand for calling `enable_io` and `enable_time`
335    /// individually. If additional components are added to Tokio in the future,
336    /// `enable_all` will include these future components.
337    ///
338    /// # Examples
339    ///
340    /// ```
341    /// # #[cfg(not(target_family = "wasm"))]
342    /// # {
343    /// use tokio::runtime;
344    ///
345    /// let rt = runtime::Builder::new_multi_thread()
346    ///     .enable_all()
347    ///     .build()
348    ///     .unwrap();
349    /// # }
350    /// ```
351    pub fn enable_all(&mut self) -> &mut Self {
352        #[cfg(any(
353            feature = "net",
354            all(unix, feature = "process"),
355            all(unix, feature = "signal")
356        ))]
357        self.enable_io();
358
359        #[cfg(all(
360            tokio_unstable,
361            feature = "io-uring",
362            feature = "rt",
363            feature = "fs",
364            target_os = "linux",
365        ))]
366        self.enable_io_uring();
367
368        #[cfg(feature = "time")]
369        self.enable_time();
370
371        self
372    }
373
374    /// Enables the alternative timer implementation, which is disabled by default.
375    ///
376    /// The alternative timer implementation is an unstable feature that may
377    /// provide better performance on multi-threaded runtimes with a large number
378    /// of worker threads.
379    ///
380    /// This option only applies to multi-threaded runtimes. Attempting to use
381    /// this option with any other runtime type will have no effect.
382    ///
383    /// [Click here to share your experience with the alternative timer](https://github.com/tokio-rs/tokio/issues/7745)
384    ///
385    /// # Examples
386    ///
387    /// ```
388    /// # #[cfg(not(target_family = "wasm"))]
389    /// # {
390    /// use tokio::runtime;
391    ///
392    /// let rt = runtime::Builder::new_multi_thread()
393    ///   .enable_alt_timer()
394    ///   .build()
395    ///   .unwrap();
396    /// # }
397    /// ```
398    #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))]
399    #[cfg_attr(
400        docsrs,
401        doc(cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread")))
402    )]
403    pub fn enable_alt_timer(&mut self) -> &mut Self {
404        self.enable_time();
405        self.timer_flavor = TimerFlavor::Alternative;
406        self
407    }
408
409    /// Sets the number of worker threads the `Runtime` will use.
410    ///
411    /// This can be any number above 0 though it is advised to keep this value
412    /// on the smaller side.
413    ///
414    /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
415    ///
416    /// # Default
417    ///
418    /// The default value is the number of cores available to the system.
419    ///
420    /// When using the `current_thread` runtime this method has no effect.
421    ///
422    /// # Examples
423    ///
424    /// ## Multi threaded runtime with 4 threads
425    ///
426    /// ```
427    /// # #[cfg(not(target_family = "wasm"))]
428    /// # {
429    /// use tokio::runtime;
430    ///
431    /// // This will spawn a work-stealing runtime with 4 worker threads.
432    /// let rt = runtime::Builder::new_multi_thread()
433    ///     .worker_threads(4)
434    ///     .build()
435    ///     .unwrap();
436    ///
437    /// rt.spawn(async move {});
438    /// # }
439    /// ```
440    ///
441    /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
442    ///
443    /// ```
444    /// use tokio::runtime;
445    ///
446    /// // Create a runtime that _must_ be driven from a call
447    /// // to `Runtime::block_on`.
448    /// let rt = runtime::Builder::new_current_thread()
449    ///     .build()
450    ///     .unwrap();
451    ///
452    /// // This will run the runtime and future on the current thread
453    /// rt.block_on(async move {});
454    /// ```
455    ///
456    /// # Panics
457    ///
458    /// This will panic if `val` is not larger than `0`.
459    #[track_caller]
460    pub fn worker_threads(&mut self, val: usize) -> &mut Self {
461        assert!(val > 0, "Worker threads cannot be set to 0");
462        self.worker_threads = Some(val);
463        self
464    }
465
466    /// Specifies the limit for additional threads spawned by the Runtime.
467    ///
468    /// These threads are used for blocking operations like tasks spawned
469    /// through [`spawn_blocking`], this includes but is not limited to:
470    /// - [`fs`] operations
471    /// - dns resolution through [`ToSocketAddrs`]
472    /// - writing to [`Stdout`] or [`Stderr`]
473    /// - reading from [`Stdin`]
474    ///
475    /// Unlike the [`worker_threads`], they are not always active and will exit
476    /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
477    ///
478    /// It's recommended to not set this limit too low in order to avoid hanging on operations
479    /// requiring [`spawn_blocking`].
480    ///
481    /// The default value is 512.
482    ///
483    /// # Queue Behavior
484    ///
485    /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
486    /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
487    /// method has not been reached, a new thread will be spawned. If no idle thread is available
488    /// and no more threads are allowed to be spawned, the task will remain in the queue until one
489    /// of the busy threads pick it up. Note that since the queue does not apply any backpressure,
490    /// it could potentially grow unbounded.
491    ///
492    /// # Panics
493    ///
494    /// This will panic if `val` is not larger than `0`.
495    ///
496    /// # Upgrading from 0.x
497    ///
498    /// In old versions `max_threads` limited both blocking and worker threads, but the
499    /// current `max_blocking_threads` does not include async worker threads in the count.
500    ///
501    /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
502    /// [`fs`]: mod@crate::fs
503    /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
504    /// [`Stdout`]: struct@crate::io::Stdout
505    /// [`Stdin`]: struct@crate::io::Stdin
506    /// [`Stderr`]: struct@crate::io::Stderr
507    /// [`worker_threads`]: Self::worker_threads
508    /// [`thread_keep_alive`]: Self::thread_keep_alive
509    #[track_caller]
510    #[cfg_attr(docsrs, doc(alias = "max_threads"))]
511    pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
512        assert!(val > 0, "Max blocking threads cannot be set to 0");
513        self.max_blocking_threads = val;
514        self
515    }
516
517    /// Sets name of threads spawned by the `Runtime`'s thread pool.
518    ///
519    /// The default name is "tokio-runtime-worker".
520    ///
521    /// # Examples
522    ///
523    /// ```
524    /// # #[cfg(not(target_family = "wasm"))]
525    /// # {
526    /// # use tokio::runtime;
527    ///
528    /// # pub fn main() {
529    /// let rt = runtime::Builder::new_multi_thread()
530    ///     .thread_name("my-pool")
531    ///     .build();
532    /// # }
533    /// # }
534    /// ```
535    pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
536        let val = val.into();
537        self.thread_name = std::sync::Arc::new(move || val.clone());
538        self
539    }
540
541    /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
542    ///
543    /// The default name fn is `|| "tokio-runtime-worker".into()`.
544    ///
545    /// # Examples
546    ///
547    /// ```
548    /// # #[cfg(not(target_family = "wasm"))]
549    /// # {
550    /// # use tokio::runtime;
551    /// # use std::sync::atomic::{AtomicUsize, Ordering};
552    /// # pub fn main() {
553    /// let rt = runtime::Builder::new_multi_thread()
554    ///     .thread_name_fn(|| {
555    ///        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
556    ///        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
557    ///        format!("my-pool-{}", id)
558    ///     })
559    ///     .build();
560    /// # }
561    /// # }
562    /// ```
563    pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
564    where
565        F: Fn() -> String + Send + Sync + 'static,
566    {
567        self.thread_name = std::sync::Arc::new(f);
568        self
569    }
570
571    /// Sets the stack size (in bytes) for worker threads.
572    ///
573    /// The actual stack size may be greater than this value if the platform
574    /// specifies minimal stack size.
575    ///
576    /// The default stack size for spawned threads is 2 MiB, though this
577    /// particular stack size is subject to change in the future.
578    ///
579    /// # Examples
580    ///
581    /// ```
582    /// # #[cfg(not(target_family = "wasm"))]
583    /// # {
584    /// # use tokio::runtime;
585    ///
586    /// # pub fn main() {
587    /// let rt = runtime::Builder::new_multi_thread()
588    ///     .thread_stack_size(32 * 1024)
589    ///     .build();
590    /// # }
591    /// # }
592    /// ```
593    pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
594        self.thread_stack_size = Some(val);
595        self
596    }
597
598    /// Executes function `f` after each thread is started but before it starts
599    /// doing work.
600    ///
601    /// This is intended for bookkeeping and monitoring use cases.
602    ///
603    /// # Examples
604    ///
605    /// ```
606    /// # #[cfg(not(target_family = "wasm"))]
607    /// # {
608    /// # use tokio::runtime;
609    /// # pub fn main() {
610    /// let runtime = runtime::Builder::new_multi_thread()
611    ///     .on_thread_start(|| {
612    ///         println!("thread started");
613    ///     })
614    ///     .build();
615    /// # }
616    /// # }
617    /// ```
618    #[cfg(not(loom))]
619    pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
620    where
621        F: Fn() + Send + Sync + 'static,
622    {
623        self.after_start = Some(std::sync::Arc::new(f));
624        self
625    }
626
627    /// Executes function `f` before each thread stops.
628    ///
629    /// This is intended for bookkeeping and monitoring use cases.
630    ///
631    /// # Examples
632    ///
633    /// ```
634    /// # #[cfg(not(target_family = "wasm"))]
635    /// {
636    /// # use tokio::runtime;
637    /// # pub fn main() {
638    /// let runtime = runtime::Builder::new_multi_thread()
639    ///     .on_thread_stop(|| {
640    ///         println!("thread stopping");
641    ///     })
642    ///     .build();
643    /// # }
644    /// # }
645    /// ```
646    #[cfg(not(loom))]
647    pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
648    where
649        F: Fn() + Send + Sync + 'static,
650    {
651        self.before_stop = Some(std::sync::Arc::new(f));
652        self
653    }
654
655    /// Executes function `f` just before a thread is parked (goes idle).
656    /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
657    /// can be called, and may result in this thread being unparked immediately.
658    ///
659    /// This can be used to start work only when the executor is idle, or for bookkeeping
660    /// and monitoring purposes.
661    ///
662    /// Note: There can only be one park callback for a runtime; calling this function
663    /// more than once replaces the last callback defined, rather than adding to it.
664    ///
665    /// # Examples
666    ///
667    /// ## Multithreaded executor
668    /// ```
669    /// # #[cfg(not(target_family = "wasm"))]
670    /// # {
671    /// # use std::sync::Arc;
672    /// # use std::sync::atomic::{AtomicBool, Ordering};
673    /// # use tokio::runtime;
674    /// # use tokio::sync::Barrier;
675    /// # pub fn main() {
676    /// let once = AtomicBool::new(true);
677    /// let barrier = Arc::new(Barrier::new(2));
678    ///
679    /// let runtime = runtime::Builder::new_multi_thread()
680    ///     .worker_threads(1)
681    ///     .on_thread_park({
682    ///         let barrier = barrier.clone();
683    ///         move || {
684    ///             let barrier = barrier.clone();
685    ///             if once.swap(false, Ordering::Relaxed) {
686    ///                 tokio::spawn(async move { barrier.wait().await; });
687    ///            }
688    ///         }
689    ///     })
690    ///     .build()
691    ///     .unwrap();
692    ///
693    /// runtime.block_on(async {
694    ///    barrier.wait().await;
695    /// })
696    /// # }
697    /// # }
698    /// ```
699    /// ## Current thread executor
700    /// ```
701    /// # use std::sync::Arc;
702    /// # use std::sync::atomic::{AtomicBool, Ordering};
703    /// # use tokio::runtime;
704    /// # use tokio::sync::Barrier;
705    /// # pub fn main() {
706    /// let once = AtomicBool::new(true);
707    /// let barrier = Arc::new(Barrier::new(2));
708    ///
709    /// let runtime = runtime::Builder::new_current_thread()
710    ///     .on_thread_park({
711    ///         let barrier = barrier.clone();
712    ///         move || {
713    ///             let barrier = barrier.clone();
714    ///             if once.swap(false, Ordering::Relaxed) {
715    ///                 tokio::spawn(async move { barrier.wait().await; });
716    ///            }
717    ///         }
718    ///     })
719    ///     .build()
720    ///     .unwrap();
721    ///
722    /// runtime.block_on(async {
723    ///    barrier.wait().await;
724    /// })
725    /// # }
726    /// ```
727    #[cfg(not(loom))]
728    pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
729    where
730        F: Fn() + Send + Sync + 'static,
731    {
732        self.before_park = Some(std::sync::Arc::new(f));
733        self
734    }
735
736    /// Executes function `f` just after a thread unparks (starts executing tasks).
737    ///
738    /// This is intended for bookkeeping and monitoring use cases; note that work
739    /// in this callback will increase latencies when the application has allowed one or
740    /// more runtime threads to go idle.
741    ///
742    /// Note: There can only be one unpark callback for a runtime; calling this function
743    /// more than once replaces the last callback defined, rather than adding to it.
744    ///
745    /// # Examples
746    ///
747    /// ```
748    /// # #[cfg(not(target_family = "wasm"))]
749    /// # {
750    /// # use tokio::runtime;
751    /// # pub fn main() {
752    /// let runtime = runtime::Builder::new_multi_thread()
753    ///     .on_thread_unpark(|| {
754    ///         println!("thread unparking");
755    ///     })
756    ///     .build();
757    ///
758    /// runtime.unwrap().block_on(async {
759    ///    tokio::task::yield_now().await;
760    ///    println!("Hello from Tokio!");
761    /// })
762    /// # }
763    /// # }
764    /// ```
765    #[cfg(not(loom))]
766    pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
767    where
768        F: Fn() + Send + Sync + 'static,
769    {
770        self.after_unpark = Some(std::sync::Arc::new(f));
771        self
772    }
773
774    /// Executes function `f` just before a task is spawned.
775    ///
776    /// `f` is called within the Tokio context, so functions like
777    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
778    /// invoked immediately.
779    ///
780    /// This can be used for bookkeeping or monitoring purposes.
781    ///
782    /// Note: There can only be one spawn callback for a runtime; calling this function more
783    /// than once replaces the last callback defined, rather than adding to it.
784    ///
785    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
786    ///
787    /// **Note**: This is an [unstable API][unstable]. The public API of this type
788    /// may break in 1.x releases. See [the documentation on unstable
789    /// features][unstable] for details.
790    ///
791    /// [unstable]: crate#unstable-features
792    ///
793    /// # Examples
794    ///
795    /// ```
796    /// # use tokio::runtime;
797    /// # pub fn main() {
798    /// let runtime = runtime::Builder::new_current_thread()
799    ///     .on_task_spawn(|_| {
800    ///         println!("spawning task");
801    ///     })
802    ///     .build()
803    ///     .unwrap();
804    ///
805    /// runtime.block_on(async {
806    ///     tokio::task::spawn(std::future::ready(()));
807    ///
808    ///     for _ in 0..64 {
809    ///         tokio::task::yield_now().await;
810    ///     }
811    /// })
812    /// # }
813    /// ```
814    #[cfg(all(not(loom), tokio_unstable))]
815    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
816    pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
817    where
818        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
819    {
820        self.before_spawn = Some(std::sync::Arc::new(f));
821        self
822    }
823
824    /// Executes function `f` just before a task is polled
825    ///
826    /// `f` is called within the Tokio context, so functions like
827    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
828    /// invoked immediately.
829    ///
830    /// **Note**: This is an [unstable API][unstable]. The public API of this type
831    /// may break in 1.x releases. See [the documentation on unstable
832    /// features][unstable] for details.
833    ///
834    /// [unstable]: crate#unstable-features
835    ///
836    /// # Examples
837    ///
838    /// ```
839    /// # #[cfg(not(target_family = "wasm"))]
840    /// # {
841    /// # use std::sync::{atomic::AtomicUsize, Arc};
842    /// # use tokio::task::yield_now;
843    /// # pub fn main() {
844    /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
845    /// let poll_start = poll_start_counter.clone();
846    /// let rt = tokio::runtime::Builder::new_multi_thread()
847    ///     .enable_all()
848    ///     .on_before_task_poll(move |meta| {
849    ///         println!("task {} is about to be polled", meta.id())
850    ///     })
851    ///     .build()
852    ///     .unwrap();
853    /// let task = rt.spawn(async {
854    ///     yield_now().await;
855    /// });
856    /// let _ = rt.block_on(task);
857    ///
858    /// # }
859    /// # }
860    /// ```
861    #[cfg(tokio_unstable)]
862    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
863    pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
864    where
865        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
866    {
867        self.before_poll = Some(std::sync::Arc::new(f));
868        self
869    }
870
871    /// Executes function `f` just after a task is polled
872    ///
873    /// `f` is called within the Tokio context, so functions like
874    /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
875    /// invoked immediately.
876    ///
877    /// **Note**: This is an [unstable API][unstable]. The public API of this type
878    /// may break in 1.x releases. See [the documentation on unstable
879    /// features][unstable] for details.
880    ///
881    /// [unstable]: crate#unstable-features
882    ///
883    /// # Examples
884    ///
885    /// ```
886    /// # #[cfg(not(target_family = "wasm"))]
887    /// # {
888    /// # use std::sync::{atomic::AtomicUsize, Arc};
889    /// # use tokio::task::yield_now;
890    /// # pub fn main() {
891    /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
892    /// let poll_stop = poll_stop_counter.clone();
893    /// let rt = tokio::runtime::Builder::new_multi_thread()
894    ///     .enable_all()
895    ///     .on_after_task_poll(move |meta| {
896    ///         println!("task {} completed polling", meta.id());
897    ///     })
898    ///     .build()
899    ///     .unwrap();
900    /// let task = rt.spawn(async {
901    ///     yield_now().await;
902    /// });
903    /// let _ = rt.block_on(task);
904    ///
905    /// # }
906    /// # }
907    /// ```
908    #[cfg(tokio_unstable)]
909    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
910    pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
911    where
912        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
913    {
914        self.after_poll = Some(std::sync::Arc::new(f));
915        self
916    }
917
918    /// Executes function `f` just after a task is terminated.
919    ///
920    /// `f` is called within the Tokio context, so functions like
921    /// [`tokio::spawn`](crate::spawn) can be called.
922    ///
923    /// This can be used for bookkeeping or monitoring purposes.
924    ///
925    /// Note: There can only be one task termination callback for a runtime; calling this
926    /// function more than once replaces the last callback defined, rather than adding to it.
927    ///
928    /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
929    ///
930    /// **Note**: This is an [unstable API][unstable]. The public API of this type
931    /// may break in 1.x releases. See [the documentation on unstable
932    /// features][unstable] for details.
933    ///
934    /// [unstable]: crate#unstable-features
935    ///
936    /// # Examples
937    ///
938    /// ```
939    /// # use tokio::runtime;
940    /// # pub fn main() {
941    /// let runtime = runtime::Builder::new_current_thread()
942    ///     .on_task_terminate(|_| {
943    ///         println!("killing task");
944    ///     })
945    ///     .build()
946    ///     .unwrap();
947    ///
948    /// runtime.block_on(async {
949    ///     tokio::task::spawn(std::future::ready(()));
950    ///
951    ///     for _ in 0..64 {
952    ///         tokio::task::yield_now().await;
953    ///     }
954    /// })
955    /// # }
956    /// ```
957    #[cfg(all(not(loom), tokio_unstable))]
958    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
959    pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
960    where
961        F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
962    {
963        self.after_termination = Some(std::sync::Arc::new(f));
964        self
965    }
966
967    /// Creates the configured `Runtime`.
968    ///
969    /// The returned `Runtime` instance is ready to spawn tasks.
970    ///
971    /// # Examples
972    ///
973    /// ```
974    /// # #[cfg(not(target_family = "wasm"))]
975    /// # {
976    /// use tokio::runtime::Builder;
977    ///
978    /// let rt  = Builder::new_multi_thread().build().unwrap();
979    ///
980    /// rt.block_on(async {
981    ///     println!("Hello from the Tokio runtime");
982    /// });
983    /// # }
984    /// ```
985    pub fn build(&mut self) -> io::Result<Runtime> {
986        match &self.kind {
987            Kind::CurrentThread => self.build_current_thread_runtime(),
988            #[cfg(feature = "rt-multi-thread")]
989            Kind::MultiThread => self.build_threaded_runtime(),
990        }
991    }
992
993    /// Creates the configured [`LocalRuntime`].
994    ///
995    /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
996    ///
997    /// # Panics
998    ///
999    /// This will panic if the runtime is configured with [`new_multi_thread()`].
1000    ///
1001    /// [`new_multi_thread()`]: Builder::new_multi_thread
1002    ///
1003    /// # Examples
1004    ///
1005    /// ```
1006    /// use tokio::runtime::{Builder, LocalOptions};
1007    ///
1008    /// let rt = Builder::new_current_thread()
1009    ///     .build_local(LocalOptions::default())
1010    ///     .unwrap();
1011    ///
1012    /// rt.spawn_local(async {
1013    ///     println!("Hello from the Tokio runtime");
1014    /// });
1015    /// ```
1016    #[allow(unused_variables, unreachable_patterns)]
1017    #[cfg(tokio_unstable)]
1018    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
1019    pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
1020        match &self.kind {
1021            Kind::CurrentThread => self.build_current_thread_local_runtime(),
1022            #[cfg(feature = "rt-multi-thread")]
1023            Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
1024        }
1025    }
1026
1027    fn get_cfg(&self) -> driver::Cfg {
1028        driver::Cfg {
1029            enable_pause_time: match self.kind {
1030                Kind::CurrentThread => true,
1031                #[cfg(feature = "rt-multi-thread")]
1032                Kind::MultiThread => false,
1033            },
1034            enable_io: self.enable_io,
1035            enable_time: self.enable_time,
1036            start_paused: self.start_paused,
1037            nevents: self.nevents,
1038            timer_flavor: self.timer_flavor,
1039        }
1040    }
1041
1042    /// Sets a custom timeout for a thread in the blocking pool.
1043    ///
1044    /// By default, the timeout for a thread is set to 10 seconds. This can
1045    /// be overridden using `.thread_keep_alive()`.
1046    ///
1047    /// # Example
1048    ///
1049    /// ```
1050    /// # #[cfg(not(target_family = "wasm"))]
1051    /// # {
1052    /// # use tokio::runtime;
1053    /// # use std::time::Duration;
1054    /// # pub fn main() {
1055    /// let rt = runtime::Builder::new_multi_thread()
1056    ///     .thread_keep_alive(Duration::from_millis(100))
1057    ///     .build();
1058    /// # }
1059    /// # }
1060    /// ```
1061    pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
1062        self.keep_alive = Some(duration);
1063        self
1064    }
1065
1066    /// Sets the number of scheduler ticks after which the scheduler will poll the global
1067    /// task queue.
1068    ///
1069    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1070    ///
1071    /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1072    /// [the module documentation] for the default behavior of the multi-thread scheduler.
1073    ///
1074    /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1075    /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1076    /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1077    /// getting started on new work, especially if tasks frequently yield rather than complete
1078    /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1079    /// tasks from the local queue will be executed only if the global queue is empty.
1080    /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1081    /// tasks quickly complete polling.
1082    ///
1083    /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1084    ///
1085    /// # Panics
1086    ///
1087    /// This function will panic if 0 is passed as an argument.
1088    ///
1089    /// # Examples
1090    ///
1091    /// ```
1092    /// # #[cfg(not(target_family = "wasm"))]
1093    /// # {
1094    /// # use tokio::runtime;
1095    /// # pub fn main() {
1096    /// let rt = runtime::Builder::new_multi_thread()
1097    ///     .global_queue_interval(31)
1098    ///     .build();
1099    /// # }
1100    /// # }
1101    /// ```
1102    #[track_caller]
1103    pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1104        assert!(val > 0, "global_queue_interval must be greater than 0");
1105        self.global_queue_interval = Some(val);
1106        self
1107    }
1108
1109    /// Sets the number of scheduler ticks after which the scheduler will poll for
1110    /// external events (timers, I/O, and so on).
1111    ///
1112    /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1113    ///
1114    /// By default, the event interval is `61` for all scheduler types.
1115    ///
1116    /// Setting the event interval determines the effective "priority" of delivering
1117    /// these external events (which may wake up additional tasks), compared to
1118    /// executing tasks that are currently ready to run. A smaller value is useful
1119    /// when tasks frequently spend a long time in polling, or frequently yield,
1120    /// which can result in overly long delays picking up I/O events. Conversely,
1121    /// picking up new events requires extra synchronization and syscall overhead,
1122    /// so if tasks generally complete their polling quickly, a higher event interval
1123    /// will minimize that overhead while still keeping the scheduler responsive to
1124    /// events.
1125    ///
1126    /// # Panics
1127    ///
1128    /// This function will panic if 0 is passed as an argument.
1129    ///
1130    /// # Examples
1131    ///
1132    /// ```
1133    /// # #[cfg(not(target_family = "wasm"))]
1134    /// # {
1135    /// # use tokio::runtime;
1136    /// # pub fn main() {
1137    /// let rt = runtime::Builder::new_multi_thread()
1138    ///     .event_interval(31)
1139    ///     .build();
1140    /// # }
1141    /// # }
1142    /// ```
1143    #[track_caller]
1144    pub fn event_interval(&mut self, val: u32) -> &mut Self {
1145        assert!(val > 0, "event_interval must be greater than 0");
1146        self.event_interval = val;
1147        self
1148    }
1149
1150    cfg_unstable! {
1151        /// Configure how the runtime responds to an unhandled panic on a
1152        /// spawned task.
1153        ///
1154        /// By default, an unhandled panic (i.e. a panic not caught by
1155        /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1156        /// execution. The panic's error value is forwarded to the task's
1157        /// [`JoinHandle`] and all other spawned tasks continue running.
1158        ///
1159        /// The `unhandled_panic` option enables configuring this behavior.
1160        ///
1161        /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1162        ///   spawned tasks have no impact on the runtime's execution.
1163        /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1164        ///   shutdown immediately when a spawned task panics even if that
1165        ///   task's `JoinHandle` has not been dropped. All other spawned tasks
1166        ///   will immediately terminate and further calls to
1167        ///   [`Runtime::block_on`] will panic.
1168        ///
1169        /// # Panics
1170        /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1171        /// on a runtime other than the current thread runtime.
1172        ///
1173        /// # Unstable
1174        ///
1175        /// This option is currently unstable and its implementation is
1176        /// incomplete. The API may change or be removed in the future. See
1177        /// issue [tokio-rs/tokio#4516] for more details.
1178        ///
1179        /// # Examples
1180        ///
1181        /// The following demonstrates a runtime configured to shutdown on
1182        /// panic. The first spawned task panics and results in the runtime
1183        /// shutting down. The second spawned task never has a chance to
1184        /// execute. The call to `block_on` will panic due to the runtime being
1185        /// forcibly shutdown.
1186        ///
1187        /// ```should_panic
1188        /// use tokio::runtime::{self, UnhandledPanic};
1189        ///
1190        /// # pub fn main() {
1191        /// let rt = runtime::Builder::new_current_thread()
1192        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1193        ///     .build()
1194        ///     .unwrap();
1195        ///
1196        /// rt.spawn(async { panic!("boom"); });
1197        /// rt.spawn(async {
1198        ///     // This task never completes.
1199        /// });
1200        ///
1201        /// rt.block_on(async {
1202        ///     // Do some work
1203        /// # loop { tokio::task::yield_now().await; }
1204        /// })
1205        /// # }
1206        /// ```
1207        ///
1208        /// [`JoinHandle`]: struct@crate::task::JoinHandle
1209        /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1210        pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1211            if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1212                panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1213            }
1214
1215            self.unhandled_panic = behavior;
1216            self
1217        }
1218
1219        /// Disables the LIFO task scheduler heuristic.
1220        ///
1221        /// The multi-threaded scheduler includes a heuristic for optimizing
1222        /// message-passing patterns. This heuristic results in the **last**
1223        /// scheduled task being polled first.
1224        ///
1225        /// To implement this heuristic, each worker thread has a slot which
1226        /// holds the task that should be polled next. However, this slot cannot
1227        /// be stolen by other worker threads, which can result in lower total
1228        /// throughput when tasks tend to have longer poll times.
1229        ///
1230        /// This configuration option will disable this heuristic resulting in
1231        /// all scheduled tasks being pushed into the worker-local queue, which
1232        /// is stealable.
1233        ///
1234        /// Consider trying this option when the task "scheduled" time is high
1235        /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1236        /// collect this data.
1237        ///
1238        /// # Unstable
1239        ///
1240        /// This configuration option is considered a workaround for the LIFO
1241        /// slot not being stealable. When the slot becomes stealable, we will
1242        /// revisit whether or not this option is necessary. See
1243        /// issue [tokio-rs/tokio#4941].
1244        ///
1245        /// # Examples
1246        ///
1247        /// ```
1248        /// # #[cfg(not(target_family = "wasm"))]
1249        /// # {
1250        /// use tokio::runtime;
1251        ///
1252        /// let rt = runtime::Builder::new_multi_thread()
1253        ///     .disable_lifo_slot()
1254        ///     .build()
1255        ///     .unwrap();
1256        /// # }
1257        /// ```
1258        ///
1259        /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1260        /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
1261        pub fn disable_lifo_slot(&mut self) -> &mut Self {
1262            self.disable_lifo_slot = true;
1263            self
1264        }
1265
1266        /// Specifies the random number generation seed to use within all
1267        /// threads associated with the runtime being built.
1268        ///
1269        /// This option is intended to make certain parts of the runtime
1270        /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1271        /// [`tokio::select!`] it will ensure that the order that branches are
1272        /// polled is deterministic.
1273        ///
1274        /// In addition to the code specifying `rng_seed` and interacting with
1275        /// the runtime, the internals of Tokio and the Rust compiler may affect
1276        /// the sequences of random numbers. In order to ensure repeatable
1277        /// results, the version of Tokio, the versions of all other
1278        /// dependencies that interact with Tokio, and the Rust compiler version
1279        /// should also all remain constant.
1280        ///
1281        /// # Examples
1282        ///
1283        /// ```
1284        /// # use tokio::runtime::{self, RngSeed};
1285        /// # pub fn main() {
1286        /// let seed = RngSeed::from_bytes(b"place your seed here");
1287        /// let rt = runtime::Builder::new_current_thread()
1288        ///     .rng_seed(seed)
1289        ///     .build();
1290        /// # }
1291        /// ```
1292        ///
1293        /// [`tokio::select!`]: crate::select
1294        pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1295            self.seed_generator = RngSeedGenerator::new(seed);
1296            self
1297        }
1298    }
1299
1300    cfg_unstable_metrics! {
1301        /// Enables tracking the distribution of task poll times.
1302        ///
1303        /// Task poll times are not instrumented by default as doing so requires
1304        /// calling [`Instant::now()`] twice per task poll, which could add
1305        /// measurable overhead. Use the [`Handle::metrics()`] to access the
1306        /// metrics data.
1307        ///
1308        /// The histogram uses fixed bucket sizes. In other words, the histogram
1309        /// buckets are not dynamic based on input values. Use the
1310        /// `metrics_poll_time_histogram` builder methods to configure the
1311        /// histogram details.
1312        ///
1313        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1314        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1315        /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1316        /// to select [`LogHistogram`] instead.
1317        ///
1318        /// # Examples
1319        ///
1320        /// ```
1321        /// # #[cfg(not(target_family = "wasm"))]
1322        /// # {
1323        /// use tokio::runtime;
1324        ///
1325        /// let rt = runtime::Builder::new_multi_thread()
1326        ///     .enable_metrics_poll_time_histogram()
1327        ///     .build()
1328        ///     .unwrap();
1329        /// # // Test default values here
1330        /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1331        /// # let m = rt.handle().metrics();
1332        /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1333        /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1334        /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1335        /// # }
1336        /// ```
1337        ///
1338        /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1339        /// [`Instant::now()`]: std::time::Instant::now
1340        /// [`LogHistogram`]: crate::runtime::LogHistogram
1341        /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
1342        pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
1343            self.metrics_poll_count_histogram_enable = true;
1344            self
1345        }
1346
1347        /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1348        ///
1349        /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1350        #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1351        #[doc(hidden)]
1352        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
1353            self.enable_metrics_poll_time_histogram()
1354        }
1355
1356        /// Sets the histogram scale for tracking the distribution of task poll
1357        /// times.
1358        ///
1359        /// Tracking the distribution of task poll times can be done using a
1360        /// linear or log scale. When using linear scale, each histogram bucket
1361        /// will represent the same range of poll times. When using log scale,
1362        /// each histogram bucket will cover a range twice as big as the
1363        /// previous bucket.
1364        ///
1365        /// **Default:** linear scale.
1366        ///
1367        /// # Examples
1368        ///
1369        /// ```
1370        /// # #[cfg(not(target_family = "wasm"))]
1371        /// # {
1372        /// use tokio::runtime::{self, HistogramScale};
1373        ///
1374        /// # #[allow(deprecated)]
1375        /// let rt = runtime::Builder::new_multi_thread()
1376        ///     .enable_metrics_poll_time_histogram()
1377        ///     .metrics_poll_count_histogram_scale(HistogramScale::Log)
1378        ///     .build()
1379        ///     .unwrap();
1380        /// # }
1381        /// ```
1382        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1383        pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1384            self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1385            self
1386        }
1387
1388        /// Configure the histogram for tracking poll times
1389        ///
1390        /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1391        /// This has an extremely low memory footprint, but may not provide enough granularity. For
1392        /// better granularity with low memory usage, use [`LogHistogram`] instead.
1393        ///
1394        /// # Examples
1395        /// Configure a [`LogHistogram`] with [default configuration]:
1396        /// ```
1397        /// # #[cfg(not(target_family = "wasm"))]
1398        /// # {
1399        /// use tokio::runtime;
1400        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1401        ///
1402        /// let rt = runtime::Builder::new_multi_thread()
1403        ///     .enable_metrics_poll_time_histogram()
1404        ///     .metrics_poll_time_histogram_configuration(
1405        ///         HistogramConfiguration::log(LogHistogram::default())
1406        ///     )
1407        ///     .build()
1408        ///     .unwrap();
1409        /// # }
1410        /// ```
1411        ///
1412        /// Configure a linear histogram with 100 buckets, each 10μs wide
1413        /// ```
1414        /// # #[cfg(not(target_family = "wasm"))]
1415        /// # {
1416        /// use tokio::runtime;
1417        /// use std::time::Duration;
1418        /// use tokio::runtime::HistogramConfiguration;
1419        ///
1420        /// let rt = runtime::Builder::new_multi_thread()
1421        ///     .enable_metrics_poll_time_histogram()
1422        ///     .metrics_poll_time_histogram_configuration(
1423        ///         HistogramConfiguration::linear(Duration::from_micros(10), 100)
1424        ///     )
1425        ///     .build()
1426        ///     .unwrap();
1427        /// # }
1428        /// ```
1429        ///
1430        /// Configure a [`LogHistogram`] with the following settings:
1431        /// - Measure times from 100ns to 120s
1432        /// - Max error of 0.1
1433        /// - No more than 1024 buckets
1434        /// ```
1435        /// # #[cfg(not(target_family = "wasm"))]
1436        /// # {
1437        /// use std::time::Duration;
1438        /// use tokio::runtime;
1439        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1440        ///
1441        /// let rt = runtime::Builder::new_multi_thread()
1442        ///     .enable_metrics_poll_time_histogram()
1443        ///     .metrics_poll_time_histogram_configuration(
1444        ///         HistogramConfiguration::log(LogHistogram::builder()
1445        ///             .max_value(Duration::from_secs(120))
1446        ///             .min_value(Duration::from_nanos(100))
1447        ///             .max_error(0.1)
1448        ///             .max_buckets(1024)
1449        ///             .expect("configuration uses 488 buckets")
1450        ///         )
1451        ///     )
1452        ///     .build()
1453        ///     .unwrap();
1454        /// # }
1455        /// ```
1456        ///
1457        /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1458        /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1459        /// where each bucket is twice the size of the previous bucket.
1460        /// ```rust
1461        /// use std::time::Duration;
1462        /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1463        /// let rt = tokio::runtime::Builder::new_current_thread()
1464        ///     .enable_all()
1465        ///     .enable_metrics_poll_time_histogram()
1466        ///     .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1467        ///         LogHistogram::builder()
1468        ///             .min_value(Duration::from_micros(20))
1469        ///             .max_value(Duration::from_millis(4))
1470        ///             // Set `precision_exact` to `0` to match `HistogramScale::Log`
1471        ///             .precision_exact(0)
1472        ///             .max_buckets(10)
1473        ///             .unwrap(),
1474        ///     ))
1475        ///     .build()
1476        ///     .unwrap();
1477        /// ```
1478        ///
1479        /// [`LogHistogram`]: crate::runtime::LogHistogram
1480        /// [default configuration]: crate::runtime::LogHistogramBuilder
1481        /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1482        pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1483            self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1484            self
1485        }
1486
1487        /// Sets the histogram resolution for tracking the distribution of task
1488        /// poll times.
1489        ///
1490        /// The resolution is the histogram's first bucket's range. When using a
1491        /// linear histogram scale, each bucket will cover the same range. When
1492        /// using a log scale, each bucket will cover a range twice as big as
1493        /// the previous bucket. In the log case, the resolution represents the
1494        /// smallest bucket range.
1495        ///
1496        /// Note that, when using log scale, the resolution is rounded up to the
1497        /// nearest power of 2 in nanoseconds.
1498        ///
1499        /// **Default:** 100 microseconds.
1500        ///
1501        /// # Examples
1502        ///
1503        /// ```
1504        /// # #[cfg(not(target_family = "wasm"))]
1505        /// # {
1506        /// use tokio::runtime;
1507        /// use std::time::Duration;
1508        ///
1509        /// # #[allow(deprecated)]
1510        /// let rt = runtime::Builder::new_multi_thread()
1511        ///     .enable_metrics_poll_time_histogram()
1512        ///     .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1513        ///     .build()
1514        ///     .unwrap();
1515        /// # }
1516        /// ```
1517        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1518        pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1519            assert!(resolution > Duration::from_secs(0));
1520            // Sanity check the argument and also make the cast below safe.
1521            assert!(resolution <= Duration::from_secs(1));
1522
1523            let resolution = resolution.as_nanos() as u64;
1524
1525            self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1526            self
1527        }
1528
1529        /// Sets the number of buckets for the histogram tracking the
1530        /// distribution of task poll times.
1531        ///
1532        /// The last bucket tracks all greater values that fall out of other
1533        /// ranges. So, configuring the histogram using a linear scale,
1534        /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1535        /// polls that take more than 450ms to complete.
1536        ///
1537        /// **Default:** 10
1538        ///
1539        /// # Examples
1540        ///
1541        /// ```
1542        /// # #[cfg(not(target_family = "wasm"))]
1543        /// # {
1544        /// use tokio::runtime;
1545        ///
1546        /// # #[allow(deprecated)]
1547        /// let rt = runtime::Builder::new_multi_thread()
1548        ///     .enable_metrics_poll_time_histogram()
1549        ///     .metrics_poll_count_histogram_buckets(15)
1550        ///     .build()
1551        ///     .unwrap();
1552        /// # }
1553        /// ```
1554        #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1555        pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1556            self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1557            self
1558        }
1559    }
1560
1561    fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1562        use crate::runtime::runtime::Scheduler;
1563
1564        let (scheduler, handle, blocking_pool) =
1565            self.build_current_thread_runtime_components(None)?;
1566
1567        Ok(Runtime::from_parts(
1568            Scheduler::CurrentThread(scheduler),
1569            handle,
1570            blocking_pool,
1571        ))
1572    }
1573
1574    #[cfg(tokio_unstable)]
1575    fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1576        use crate::runtime::local_runtime::LocalRuntimeScheduler;
1577
1578        let tid = std::thread::current().id();
1579
1580        let (scheduler, handle, blocking_pool) =
1581            self.build_current_thread_runtime_components(Some(tid))?;
1582
1583        Ok(LocalRuntime::from_parts(
1584            LocalRuntimeScheduler::CurrentThread(scheduler),
1585            handle,
1586            blocking_pool,
1587        ))
1588    }
1589
1590    fn build_current_thread_runtime_components(
1591        &mut self,
1592        local_tid: Option<ThreadId>,
1593    ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
1594        use crate::runtime::scheduler;
1595        use crate::runtime::Config;
1596
1597        let mut cfg = self.get_cfg();
1598        cfg.timer_flavor = TimerFlavor::Traditional;
1599        let (driver, driver_handle) = driver::Driver::new(cfg)?;
1600
1601        // Blocking pool
1602        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
1603        let blocking_spawner = blocking_pool.spawner().clone();
1604
1605        // Generate a rng seed for this runtime.
1606        let seed_generator_1 = self.seed_generator.next_generator();
1607        let seed_generator_2 = self.seed_generator.next_generator();
1608
1609        // And now put a single-threaded scheduler on top of the timer. When
1610        // there are no futures ready to do something, it'll let the timer or
1611        // the reactor to generate some new stimuli for the futures to continue
1612        // in their life.
1613        let (scheduler, handle) = CurrentThread::new(
1614            driver,
1615            driver_handle,
1616            blocking_spawner,
1617            seed_generator_2,
1618            Config {
1619                before_park: self.before_park.clone(),
1620                after_unpark: self.after_unpark.clone(),
1621                before_spawn: self.before_spawn.clone(),
1622                #[cfg(tokio_unstable)]
1623                before_poll: self.before_poll.clone(),
1624                #[cfg(tokio_unstable)]
1625                after_poll: self.after_poll.clone(),
1626                after_termination: self.after_termination.clone(),
1627                global_queue_interval: self.global_queue_interval,
1628                event_interval: self.event_interval,
1629                #[cfg(tokio_unstable)]
1630                unhandled_panic: self.unhandled_panic.clone(),
1631                disable_lifo_slot: self.disable_lifo_slot,
1632                seed_generator: seed_generator_1,
1633                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1634            },
1635            local_tid,
1636        );
1637
1638        let handle = Handle {
1639            inner: scheduler::Handle::CurrentThread(handle),
1640        };
1641
1642        Ok((scheduler, handle, blocking_pool))
1643    }
1644
1645    fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1646        if self.metrics_poll_count_histogram_enable {
1647            Some(self.metrics_poll_count_histogram.clone())
1648        } else {
1649            None
1650        }
1651    }
1652}
1653
1654cfg_io_driver! {
1655    impl Builder {
1656        /// Enables the I/O driver.
1657        ///
1658        /// Doing this enables using net, process, signal, and some I/O types on
1659        /// the runtime.
1660        ///
1661        /// # Examples
1662        ///
1663        /// ```
1664        /// use tokio::runtime;
1665        ///
1666        /// let rt = runtime::Builder::new_multi_thread()
1667        ///     .enable_io()
1668        ///     .build()
1669        ///     .unwrap();
1670        /// ```
1671        pub fn enable_io(&mut self) -> &mut Self {
1672            self.enable_io = true;
1673            self
1674        }
1675
1676        /// Enables the I/O driver and configures the max number of events to be
1677        /// processed per tick.
1678        ///
1679        /// # Examples
1680        ///
1681        /// ```
1682        /// use tokio::runtime;
1683        ///
1684        /// let rt = runtime::Builder::new_current_thread()
1685        ///     .enable_io()
1686        ///     .max_io_events_per_tick(1024)
1687        ///     .build()
1688        ///     .unwrap();
1689        /// ```
1690        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
1691            self.nevents = capacity;
1692            self
1693        }
1694    }
1695}
1696
1697cfg_time! {
1698    impl Builder {
1699        /// Enables the time driver.
1700        ///
1701        /// Doing this enables using `tokio::time` on the runtime.
1702        ///
1703        /// # Examples
1704        ///
1705        /// ```
1706        /// # #[cfg(not(target_family = "wasm"))]
1707        /// # {
1708        /// use tokio::runtime;
1709        ///
1710        /// let rt = runtime::Builder::new_multi_thread()
1711        ///     .enable_time()
1712        ///     .build()
1713        ///     .unwrap();
1714        /// # }
1715        /// ```
1716        pub fn enable_time(&mut self) -> &mut Self {
1717            self.enable_time = true;
1718            self
1719        }
1720    }
1721}
1722
1723cfg_io_uring! {
1724    impl Builder {
1725        /// Enables the tokio's io_uring driver.
1726        ///
1727        /// Doing this enables using io_uring operations on the runtime.
1728        ///
1729        /// # Examples
1730        ///
1731        /// ```
1732        /// use tokio::runtime;
1733        ///
1734        /// let rt = runtime::Builder::new_multi_thread()
1735        ///     .enable_io_uring()
1736        ///     .build()
1737        ///     .unwrap();
1738        /// ```
1739        #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
1740        pub fn enable_io_uring(&mut self) -> &mut Self {
1741            // Currently, the uring flag is equivalent to `enable_io`.
1742            self.enable_io = true;
1743            self
1744        }
1745    }
1746}
1747
1748cfg_test_util! {
1749    impl Builder {
1750        /// Controls if the runtime's clock starts paused or advancing.
1751        ///
1752        /// Pausing time requires the current-thread runtime; construction of
1753        /// the runtime will panic otherwise.
1754        ///
1755        /// # Examples
1756        ///
1757        /// ```
1758        /// use tokio::runtime;
1759        ///
1760        /// let rt = runtime::Builder::new_current_thread()
1761        ///     .enable_time()
1762        ///     .start_paused(true)
1763        ///     .build()
1764        ///     .unwrap();
1765        /// ```
1766        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
1767            self.start_paused = start_paused;
1768            self
1769        }
1770    }
1771}
1772
1773cfg_rt_multi_thread! {
1774    impl Builder {
1775        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1776            use crate::loom::sys::num_cpus;
1777            use crate::runtime::{Config, runtime::Scheduler};
1778            use crate::runtime::scheduler::{self, MultiThread};
1779
1780            let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1781
1782            let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1783
1784            // Create the blocking pool
1785            let blocking_pool =
1786                blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1787            let blocking_spawner = blocking_pool.spawner().clone();
1788
1789            // Generate a rng seed for this runtime.
1790            let seed_generator_1 = self.seed_generator.next_generator();
1791            let seed_generator_2 = self.seed_generator.next_generator();
1792
1793            let (scheduler, handle, launch) = MultiThread::new(
1794                worker_threads,
1795                driver,
1796                driver_handle,
1797                blocking_spawner,
1798                seed_generator_2,
1799                Config {
1800                    before_park: self.before_park.clone(),
1801                    after_unpark: self.after_unpark.clone(),
1802                    before_spawn: self.before_spawn.clone(),
1803                    #[cfg(tokio_unstable)]
1804                    before_poll: self.before_poll.clone(),
1805                    #[cfg(tokio_unstable)]
1806                    after_poll: self.after_poll.clone(),
1807                    after_termination: self.after_termination.clone(),
1808                    global_queue_interval: self.global_queue_interval,
1809                    event_interval: self.event_interval,
1810                    #[cfg(tokio_unstable)]
1811                    unhandled_panic: self.unhandled_panic.clone(),
1812                    disable_lifo_slot: self.disable_lifo_slot,
1813                    seed_generator: seed_generator_1,
1814                    metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1815                },
1816                self.timer_flavor,
1817            );
1818
1819            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1820
1821            // Spawn the thread pool workers
1822            let _enter = handle.enter();
1823            launch.launch();
1824
1825            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1826        }
1827    }
1828}
1829
1830impl fmt::Debug for Builder {
1831    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1832        fmt.debug_struct("Builder")
1833            .field("worker_threads", &self.worker_threads)
1834            .field("max_blocking_threads", &self.max_blocking_threads)
1835            .field(
1836                "thread_name",
1837                &"<dyn Fn() -> String + Send + Sync + 'static>",
1838            )
1839            .field("thread_stack_size", &self.thread_stack_size)
1840            .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1841            .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1842            .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1843            .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1844            .finish()
1845    }
1846}