tokio/runtime/builder.rs
1#![cfg_attr(loom, allow(unused_imports))]
2
3use crate::runtime::handle::Handle;
4use crate::runtime::{
5 blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback, TimerFlavor,
6};
7#[cfg(tokio_unstable)]
8use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
9use crate::util::rand::{RngSeed, RngSeedGenerator};
10
11use crate::runtime::blocking::BlockingPool;
12use crate::runtime::scheduler::CurrentThread;
13use std::fmt;
14use std::io;
15use std::thread::ThreadId;
16use std::time::Duration;
17
18/// Builds Tokio Runtime with custom configuration values.
19///
20/// Methods can be chained in order to set the configuration values. The
21/// Runtime is constructed by calling [`build`].
22///
23/// New instances of `Builder` are obtained via [`Builder::new_multi_thread`]
24/// or [`Builder::new_current_thread`].
25///
26/// See function level documentation for details on the various configuration
27/// settings.
28///
29/// [`build`]: method@Self::build
30/// [`Builder::new_multi_thread`]: method@Self::new_multi_thread
31/// [`Builder::new_current_thread`]: method@Self::new_current_thread
32///
33/// # Examples
34///
35/// ```
36/// # #[cfg(not(target_family = "wasm"))]
37/// # {
38/// use tokio::runtime::Builder;
39///
40/// fn main() {
41/// // build runtime
42/// let runtime = Builder::new_multi_thread()
43/// .worker_threads(4)
44/// .thread_name("my-custom-name")
45/// .thread_stack_size(3 * 1024 * 1024)
46/// .build()
47/// .unwrap();
48///
49/// // use runtime ...
50/// }
51/// # }
52/// ```
pub struct Builder {
    /// Runtime type
    kind: Kind,

    /// Whether or not to enable the I/O driver
    enable_io: bool,
    /// Forwarded unchanged to the driver configuration as `nevents`.
    // NOTE(review): presumably the capacity of the I/O driver's event buffer —
    // confirm against `driver::Cfg`.
    nevents: usize,

    /// Whether or not to enable the time driver
    enable_time: bool,

    /// Whether or not the clock should start paused.
    start_paused: bool,

    /// The number of worker threads, used by Runtime.
    ///
    /// Only used when not using the current-thread executor.
    worker_threads: Option<usize>,

    /// Cap on the number of threads spawned for blocking operations.
    max_blocking_threads: usize,

    /// Name fn used for threads spawned by the runtime.
    pub(super) thread_name: ThreadNameFn,

    /// Stack size used for threads spawned by the runtime.
    pub(super) thread_stack_size: Option<usize>,

    /// Callback to run after each thread starts.
    pub(super) after_start: Option<Callback>,

    /// To run before each worker thread stops
    pub(super) before_stop: Option<Callback>,

    /// To run before each worker thread is parked.
    pub(super) before_park: Option<Callback>,

    /// To run after each thread is unparked.
    pub(super) after_unpark: Option<Callback>,

    /// To run before each task is spawned.
    pub(super) before_spawn: Option<TaskCallback>,

    /// To run before each poll
    #[cfg(tokio_unstable)]
    pub(super) before_poll: Option<TaskCallback>,

    /// To run after each poll
    #[cfg(tokio_unstable)]
    pub(super) after_poll: Option<TaskCallback>,

    /// To run after each task is terminated.
    pub(super) after_termination: Option<TaskCallback>,

    /// Customizable keep alive timeout for `BlockingPool`
    pub(super) keep_alive: Option<Duration>,

    /// How many ticks before pulling a task from the global/remote queue?
    ///
    /// When `None`, the value is unspecified and behavior details are left to
    /// the scheduler. Each scheduler flavor could choose to either pick its own
    /// default value or use some other strategy to decide when to poll from the
    /// global queue. For example, the multi-threaded scheduler uses a
    /// self-tuning strategy based on mean task poll times.
    pub(super) global_queue_interval: Option<u32>,

    /// How many ticks before yielding to the driver for timer and I/O events?
    pub(super) event_interval: u32,

    /// When true, the multi-threaded scheduler LIFO slot should not be used.
    ///
    /// This option should only be exposed as unstable.
    pub(super) disable_lifo_slot: bool,

    /// Specify a random number generator seed to provide deterministic results
    pub(super) seed_generator: RngSeedGenerator,

    /// When true, enables task poll count histogram instrumentation.
    pub(super) metrics_poll_count_histogram_enable: bool,

    /// Configures the task poll count histogram
    pub(super) metrics_poll_count_histogram: HistogramBuilder,

    /// How the runtime should respond to unhandled panics on spawned tasks.
    #[cfg(tokio_unstable)]
    pub(super) unhandled_panic: UnhandledPanic,

    /// Which timer implementation the driver is configured with.
    ///
    /// Starts out as `TimerFlavor::Traditional`; `enable_alt_timer` switches it
    /// to `TimerFlavor::Alternative`.
    timer_flavor: TimerFlavor,
}
141
142cfg_unstable! {
143 /// How the runtime should respond to unhandled panics.
144 ///
145 /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
146 /// to configure the runtime behavior when a spawned task panics.
147 ///
148 /// See [`Builder::unhandled_panic`] for more details.
149 #[derive(Debug, Clone)]
150 #[non_exhaustive]
    pub enum UnhandledPanic {
        /// The runtime should ignore panics on spawned tasks.
        ///
        /// The panic is forwarded to the task's [`JoinHandle`] and all spawned
        /// tasks continue running normally.
        ///
        /// This is the default behavior.
        ///
        /// # Examples
        ///
        /// ```
        /// # #[cfg(not(target_family = "wasm"))]
        /// # {
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::Ignore)
        ///     .build()
        ///     .unwrap();
        ///
        /// let task1 = rt.spawn(async { panic!("boom"); });
        /// let task2 = rt.spawn(async {
        ///     // This task completes normally
        ///     "done"
        /// });
        ///
        /// rt.block_on(async {
        ///     // The panic on the first task is forwarded to the `JoinHandle`
        ///     assert!(task1.await.is_err());
        ///
        ///     // The second task completes normally
        ///     assert!(task2.await.is_ok());
        /// })
        /// # }
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        Ignore,

        /// The runtime should immediately shutdown if a spawned task panics.
        ///
        /// The runtime will immediately shutdown even if the panicked task's
        /// [`JoinHandle`] is still available. All further spawned tasks will be
        /// immediately dropped and any call to [`Runtime::block_on`] will panic.
        ///
        /// # Examples
        ///
        /// ```should_panic
        /// use tokio::runtime::{self, UnhandledPanic};
        ///
        /// # pub fn main() {
        /// let rt = runtime::Builder::new_current_thread()
        ///     .unhandled_panic(UnhandledPanic::ShutdownRuntime)
        ///     .build()
        ///     .unwrap();
        ///
        /// rt.spawn(async { panic!("boom"); });
        /// rt.spawn(async {
        ///     // This task never completes.
        /// });
        ///
        /// rt.block_on(async {
        ///     // Do some work
        /// # loop { tokio::task::yield_now().await; }
        /// })
        /// # }
        /// ```
        ///
        /// [`JoinHandle`]: struct@crate::task::JoinHandle
        ShutdownRuntime,
    }
224}
225
/// Shared closure that yields the name to give a thread spawned by the runtime.
///
/// Stored on [`Builder`] and set through `Builder::thread_name` or
/// `Builder::thread_name_fn`.
pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
227
/// Scheduler flavor that a [`Builder`] has been asked to construct.
#[derive(Clone, Copy)]
pub(crate) enum Kind {
    /// Current-thread scheduler; selected by `Builder::new_current_thread`.
    CurrentThread,
    /// Multi-thread scheduler; selected by `Builder::new_multi_thread`.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread,
}
234
235impl Builder {
236 /// Returns a new builder with the current thread scheduler selected.
237 ///
238 /// Configuration methods can be chained on the return value.
239 ///
240 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a
241 /// [`LocalSet`], or call [`build_local`] to create a [`LocalRuntime`] (unstable).
242 ///
243 /// [`LocalSet`]: crate::task::LocalSet
244 /// [`LocalRuntime`]: crate::runtime::LocalRuntime
245 /// [`build_local`]: crate::runtime::Builder::build_local
246 pub fn new_current_thread() -> Builder {
247 #[cfg(loom)]
248 const EVENT_INTERVAL: u32 = 4;
249 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
250 #[cfg(not(loom))]
251 const EVENT_INTERVAL: u32 = 61;
252
253 Builder::new(Kind::CurrentThread, EVENT_INTERVAL)
254 }
255
256 /// Returns a new builder with the multi thread scheduler selected.
257 ///
258 /// Configuration methods can be chained on the return value.
259 #[cfg(feature = "rt-multi-thread")]
260 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
261 pub fn new_multi_thread() -> Builder {
262 // The number `61` is fairly arbitrary. I believe this value was copied from golang.
263 Builder::new(Kind::MultiThread, 61)
264 }
265
266 /// Returns a new runtime builder initialized with default configuration
267 /// values.
268 ///
269 /// Configuration methods can be chained on the return value.
270 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder {
271 Builder {
272 kind,
273
274 // I/O defaults to "off"
275 enable_io: false,
276 nevents: 1024,
277
278 // Time defaults to "off"
279 enable_time: false,
280
281 // The clock starts not-paused
282 start_paused: false,
283
284 // Read from environment variable first in multi-threaded mode.
285 // Default to lazy auto-detection (one thread per CPU core)
286 worker_threads: None,
287
288 max_blocking_threads: 512,
289
290 // Default thread name
291 thread_name: std::sync::Arc::new(|| "tokio-rt-worker".into()),
292
293 // Do not set a stack size by default
294 thread_stack_size: None,
295
296 // No worker thread callbacks
297 after_start: None,
298 before_stop: None,
299 before_park: None,
300 after_unpark: None,
301
302 before_spawn: None,
303 after_termination: None,
304
305 #[cfg(tokio_unstable)]
306 before_poll: None,
307 #[cfg(tokio_unstable)]
308 after_poll: None,
309
310 keep_alive: None,
311
312 // Defaults for these values depend on the scheduler kind, so we get them
313 // as parameters.
314 global_queue_interval: None,
315 event_interval,
316
317 seed_generator: RngSeedGenerator::new(RngSeed::new()),
318
319 #[cfg(tokio_unstable)]
320 unhandled_panic: UnhandledPanic::Ignore,
321
322 metrics_poll_count_histogram_enable: false,
323
324 metrics_poll_count_histogram: HistogramBuilder::default(),
325
326 disable_lifo_slot: false,
327
328 timer_flavor: TimerFlavor::Traditional,
329 }
330 }
331
332 /// Enables both I/O and time drivers.
333 ///
334 /// Doing this is a shorthand for calling `enable_io` and `enable_time`
335 /// individually. If additional components are added to Tokio in the future,
336 /// `enable_all` will include these future components.
337 ///
338 /// # Examples
339 ///
340 /// ```
341 /// # #[cfg(not(target_family = "wasm"))]
342 /// # {
343 /// use tokio::runtime;
344 ///
345 /// let rt = runtime::Builder::new_multi_thread()
346 /// .enable_all()
347 /// .build()
348 /// .unwrap();
349 /// # }
350 /// ```
351 pub fn enable_all(&mut self) -> &mut Self {
352 #[cfg(any(
353 feature = "net",
354 all(unix, feature = "process"),
355 all(unix, feature = "signal")
356 ))]
357 self.enable_io();
358
359 #[cfg(all(
360 tokio_unstable,
361 feature = "io-uring",
362 feature = "rt",
363 feature = "fs",
364 target_os = "linux",
365 ))]
366 self.enable_io_uring();
367
368 #[cfg(feature = "time")]
369 self.enable_time();
370
371 self
372 }
373
374 /// Enables the alternative timer implementation, which is disabled by default.
375 ///
376 /// The alternative timer implementation is an unstable feature that may
377 /// provide better performance on multi-threaded runtimes with a large number
378 /// of worker threads.
379 ///
380 /// This option only applies to multi-threaded runtimes. Attempting to use
381 /// this option with any other runtime type will have no effect.
382 ///
383 /// [Click here to share your experience with the alternative timer](https://github.com/tokio-rs/tokio/issues/7745)
384 ///
385 /// # Examples
386 ///
387 /// ```
388 /// # #[cfg(not(target_family = "wasm"))]
389 /// # {
390 /// use tokio::runtime;
391 ///
392 /// let rt = runtime::Builder::new_multi_thread()
393 /// .enable_alt_timer()
394 /// .build()
395 /// .unwrap();
396 /// # }
397 /// ```
398 #[cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread"))]
399 #[cfg_attr(
400 docsrs,
401 doc(cfg(all(tokio_unstable, feature = "time", feature = "rt-multi-thread")))
402 )]
403 pub fn enable_alt_timer(&mut self) -> &mut Self {
404 self.enable_time();
405 self.timer_flavor = TimerFlavor::Alternative;
406 self
407 }
408
409 /// Sets the number of worker threads the `Runtime` will use.
410 ///
411 /// This can be any number above 0 though it is advised to keep this value
412 /// on the smaller side.
413 ///
414 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`.
415 ///
416 /// # Default
417 ///
418 /// The default value is the number of cores available to the system.
419 ///
420 /// When using the `current_thread` runtime this method has no effect.
421 ///
422 /// # Examples
423 ///
424 /// ## Multi threaded runtime with 4 threads
425 ///
426 /// ```
427 /// # #[cfg(not(target_family = "wasm"))]
428 /// # {
429 /// use tokio::runtime;
430 ///
431 /// // This will spawn a work-stealing runtime with 4 worker threads.
432 /// let rt = runtime::Builder::new_multi_thread()
433 /// .worker_threads(4)
434 /// .build()
435 /// .unwrap();
436 ///
437 /// rt.spawn(async move {});
438 /// # }
439 /// ```
440 ///
441 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`)
442 ///
443 /// ```
444 /// use tokio::runtime;
445 ///
446 /// // Create a runtime that _must_ be driven from a call
447 /// // to `Runtime::block_on`.
448 /// let rt = runtime::Builder::new_current_thread()
449 /// .build()
450 /// .unwrap();
451 ///
452 /// // This will run the runtime and future on the current thread
453 /// rt.block_on(async move {});
454 /// ```
455 ///
456 /// # Panics
457 ///
458 /// This will panic if `val` is not larger than `0`.
459 #[track_caller]
460 pub fn worker_threads(&mut self, val: usize) -> &mut Self {
461 assert!(val > 0, "Worker threads cannot be set to 0");
462 self.worker_threads = Some(val);
463 self
464 }
465
466 /// Specifies the limit for additional threads spawned by the Runtime.
467 ///
468 /// These threads are used for blocking operations like tasks spawned
469 /// through [`spawn_blocking`], this includes but is not limited to:
470 /// - [`fs`] operations
471 /// - dns resolution through [`ToSocketAddrs`]
472 /// - writing to [`Stdout`] or [`Stderr`]
473 /// - reading from [`Stdin`]
474 ///
475 /// Unlike the [`worker_threads`], they are not always active and will exit
476 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`].
477 ///
478 /// It's recommended to not set this limit too low in order to avoid hanging on operations
479 /// requiring [`spawn_blocking`].
480 ///
481 /// The default value is 512.
482 ///
483 /// # Queue Behavior
484 ///
485 /// When a blocking task is submitted, it will be inserted into a queue. If available, one of
486 /// the idle threads will be notified to run the task. Otherwise, if the threshold set by this
487 /// method has not been reached, a new thread will be spawned. If no idle thread is available
488 /// and no more threads are allowed to be spawned, the task will remain in the queue until one
    /// of the busy threads picks it up. Note that since the queue does not apply any backpressure,
490 /// it could potentially grow unbounded.
491 ///
492 /// # Panics
493 ///
494 /// This will panic if `val` is not larger than `0`.
495 ///
496 /// # Upgrading from 0.x
497 ///
498 /// In old versions `max_threads` limited both blocking and worker threads, but the
499 /// current `max_blocking_threads` does not include async worker threads in the count.
500 ///
501 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
502 /// [`fs`]: mod@crate::fs
503 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
504 /// [`Stdout`]: struct@crate::io::Stdout
505 /// [`Stdin`]: struct@crate::io::Stdin
506 /// [`Stderr`]: struct@crate::io::Stderr
507 /// [`worker_threads`]: Self::worker_threads
508 /// [`thread_keep_alive`]: Self::thread_keep_alive
509 #[track_caller]
510 #[cfg_attr(docsrs, doc(alias = "max_threads"))]
511 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
512 assert!(val > 0, "Max blocking threads cannot be set to 0");
513 self.max_blocking_threads = val;
514 self
515 }
516
517 /// Sets name of threads spawned by the `Runtime`'s thread pool.
518 ///
519 /// The default name is "tokio-runtime-worker".
520 ///
521 /// # Examples
522 ///
523 /// ```
524 /// # #[cfg(not(target_family = "wasm"))]
525 /// # {
526 /// # use tokio::runtime;
527 ///
528 /// # pub fn main() {
529 /// let rt = runtime::Builder::new_multi_thread()
530 /// .thread_name("my-pool")
531 /// .build();
532 /// # }
533 /// # }
534 /// ```
535 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
536 let val = val.into();
537 self.thread_name = std::sync::Arc::new(move || val.clone());
538 self
539 }
540
541 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool.
542 ///
543 /// The default name fn is `|| "tokio-runtime-worker".into()`.
544 ///
545 /// # Examples
546 ///
547 /// ```
548 /// # #[cfg(not(target_family = "wasm"))]
549 /// # {
550 /// # use tokio::runtime;
551 /// # use std::sync::atomic::{AtomicUsize, Ordering};
552 /// # pub fn main() {
553 /// let rt = runtime::Builder::new_multi_thread()
554 /// .thread_name_fn(|| {
555 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
556 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
557 /// format!("my-pool-{}", id)
558 /// })
559 /// .build();
560 /// # }
561 /// # }
562 /// ```
563 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self
564 where
565 F: Fn() -> String + Send + Sync + 'static,
566 {
567 self.thread_name = std::sync::Arc::new(f);
568 self
569 }
570
571 /// Sets the stack size (in bytes) for worker threads.
572 ///
573 /// The actual stack size may be greater than this value if the platform
574 /// specifies minimal stack size.
575 ///
576 /// The default stack size for spawned threads is 2 MiB, though this
577 /// particular stack size is subject to change in the future.
578 ///
579 /// # Examples
580 ///
581 /// ```
582 /// # #[cfg(not(target_family = "wasm"))]
583 /// # {
584 /// # use tokio::runtime;
585 ///
586 /// # pub fn main() {
587 /// let rt = runtime::Builder::new_multi_thread()
588 /// .thread_stack_size(32 * 1024)
589 /// .build();
590 /// # }
591 /// # }
592 /// ```
593 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
594 self.thread_stack_size = Some(val);
595 self
596 }
597
598 /// Executes function `f` after each thread is started but before it starts
599 /// doing work.
600 ///
601 /// This is intended for bookkeeping and monitoring use cases.
602 ///
603 /// # Examples
604 ///
605 /// ```
606 /// # #[cfg(not(target_family = "wasm"))]
607 /// # {
608 /// # use tokio::runtime;
609 /// # pub fn main() {
610 /// let runtime = runtime::Builder::new_multi_thread()
611 /// .on_thread_start(|| {
612 /// println!("thread started");
613 /// })
614 /// .build();
615 /// # }
616 /// # }
617 /// ```
618 #[cfg(not(loom))]
619 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
620 where
621 F: Fn() + Send + Sync + 'static,
622 {
623 self.after_start = Some(std::sync::Arc::new(f));
624 self
625 }
626
627 /// Executes function `f` before each thread stops.
628 ///
629 /// This is intended for bookkeeping and monitoring use cases.
630 ///
631 /// # Examples
632 ///
633 /// ```
634 /// # #[cfg(not(target_family = "wasm"))]
635 /// {
636 /// # use tokio::runtime;
637 /// # pub fn main() {
638 /// let runtime = runtime::Builder::new_multi_thread()
639 /// .on_thread_stop(|| {
640 /// println!("thread stopping");
641 /// })
642 /// .build();
643 /// # }
644 /// # }
645 /// ```
646 #[cfg(not(loom))]
647 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
648 where
649 F: Fn() + Send + Sync + 'static,
650 {
651 self.before_stop = Some(std::sync::Arc::new(f));
652 self
653 }
654
655 /// Executes function `f` just before a thread is parked (goes idle).
656 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn)
657 /// can be called, and may result in this thread being unparked immediately.
658 ///
659 /// This can be used to start work only when the executor is idle, or for bookkeeping
660 /// and monitoring purposes.
661 ///
662 /// Note: There can only be one park callback for a runtime; calling this function
663 /// more than once replaces the last callback defined, rather than adding to it.
664 ///
665 /// # Examples
666 ///
667 /// ## Multithreaded executor
668 /// ```
669 /// # #[cfg(not(target_family = "wasm"))]
670 /// # {
671 /// # use std::sync::Arc;
672 /// # use std::sync::atomic::{AtomicBool, Ordering};
673 /// # use tokio::runtime;
674 /// # use tokio::sync::Barrier;
675 /// # pub fn main() {
676 /// let once = AtomicBool::new(true);
677 /// let barrier = Arc::new(Barrier::new(2));
678 ///
679 /// let runtime = runtime::Builder::new_multi_thread()
680 /// .worker_threads(1)
681 /// .on_thread_park({
682 /// let barrier = barrier.clone();
683 /// move || {
684 /// let barrier = barrier.clone();
685 /// if once.swap(false, Ordering::Relaxed) {
686 /// tokio::spawn(async move { barrier.wait().await; });
687 /// }
688 /// }
689 /// })
690 /// .build()
691 /// .unwrap();
692 ///
693 /// runtime.block_on(async {
694 /// barrier.wait().await;
695 /// })
696 /// # }
697 /// # }
698 /// ```
699 /// ## Current thread executor
700 /// ```
701 /// # use std::sync::Arc;
702 /// # use std::sync::atomic::{AtomicBool, Ordering};
703 /// # use tokio::runtime;
704 /// # use tokio::sync::Barrier;
705 /// # pub fn main() {
706 /// let once = AtomicBool::new(true);
707 /// let barrier = Arc::new(Barrier::new(2));
708 ///
709 /// let runtime = runtime::Builder::new_current_thread()
710 /// .on_thread_park({
711 /// let barrier = barrier.clone();
712 /// move || {
713 /// let barrier = barrier.clone();
714 /// if once.swap(false, Ordering::Relaxed) {
715 /// tokio::spawn(async move { barrier.wait().await; });
716 /// }
717 /// }
718 /// })
719 /// .build()
720 /// .unwrap();
721 ///
722 /// runtime.block_on(async {
723 /// barrier.wait().await;
724 /// })
725 /// # }
726 /// ```
727 #[cfg(not(loom))]
728 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
729 where
730 F: Fn() + Send + Sync + 'static,
731 {
732 self.before_park = Some(std::sync::Arc::new(f));
733 self
734 }
735
736 /// Executes function `f` just after a thread unparks (starts executing tasks).
737 ///
738 /// This is intended for bookkeeping and monitoring use cases; note that work
739 /// in this callback will increase latencies when the application has allowed one or
740 /// more runtime threads to go idle.
741 ///
742 /// Note: There can only be one unpark callback for a runtime; calling this function
743 /// more than once replaces the last callback defined, rather than adding to it.
744 ///
745 /// # Examples
746 ///
747 /// ```
748 /// # #[cfg(not(target_family = "wasm"))]
749 /// # {
750 /// # use tokio::runtime;
751 /// # pub fn main() {
752 /// let runtime = runtime::Builder::new_multi_thread()
753 /// .on_thread_unpark(|| {
754 /// println!("thread unparking");
755 /// })
756 /// .build();
757 ///
758 /// runtime.unwrap().block_on(async {
759 /// tokio::task::yield_now().await;
760 /// println!("Hello from Tokio!");
761 /// })
762 /// # }
763 /// # }
764 /// ```
765 #[cfg(not(loom))]
766 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
767 where
768 F: Fn() + Send + Sync + 'static,
769 {
770 self.after_unpark = Some(std::sync::Arc::new(f));
771 self
772 }
773
774 /// Executes function `f` just before a task is spawned.
775 ///
776 /// `f` is called within the Tokio context, so functions like
777 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
778 /// invoked immediately.
779 ///
780 /// This can be used for bookkeeping or monitoring purposes.
781 ///
782 /// Note: There can only be one spawn callback for a runtime; calling this function more
783 /// than once replaces the last callback defined, rather than adding to it.
784 ///
785 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
786 ///
787 /// **Note**: This is an [unstable API][unstable]. The public API of this type
788 /// may break in 1.x releases. See [the documentation on unstable
789 /// features][unstable] for details.
790 ///
791 /// [unstable]: crate#unstable-features
792 ///
793 /// # Examples
794 ///
795 /// ```
796 /// # use tokio::runtime;
797 /// # pub fn main() {
798 /// let runtime = runtime::Builder::new_current_thread()
799 /// .on_task_spawn(|_| {
800 /// println!("spawning task");
801 /// })
802 /// .build()
803 /// .unwrap();
804 ///
805 /// runtime.block_on(async {
806 /// tokio::task::spawn(std::future::ready(()));
807 ///
808 /// for _ in 0..64 {
809 /// tokio::task::yield_now().await;
810 /// }
811 /// })
812 /// # }
813 /// ```
814 #[cfg(all(not(loom), tokio_unstable))]
815 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
816 pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
817 where
818 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
819 {
820 self.before_spawn = Some(std::sync::Arc::new(f));
821 self
822 }
823
824 /// Executes function `f` just before a task is polled
825 ///
826 /// `f` is called within the Tokio context, so functions like
827 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
828 /// invoked immediately.
829 ///
830 /// **Note**: This is an [unstable API][unstable]. The public API of this type
831 /// may break in 1.x releases. See [the documentation on unstable
832 /// features][unstable] for details.
833 ///
834 /// [unstable]: crate#unstable-features
835 ///
836 /// # Examples
837 ///
838 /// ```
839 /// # #[cfg(not(target_family = "wasm"))]
840 /// # {
841 /// # use std::sync::{atomic::AtomicUsize, Arc};
842 /// # use tokio::task::yield_now;
843 /// # pub fn main() {
844 /// let poll_start_counter = Arc::new(AtomicUsize::new(0));
845 /// let poll_start = poll_start_counter.clone();
846 /// let rt = tokio::runtime::Builder::new_multi_thread()
847 /// .enable_all()
848 /// .on_before_task_poll(move |meta| {
849 /// println!("task {} is about to be polled", meta.id())
850 /// })
851 /// .build()
852 /// .unwrap();
853 /// let task = rt.spawn(async {
854 /// yield_now().await;
855 /// });
856 /// let _ = rt.block_on(task);
857 ///
858 /// # }
859 /// # }
860 /// ```
861 #[cfg(tokio_unstable)]
862 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
863 pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
864 where
865 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
866 {
867 self.before_poll = Some(std::sync::Arc::new(f));
868 self
869 }
870
871 /// Executes function `f` just after a task is polled
872 ///
873 /// `f` is called within the Tokio context, so functions like
874 /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
875 /// invoked immediately.
876 ///
877 /// **Note**: This is an [unstable API][unstable]. The public API of this type
878 /// may break in 1.x releases. See [the documentation on unstable
879 /// features][unstable] for details.
880 ///
881 /// [unstable]: crate#unstable-features
882 ///
883 /// # Examples
884 ///
885 /// ```
886 /// # #[cfg(not(target_family = "wasm"))]
887 /// # {
888 /// # use std::sync::{atomic::AtomicUsize, Arc};
889 /// # use tokio::task::yield_now;
890 /// # pub fn main() {
891 /// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
892 /// let poll_stop = poll_stop_counter.clone();
893 /// let rt = tokio::runtime::Builder::new_multi_thread()
894 /// .enable_all()
895 /// .on_after_task_poll(move |meta| {
896 /// println!("task {} completed polling", meta.id());
897 /// })
898 /// .build()
899 /// .unwrap();
900 /// let task = rt.spawn(async {
901 /// yield_now().await;
902 /// });
903 /// let _ = rt.block_on(task);
904 ///
905 /// # }
906 /// # }
907 /// ```
908 #[cfg(tokio_unstable)]
909 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
910 pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
911 where
912 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
913 {
914 self.after_poll = Some(std::sync::Arc::new(f));
915 self
916 }
917
918 /// Executes function `f` just after a task is terminated.
919 ///
920 /// `f` is called within the Tokio context, so functions like
921 /// [`tokio::spawn`](crate::spawn) can be called.
922 ///
923 /// This can be used for bookkeeping or monitoring purposes.
924 ///
925 /// Note: There can only be one task termination callback for a runtime; calling this
926 /// function more than once replaces the last callback defined, rather than adding to it.
927 ///
928 /// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
929 ///
930 /// **Note**: This is an [unstable API][unstable]. The public API of this type
931 /// may break in 1.x releases. See [the documentation on unstable
932 /// features][unstable] for details.
933 ///
934 /// [unstable]: crate#unstable-features
935 ///
936 /// # Examples
937 ///
938 /// ```
939 /// # use tokio::runtime;
940 /// # pub fn main() {
941 /// let runtime = runtime::Builder::new_current_thread()
942 /// .on_task_terminate(|_| {
943 /// println!("killing task");
944 /// })
945 /// .build()
946 /// .unwrap();
947 ///
948 /// runtime.block_on(async {
949 /// tokio::task::spawn(std::future::ready(()));
950 ///
951 /// for _ in 0..64 {
952 /// tokio::task::yield_now().await;
953 /// }
954 /// })
955 /// # }
956 /// ```
957 #[cfg(all(not(loom), tokio_unstable))]
958 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
959 pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
960 where
961 F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
962 {
963 self.after_termination = Some(std::sync::Arc::new(f));
964 self
965 }
966
967 /// Creates the configured `Runtime`.
968 ///
969 /// The returned `Runtime` instance is ready to spawn tasks.
970 ///
971 /// # Examples
972 ///
973 /// ```
974 /// # #[cfg(not(target_family = "wasm"))]
975 /// # {
976 /// use tokio::runtime::Builder;
977 ///
978 /// let rt = Builder::new_multi_thread().build().unwrap();
979 ///
980 /// rt.block_on(async {
981 /// println!("Hello from the Tokio runtime");
982 /// });
983 /// # }
984 /// ```
985 pub fn build(&mut self) -> io::Result<Runtime> {
986 match &self.kind {
987 Kind::CurrentThread => self.build_current_thread_runtime(),
988 #[cfg(feature = "rt-multi-thread")]
989 Kind::MultiThread => self.build_threaded_runtime(),
990 }
991 }
992
993 /// Creates the configured [`LocalRuntime`].
994 ///
995 /// The returned [`LocalRuntime`] instance is ready to spawn tasks.
996 ///
997 /// # Panics
998 ///
999 /// This will panic if the runtime is configured with [`new_multi_thread()`].
1000 ///
1001 /// [`new_multi_thread()`]: Builder::new_multi_thread
1002 ///
1003 /// # Examples
1004 ///
1005 /// ```
1006 /// use tokio::runtime::{Builder, LocalOptions};
1007 ///
1008 /// let rt = Builder::new_current_thread()
1009 /// .build_local(LocalOptions::default())
1010 /// .unwrap();
1011 ///
1012 /// rt.spawn_local(async {
1013 /// println!("Hello from the Tokio runtime");
1014 /// });
1015 /// ```
1016 #[allow(unused_variables, unreachable_patterns)]
1017 #[cfg(tokio_unstable)]
1018 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
1019 pub fn build_local(&mut self, options: LocalOptions) -> io::Result<LocalRuntime> {
1020 match &self.kind {
1021 Kind::CurrentThread => self.build_current_thread_local_runtime(),
1022 #[cfg(feature = "rt-multi-thread")]
1023 Kind::MultiThread => panic!("multi_thread is not supported for LocalRuntime"),
1024 }
1025 }
1026
1027 fn get_cfg(&self) -> driver::Cfg {
1028 driver::Cfg {
1029 enable_pause_time: match self.kind {
1030 Kind::CurrentThread => true,
1031 #[cfg(feature = "rt-multi-thread")]
1032 Kind::MultiThread => false,
1033 },
1034 enable_io: self.enable_io,
1035 enable_time: self.enable_time,
1036 start_paused: self.start_paused,
1037 nevents: self.nevents,
1038 timer_flavor: self.timer_flavor,
1039 }
1040 }
1041
1042 /// Sets a custom timeout for a thread in the blocking pool.
1043 ///
1044 /// By default, the timeout for a thread is set to 10 seconds. This can
1045 /// be overridden using `.thread_keep_alive()`.
1046 ///
1047 /// # Example
1048 ///
1049 /// ```
1050 /// # #[cfg(not(target_family = "wasm"))]
1051 /// # {
1052 /// # use tokio::runtime;
1053 /// # use std::time::Duration;
1054 /// # pub fn main() {
1055 /// let rt = runtime::Builder::new_multi_thread()
1056 /// .thread_keep_alive(Duration::from_millis(100))
1057 /// .build();
1058 /// # }
1059 /// # }
1060 /// ```
1061 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
1062 self.keep_alive = Some(duration);
1063 self
1064 }
1065
1066 /// Sets the number of scheduler ticks after which the scheduler will poll the global
1067 /// task queue.
1068 ///
1069 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1070 ///
1071 /// By default the global queue interval is 31 for the current-thread scheduler. Please see
1072 /// [the module documentation] for the default behavior of the multi-thread scheduler.
1073 ///
1074 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
1075 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
1076 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing
1077 /// getting started on new work, especially if tasks frequently yield rather than complete
1078 /// or await on further I/O. Setting the interval to `1` will prioritize the global queue and
1079 /// tasks from the local queue will be executed only if the global queue is empty.
1080 /// Conversely, a higher value prioritizes existing work, and is a good choice when most
1081 /// tasks quickly complete polling.
1082 ///
1083 /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
1084 ///
1085 /// # Panics
1086 ///
1087 /// This function will panic if 0 is passed as an argument.
1088 ///
1089 /// # Examples
1090 ///
1091 /// ```
1092 /// # #[cfg(not(target_family = "wasm"))]
1093 /// # {
1094 /// # use tokio::runtime;
1095 /// # pub fn main() {
1096 /// let rt = runtime::Builder::new_multi_thread()
1097 /// .global_queue_interval(31)
1098 /// .build();
1099 /// # }
1100 /// # }
1101 /// ```
1102 #[track_caller]
1103 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
1104 assert!(val > 0, "global_queue_interval must be greater than 0");
1105 self.global_queue_interval = Some(val);
1106 self
1107 }
1108
1109 /// Sets the number of scheduler ticks after which the scheduler will poll for
1110 /// external events (timers, I/O, and so on).
1111 ///
1112 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
1113 ///
1114 /// By default, the event interval is `61` for all scheduler types.
1115 ///
1116 /// Setting the event interval determines the effective "priority" of delivering
1117 /// these external events (which may wake up additional tasks), compared to
1118 /// executing tasks that are currently ready to run. A smaller value is useful
1119 /// when tasks frequently spend a long time in polling, or frequently yield,
1120 /// which can result in overly long delays picking up I/O events. Conversely,
1121 /// picking up new events requires extra synchronization and syscall overhead,
1122 /// so if tasks generally complete their polling quickly, a higher event interval
1123 /// will minimize that overhead while still keeping the scheduler responsive to
1124 /// events.
1125 ///
1126 /// # Panics
1127 ///
1128 /// This function will panic if 0 is passed as an argument.
1129 ///
1130 /// # Examples
1131 ///
1132 /// ```
1133 /// # #[cfg(not(target_family = "wasm"))]
1134 /// # {
1135 /// # use tokio::runtime;
1136 /// # pub fn main() {
1137 /// let rt = runtime::Builder::new_multi_thread()
1138 /// .event_interval(31)
1139 /// .build();
1140 /// # }
1141 /// # }
1142 /// ```
1143 #[track_caller]
1144 pub fn event_interval(&mut self, val: u32) -> &mut Self {
1145 assert!(val > 0, "event_interval must be greater than 0");
1146 self.event_interval = val;
1147 self
1148 }
1149
1150 cfg_unstable! {
1151 /// Configure how the runtime responds to an unhandled panic on a
1152 /// spawned task.
1153 ///
1154 /// By default, an unhandled panic (i.e. a panic not caught by
1155 /// [`std::panic::catch_unwind`]) has no impact on the runtime's
1156 /// execution. The panic's error value is forwarded to the task's
1157 /// [`JoinHandle`] and all other spawned tasks continue running.
1158 ///
1159 /// The `unhandled_panic` option enables configuring this behavior.
1160 ///
1161 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on
1162 /// spawned tasks have no impact on the runtime's execution.
1163 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
1164 /// shutdown immediately when a spawned task panics even if that
1165 /// task's `JoinHandle` has not been dropped. All other spawned tasks
1166 /// will immediately terminate and further calls to
1167 /// [`Runtime::block_on`] will panic.
1168 ///
1169 /// # Panics
1170 /// This method panics if called with [`UnhandledPanic::ShutdownRuntime`]
1171 /// on a runtime other than the current thread runtime.
1172 ///
1173 /// # Unstable
1174 ///
1175 /// This option is currently unstable and its implementation is
1176 /// incomplete. The API may change or be removed in the future. See
1177 /// issue [tokio-rs/tokio#4516] for more details.
1178 ///
1179 /// # Examples
1180 ///
1181 /// The following demonstrates a runtime configured to shutdown on
1182 /// panic. The first spawned task panics and results in the runtime
1183 /// shutting down. The second spawned task never has a chance to
1184 /// execute. The call to `block_on` will panic due to the runtime being
1185 /// forcibly shutdown.
1186 ///
1187 /// ```should_panic
1188 /// use tokio::runtime::{self, UnhandledPanic};
1189 ///
1190 /// # pub fn main() {
1191 /// let rt = runtime::Builder::new_current_thread()
1192 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
1193 /// .build()
1194 /// .unwrap();
1195 ///
1196 /// rt.spawn(async { panic!("boom"); });
1197 /// rt.spawn(async {
1198 /// // This task never completes.
1199 /// });
1200 ///
1201 /// rt.block_on(async {
1202 /// // Do some work
1203 /// # loop { tokio::task::yield_now().await; }
1204 /// })
1205 /// # }
1206 /// ```
1207 ///
1208 /// [`JoinHandle`]: struct@crate::task::JoinHandle
1209 /// [tokio-rs/tokio#4516]: https://github.com/tokio-rs/tokio/issues/4516
1210 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
1211 if !matches!(self.kind, Kind::CurrentThread) && matches!(behavior, UnhandledPanic::ShutdownRuntime) {
1212 panic!("UnhandledPanic::ShutdownRuntime is only supported in current thread runtime");
1213 }
1214
1215 self.unhandled_panic = behavior;
1216 self
1217 }
1218
1219 /// Disables the LIFO task scheduler heuristic.
1220 ///
1221 /// The multi-threaded scheduler includes a heuristic for optimizing
1222 /// message-passing patterns. This heuristic results in the **last**
1223 /// scheduled task being polled first.
1224 ///
1225 /// To implement this heuristic, each worker thread has a slot which
1226 /// holds the task that should be polled next. However, this slot cannot
1227 /// be stolen by other worker threads, which can result in lower total
1228 /// throughput when tasks tend to have longer poll times.
1229 ///
1230 /// This configuration option will disable this heuristic resulting in
1231 /// all scheduled tasks being pushed into the worker-local queue, which
1232 /// is stealable.
1233 ///
1234 /// Consider trying this option when the task "scheduled" time is high
1235 /// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1236 /// collect this data.
1237 ///
1238 /// # Unstable
1239 ///
1240 /// This configuration option is considered a workaround for the LIFO
1241 /// slot not being stealable. When the slot becomes stealable, we will
1242 /// revisit whether or not this option is necessary. See
1243 /// issue [tokio-rs/tokio#4941].
1244 ///
1245 /// # Examples
1246 ///
1247 /// ```
1248 /// # #[cfg(not(target_family = "wasm"))]
1249 /// # {
1250 /// use tokio::runtime;
1251 ///
1252 /// let rt = runtime::Builder::new_multi_thread()
1253 /// .disable_lifo_slot()
1254 /// .build()
1255 /// .unwrap();
1256 /// # }
1257 /// ```
1258 ///
1259 /// [tokio-rs/tokio-metrics]: https://github.com/tokio-rs/tokio-metrics
1260 /// [tokio-rs/tokio#4941]: https://github.com/tokio-rs/tokio/issues/4941
        pub fn disable_lifo_slot(&mut self) -> &mut Self {
            // One-way switch: this builder method can only turn the flag on.
            // The value is later copied into the scheduler `Config`.
            self.disable_lifo_slot = true;
            self
        }
1265
1266 /// Specifies the random number generation seed to use within all
1267 /// threads associated with the runtime being built.
1268 ///
1269 /// This option is intended to make certain parts of the runtime
1270 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of
1271 /// [`tokio::select!`] it will ensure that the order that branches are
1272 /// polled is deterministic.
1273 ///
1274 /// In addition to the code specifying `rng_seed` and interacting with
1275 /// the runtime, the internals of Tokio and the Rust compiler may affect
1276 /// the sequences of random numbers. In order to ensure repeatable
1277 /// results, the version of Tokio, the versions of all other
1278 /// dependencies that interact with Tokio, and the Rust compiler version
1279 /// should also all remain constant.
1280 ///
1281 /// # Examples
1282 ///
1283 /// ```
1284 /// # use tokio::runtime::{self, RngSeed};
1285 /// # pub fn main() {
1286 /// let seed = RngSeed::from_bytes(b"place your seed here");
1287 /// let rt = runtime::Builder::new_current_thread()
1288 /// .rng_seed(seed)
1289 /// .build();
1290 /// # }
1291 /// ```
1292 ///
1293 /// [`tokio::select!`]: crate::select
1294 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self {
1295 self.seed_generator = RngSeedGenerator::new(seed);
1296 self
1297 }
1298 }
1299
1300 cfg_unstable_metrics! {
1301 /// Enables tracking the distribution of task poll times.
1302 ///
1303 /// Task poll times are not instrumented by default as doing so requires
1304 /// calling [`Instant::now()`] twice per task poll, which could add
1305 /// measurable overhead. Use the [`Handle::metrics()`] to access the
1306 /// metrics data.
1307 ///
1308 /// The histogram uses fixed bucket sizes. In other words, the histogram
1309 /// buckets are not dynamic based on input values. Use the
1310 /// `metrics_poll_time_histogram` builder methods to configure the
1311 /// histogram details.
1312 ///
1313 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1314 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1315 /// better granularity with low memory usage, use [`metrics_poll_time_histogram_configuration()`]
1316 /// to select [`LogHistogram`] instead.
1317 ///
1318 /// # Examples
1319 ///
1320 /// ```
1321 /// # #[cfg(not(target_family = "wasm"))]
1322 /// # {
1323 /// use tokio::runtime;
1324 ///
1325 /// let rt = runtime::Builder::new_multi_thread()
1326 /// .enable_metrics_poll_time_histogram()
1327 /// .build()
1328 /// .unwrap();
1329 /// # // Test default values here
1330 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) }
1331 /// # let m = rt.handle().metrics();
1332 /// # assert_eq!(m.poll_time_histogram_num_buckets(), 10);
1333 /// # assert_eq!(m.poll_time_histogram_bucket_range(0), us(0)..us(100));
1334 /// # assert_eq!(m.poll_time_histogram_bucket_range(1), us(100)..us(200));
1335 /// # }
1336 /// ```
1337 ///
1338 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics
1339 /// [`Instant::now()`]: std::time::Instant::now
1340 /// [`LogHistogram`]: crate::runtime::LogHistogram
1341 /// [`metrics_poll_time_histogram_configuration()`]: Builder::metrics_poll_time_histogram_configuration
        pub fn enable_metrics_poll_time_histogram(&mut self) -> &mut Self {
            // The backing field keeps the older `poll_count` name even though
            // the public method is named `poll_time`.
            self.metrics_poll_count_histogram_enable = true;
            self
        }
1346
1347 /// Deprecated. Use [`enable_metrics_poll_time_histogram()`] instead.
1348 ///
1349 /// [`enable_metrics_poll_time_histogram()`]: Builder::enable_metrics_poll_time_histogram
1350 #[deprecated(note = "`poll_count_histogram` related methods have been renamed `poll_time_histogram` to better reflect their functionality.")]
1351 #[doc(hidden)]
        pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self {
            // Deprecated alias: forwards unchanged to the renamed method.
            self.enable_metrics_poll_time_histogram()
        }
1355
1356 /// Sets the histogram scale for tracking the distribution of task poll
1357 /// times.
1358 ///
1359 /// Tracking the distribution of task poll times can be done using a
1360 /// linear or log scale. When using linear scale, each histogram bucket
1361 /// will represent the same range of poll times. When using log scale,
1362 /// each histogram bucket will cover a range twice as big as the
1363 /// previous bucket.
1364 ///
1365 /// **Default:** linear scale.
1366 ///
1367 /// # Examples
1368 ///
1369 /// ```
1370 /// # #[cfg(not(target_family = "wasm"))]
1371 /// # {
1372 /// use tokio::runtime::{self, HistogramScale};
1373 ///
1374 /// # #[allow(deprecated)]
1375 /// let rt = runtime::Builder::new_multi_thread()
1376 /// .enable_metrics_poll_time_histogram()
1377 /// .metrics_poll_count_histogram_scale(HistogramScale::Log)
1378 /// .build()
1379 /// .unwrap();
1380 /// # }
1381 /// ```
1382 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1383 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self {
1384 self.metrics_poll_count_histogram.legacy_mut(|b|b.scale = histogram_scale);
1385 self
1386 }
1387
1388 /// Configure the histogram for tracking poll times
1389 ///
1390 /// By default, a linear histogram with 10 buckets each 100 microseconds wide will be used.
1391 /// This has an extremely low memory footprint, but may not provide enough granularity. For
1392 /// better granularity with low memory usage, use [`LogHistogram`] instead.
1393 ///
1394 /// # Examples
1395 /// Configure a [`LogHistogram`] with [default configuration]:
1396 /// ```
1397 /// # #[cfg(not(target_family = "wasm"))]
1398 /// # {
1399 /// use tokio::runtime;
1400 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1401 ///
1402 /// let rt = runtime::Builder::new_multi_thread()
1403 /// .enable_metrics_poll_time_histogram()
1404 /// .metrics_poll_time_histogram_configuration(
1405 /// HistogramConfiguration::log(LogHistogram::default())
1406 /// )
1407 /// .build()
1408 /// .unwrap();
1409 /// # }
1410 /// ```
1411 ///
1412 /// Configure a linear histogram with 100 buckets, each 10μs wide
1413 /// ```
1414 /// # #[cfg(not(target_family = "wasm"))]
1415 /// # {
1416 /// use tokio::runtime;
1417 /// use std::time::Duration;
1418 /// use tokio::runtime::HistogramConfiguration;
1419 ///
1420 /// let rt = runtime::Builder::new_multi_thread()
1421 /// .enable_metrics_poll_time_histogram()
1422 /// .metrics_poll_time_histogram_configuration(
1423 /// HistogramConfiguration::linear(Duration::from_micros(10), 100)
1424 /// )
1425 /// .build()
1426 /// .unwrap();
1427 /// # }
1428 /// ```
1429 ///
1430 /// Configure a [`LogHistogram`] with the following settings:
1431 /// - Measure times from 100ns to 120s
1432 /// - Max error of 0.1
1433 /// - No more than 1024 buckets
1434 /// ```
1435 /// # #[cfg(not(target_family = "wasm"))]
1436 /// # {
1437 /// use std::time::Duration;
1438 /// use tokio::runtime;
1439 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1440 ///
1441 /// let rt = runtime::Builder::new_multi_thread()
1442 /// .enable_metrics_poll_time_histogram()
1443 /// .metrics_poll_time_histogram_configuration(
1444 /// HistogramConfiguration::log(LogHistogram::builder()
1445 /// .max_value(Duration::from_secs(120))
1446 /// .min_value(Duration::from_nanos(100))
1447 /// .max_error(0.1)
1448 /// .max_buckets(1024)
1449 /// .expect("configuration uses 488 buckets")
1450 /// )
1451 /// )
1452 /// .build()
1453 /// .unwrap();
1454 /// # }
1455 /// ```
1456 ///
1457 /// When migrating from the legacy histogram ([`HistogramScale::Log`]) and wanting
1458 /// to match the previous behavior, use `precision_exact(0)`. This creates a histogram
1459 /// where each bucket is twice the size of the previous bucket.
1460 /// ```rust
1461 /// use std::time::Duration;
1462 /// use tokio::runtime::{HistogramConfiguration, LogHistogram};
1463 /// let rt = tokio::runtime::Builder::new_current_thread()
1464 /// .enable_all()
1465 /// .enable_metrics_poll_time_histogram()
1466 /// .metrics_poll_time_histogram_configuration(HistogramConfiguration::log(
1467 /// LogHistogram::builder()
1468 /// .min_value(Duration::from_micros(20))
1469 /// .max_value(Duration::from_millis(4))
1470 /// // Set `precision_exact` to `0` to match `HistogramScale::Log`
1471 /// .precision_exact(0)
1472 /// .max_buckets(10)
1473 /// .unwrap(),
1474 /// ))
1475 /// .build()
1476 /// .unwrap();
1477 /// ```
1478 ///
1479 /// [`LogHistogram`]: crate::runtime::LogHistogram
1480 /// [default configuration]: crate::runtime::LogHistogramBuilder
1481 /// [`HistogramScale::Log`]: crate::runtime::HistogramScale::Log
1482 pub fn metrics_poll_time_histogram_configuration(&mut self, configuration: HistogramConfiguration) -> &mut Self {
1483 self.metrics_poll_count_histogram.histogram_type = configuration.inner;
1484 self
1485 }
1486
1487 /// Sets the histogram resolution for tracking the distribution of task
1488 /// poll times.
1489 ///
1490 /// The resolution is the histogram's first bucket's range. When using a
1491 /// linear histogram scale, each bucket will cover the same range. When
1492 /// using a log scale, each bucket will cover a range twice as big as
1493 /// the previous bucket. In the log case, the resolution represents the
1494 /// smallest bucket range.
1495 ///
1496 /// Note that, when using log scale, the resolution is rounded up to the
1497 /// nearest power of 2 in nanoseconds.
1498 ///
1499 /// **Default:** 100 microseconds.
1500 ///
1501 /// # Examples
1502 ///
1503 /// ```
1504 /// # #[cfg(not(target_family = "wasm"))]
1505 /// # {
1506 /// use tokio::runtime;
1507 /// use std::time::Duration;
1508 ///
1509 /// # #[allow(deprecated)]
1510 /// let rt = runtime::Builder::new_multi_thread()
1511 /// .enable_metrics_poll_time_histogram()
1512 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100))
1513 /// .build()
1514 /// .unwrap();
1515 /// # }
1516 /// ```
1517 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1518 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self {
1519 assert!(resolution > Duration::from_secs(0));
1520 // Sanity check the argument and also make the cast below safe.
1521 assert!(resolution <= Duration::from_secs(1));
1522
1523 let resolution = resolution.as_nanos() as u64;
1524
1525 self.metrics_poll_count_histogram.legacy_mut(|b|b.resolution = resolution);
1526 self
1527 }
1528
1529 /// Sets the number of buckets for the histogram tracking the
1530 /// distribution of task poll times.
1531 ///
1532 /// The last bucket tracks all greater values that fall out of other
1533 /// ranges. So, configuring the histogram using a linear scale,
1534 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task
1535 /// polls that take more than 450ms to complete.
1536 ///
1537 /// **Default:** 10
1538 ///
1539 /// # Examples
1540 ///
1541 /// ```
1542 /// # #[cfg(not(target_family = "wasm"))]
1543 /// # {
1544 /// use tokio::runtime;
1545 ///
1546 /// # #[allow(deprecated)]
1547 /// let rt = runtime::Builder::new_multi_thread()
1548 /// .enable_metrics_poll_time_histogram()
1549 /// .metrics_poll_count_histogram_buckets(15)
1550 /// .build()
1551 /// .unwrap();
1552 /// # }
1553 /// ```
1554 #[deprecated(note = "use `metrics_poll_time_histogram_configuration`")]
1555 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self {
1556 self.metrics_poll_count_histogram.legacy_mut(|b|b.num_buckets = buckets);
1557 self
1558 }
1559 }
1560
1561 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
1562 use crate::runtime::runtime::Scheduler;
1563
1564 let (scheduler, handle, blocking_pool) =
1565 self.build_current_thread_runtime_components(None)?;
1566
1567 Ok(Runtime::from_parts(
1568 Scheduler::CurrentThread(scheduler),
1569 handle,
1570 blocking_pool,
1571 ))
1572 }
1573
1574 #[cfg(tokio_unstable)]
1575 fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
1576 use crate::runtime::local_runtime::LocalRuntimeScheduler;
1577
1578 let tid = std::thread::current().id();
1579
1580 let (scheduler, handle, blocking_pool) =
1581 self.build_current_thread_runtime_components(Some(tid))?;
1582
1583 Ok(LocalRuntime::from_parts(
1584 LocalRuntimeScheduler::CurrentThread(scheduler),
1585 handle,
1586 blocking_pool,
1587 ))
1588 }
1589
    /// Builds the pieces shared by both current-thread runtime flavors.
    ///
    /// Returns the scheduler, a runtime [`Handle`] wrapping the scheduler's
    /// handle, and the blocking pool. `local_tid` is forwarded unchanged to
    /// `CurrentThread::new`; callers in this file pass `None` for a regular
    /// runtime and `Some(thread id)` for a local runtime.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `driver::Driver::new`.
    fn build_current_thread_runtime_components(
        &mut self,
        local_tid: Option<ThreadId>,
    ) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
        use crate::runtime::scheduler;
        use crate::runtime::Config;

        // The current-thread path always uses the traditional timer flavor,
        // overriding whatever `get_cfg` copied from the builder.
        let mut cfg = self.get_cfg();
        cfg.timer_flavor = TimerFlavor::Traditional;
        let (driver, driver_handle) = driver::Driver::new(cfg)?;

        // Blocking pool, capped at `max_blocking_threads`.
        let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
        let blocking_spawner = blocking_pool.spawner().clone();

        // Derive two seed generators for this runtime. The first goes into
        // `Config`, the second is handed directly to the scheduler.
        let seed_generator_1 = self.seed_generator.next_generator();
        let seed_generator_2 = self.seed_generator.next_generator();

        // And now put a single-threaded scheduler on top of the timer. When
        // there are no futures ready to do something, it'll let the timer or
        // the reactor generate some new stimuli for the futures to continue
        // in their life.
        let (scheduler, handle) = CurrentThread::new(
            driver,
            driver_handle,
            blocking_spawner,
            seed_generator_2,
            Config {
                before_park: self.before_park.clone(),
                after_unpark: self.after_unpark.clone(),
                before_spawn: self.before_spawn.clone(),
                #[cfg(tokio_unstable)]
                before_poll: self.before_poll.clone(),
                #[cfg(tokio_unstable)]
                after_poll: self.after_poll.clone(),
                after_termination: self.after_termination.clone(),
                global_queue_interval: self.global_queue_interval,
                event_interval: self.event_interval,
                #[cfg(tokio_unstable)]
                unhandled_panic: self.unhandled_panic.clone(),
                disable_lifo_slot: self.disable_lifo_slot,
                seed_generator: seed_generator_1,
                // `None` unless the poll-time histogram was enabled.
                metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
            },
            local_tid,
        );

        // Wrap the scheduler-specific handle in the flavor-agnostic `Handle`.
        let handle = Handle {
            inner: scheduler::Handle::CurrentThread(handle),
        };

        Ok((scheduler, handle, blocking_pool))
    }
1644
1645 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
1646 if self.metrics_poll_count_histogram_enable {
1647 Some(self.metrics_poll_count_histogram.clone())
1648 } else {
1649 None
1650 }
1651 }
1652}
1653
cfg_io_driver! {
    impl Builder {
        /// Enables the I/O driver.
        ///
        /// Doing this enables using net, process, signal, and some I/O types on
        /// the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_io()
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn enable_io(&mut self) -> &mut Self {
            self.enable_io = true;
            self
        }

        /// Configures the max number of events to be processed per tick by
        /// the I/O driver.
        ///
        /// This method only records the capacity; it does not enable the I/O
        /// driver by itself. Call [`enable_io`] as well, as in the example
        /// below.
        ///
        /// [`enable_io`]: Builder::enable_io
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_io()
        ///     .max_io_events_per_tick(1024)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
            // Stored as `nevents` and copied into the driver configuration by `get_cfg`.
            self.nevents = capacity;
            self
        }
    }
}
1696
cfg_time! {
    impl Builder {
        /// Enables the time driver.
        ///
        /// Doing this enables using `tokio::time` on the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// # #[cfg(not(target_family = "wasm"))]
        /// # {
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_time()
        ///     .build()
        ///     .unwrap();
        /// # }
        /// ```
        pub fn enable_time(&mut self) -> &mut Self {
            // Copied into the driver configuration by `get_cfg`.
            self.enable_time = true;
            self
        }
    }
}
1722
cfg_io_uring! {
    impl Builder {
        /// Enables Tokio's `io_uring` driver.
        ///
        /// Doing this enables using `io_uring` operations on the runtime.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_multi_thread()
        ///     .enable_io_uring()
        ///     .build()
        ///     .unwrap();
        /// ```
        #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))]
        pub fn enable_io_uring(&mut self) -> &mut Self {
            // Currently, the uring flag is equivalent to `enable_io`: this
            // sets the same `enable_io` field and nothing else.
            self.enable_io = true;
            self
        }
    }
}
1747
cfg_test_util! {
    impl Builder {
        /// Controls if the runtime's clock starts paused or advancing.
        ///
        /// Pausing time requires the current-thread runtime; construction of
        /// the runtime will panic otherwise.
        ///
        /// # Examples
        ///
        /// ```
        /// use tokio::runtime;
        ///
        /// let rt = runtime::Builder::new_current_thread()
        ///     .enable_time()
        ///     .start_paused(true)
        ///     .build()
        ///     .unwrap();
        /// ```
        pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
            // Copied into the driver configuration by `get_cfg`.
            self.start_paused = start_paused;
            self
        }
    }
}
1772
1773cfg_rt_multi_thread! {
1774 impl Builder {
1775 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
1776 use crate::loom::sys::num_cpus;
1777 use crate::runtime::{Config, runtime::Scheduler};
1778 use crate::runtime::scheduler::{self, MultiThread};
1779
1780 let worker_threads = self.worker_threads.unwrap_or_else(num_cpus);
1781
1782 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;
1783
1784 // Create the blocking pool
1785 let blocking_pool =
1786 blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads);
1787 let blocking_spawner = blocking_pool.spawner().clone();
1788
1789 // Generate a rng seed for this runtime.
1790 let seed_generator_1 = self.seed_generator.next_generator();
1791 let seed_generator_2 = self.seed_generator.next_generator();
1792
1793 let (scheduler, handle, launch) = MultiThread::new(
1794 worker_threads,
1795 driver,
1796 driver_handle,
1797 blocking_spawner,
1798 seed_generator_2,
1799 Config {
1800 before_park: self.before_park.clone(),
1801 after_unpark: self.after_unpark.clone(),
1802 before_spawn: self.before_spawn.clone(),
1803 #[cfg(tokio_unstable)]
1804 before_poll: self.before_poll.clone(),
1805 #[cfg(tokio_unstable)]
1806 after_poll: self.after_poll.clone(),
1807 after_termination: self.after_termination.clone(),
1808 global_queue_interval: self.global_queue_interval,
1809 event_interval: self.event_interval,
1810 #[cfg(tokio_unstable)]
1811 unhandled_panic: self.unhandled_panic.clone(),
1812 disable_lifo_slot: self.disable_lifo_slot,
1813 seed_generator: seed_generator_1,
1814 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
1815 },
1816 self.timer_flavor,
1817 );
1818
1819 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
1820
1821 // Spawn the thread pool workers
1822 let _enter = handle.enter();
1823 launch.launch();
1824
1825 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
1826 }
1827 }
1828}
1829
1830impl fmt::Debug for Builder {
1831 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1832 fmt.debug_struct("Builder")
1833 .field("worker_threads", &self.worker_threads)
1834 .field("max_blocking_threads", &self.max_blocking_threads)
1835 .field(
1836 "thread_name",
1837 &"<dyn Fn() -> String + Send + Sync + 'static>",
1838 )
1839 .field("thread_stack_size", &self.thread_stack_size)
1840 .field("after_start", &self.after_start.as_ref().map(|_| "..."))
1841 .field("before_stop", &self.before_stop.as_ref().map(|_| "..."))
1842 .field("before_park", &self.before_park.as_ref().map(|_| "..."))
1843 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "..."))
1844 .finish()
1845 }
1846}