tokio/runtime/task/trace/
mod.rs1use crate::loom::sync::Arc;
2use crate::runtime::context;
3use crate::runtime::scheduler::{self, current_thread, Inject};
4use crate::task::Id;
5
6use backtrace::BacktraceFrame;
7use std::cell::Cell;
8use std::collections::VecDeque;
9use std::ffi::c_void;
10use std::fmt;
11use std::future::Future;
12use std::pin::Pin;
13use std::ptr::{self, NonNull};
14use std::task::{self, Poll};
15
16mod symbol;
17mod tree;
18
19use symbol::Symbol;
20use tree::Tree;
21
22use super::{Notified, OwnedTasks, Schedule};
23
24type Backtrace = Vec<BacktraceFrame>;
25type SymbolTrace = Vec<Symbol>;
26
27pub(crate) struct Context {
29 active_frame: Cell<Option<NonNull<Frame>>>,
32 collector: Cell<Option<Trace>>,
34}
35
36struct Frame {
38 inner_addr: *const c_void,
40
41 parent: Option<NonNull<Frame>>,
43}
44
45#[derive(Clone, Debug)]
50pub(crate) struct Trace {
51 backtraces: Vec<Backtrace>,
54}
55
56pin_project_lite::pin_project! {
57 #[derive(Debug, Clone)]
58 #[must_use = "futures do nothing unless you `.await` or poll them"]
59 pub struct Root<T> {
61 #[pin]
62 future: T,
63 }
64}
65
66const FAIL_NO_THREAD_LOCAL: &str = "The Tokio thread-local has been destroyed \
67 as part of shutting down the current \
68 thread, so collecting a taskdump is not \
69 possible.";
70
71impl Context {
72 pub(crate) const fn new() -> Self {
73 Context {
74 active_frame: Cell::new(None),
75 collector: Cell::new(None),
76 }
77 }
78
79 unsafe fn try_with_current<F, R>(f: F) -> Option<R>
82 where
83 F: FnOnce(&Self) -> R,
84 {
85 unsafe { crate::runtime::context::with_trace(f) }
86 }
87
88 unsafe fn with_current_frame<F, R>(f: F) -> R
91 where
92 F: FnOnce(&Cell<Option<NonNull<Frame>>>) -> R,
93 {
94 unsafe {
95 Self::try_with_current(|context| f(&context.active_frame)).expect(FAIL_NO_THREAD_LOCAL)
96 }
97 }
98
99 fn with_current_collector<F, R>(f: F) -> R
100 where
101 F: FnOnce(&Cell<Option<Trace>>) -> R,
102 {
103 unsafe {
106 Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL)
107 }
108 }
109
110 pub(crate) fn is_tracing() -> bool {
112 Self::with_current_collector(|maybe_collector| {
113 let collector = maybe_collector.take();
114 let result = collector.is_some();
115 maybe_collector.set(collector);
116 result
117 })
118 }
119}
120
121impl Trace {
122 #[inline(never)]
125 pub(crate) fn capture<F, R>(f: F) -> (R, Trace)
126 where
127 F: FnOnce() -> R,
128 {
129 let collector = Trace { backtraces: vec![] };
130
131 let previous = Context::with_current_collector(|current| current.replace(Some(collector)));
132
133 let result = f();
134
135 let collector =
136 Context::with_current_collector(|current| current.replace(previous)).unwrap();
137
138 (result, collector)
139 }
140
141 #[inline(never)]
143 pub(crate) fn root<F>(future: F) -> Root<F> {
144 Root { future }
145 }
146
147 pub(crate) fn backtraces(&self) -> &[Backtrace] {
148 &self.backtraces
149 }
150}
151
152#[inline(never)]
162pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> {
163 let did_trace = unsafe {
165 Context::try_with_current(|context_cell| {
166 if let Some(mut collector) = context_cell.collector.take() {
167 let mut frames = vec![];
168 let mut above_leaf = false;
169
170 if let Some(active_frame) = context_cell.active_frame.get() {
171 let active_frame = active_frame.as_ref();
172
173 backtrace::trace(|frame| {
174 let below_root = !ptr::eq(frame.symbol_address(), active_frame.inner_addr);
175
176 if above_leaf && below_root {
179 frames.push(frame.to_owned().into());
180 }
181
182 if ptr::eq(frame.symbol_address(), trace_leaf as *const _) {
183 above_leaf = true;
184 }
185
186 below_root
188 });
189 }
190 collector.backtraces.push(frames);
191 context_cell.collector.set(Some(collector));
192 true
193 } else {
194 false
195 }
196 })
197 .unwrap_or(false)
198 };
199
200 if did_trace {
201 context::with_scheduler(|scheduler| {
204 if let Some(scheduler) = scheduler {
205 match scheduler {
206 scheduler::Context::CurrentThread(s) => s.defer.defer(cx.waker()),
207 #[cfg(feature = "rt-multi-thread")]
208 scheduler::Context::MultiThread(s) => s.defer.defer(cx.waker()),
209 }
210 }
211 });
212
213 Poll::Pending
214 } else {
215 Poll::Ready(())
216 }
217}
218
219impl fmt::Display for Trace {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 Tree::from_trace(self.clone()).fmt(f)
222 }
223}
224
225fn defer<F: FnOnce() -> R, R>(f: F) -> impl Drop {
226 use std::mem::ManuallyDrop;
227
228 struct Defer<F: FnOnce() -> R, R>(ManuallyDrop<F>);
229
230 impl<F: FnOnce() -> R, R> Drop for Defer<F, R> {
231 #[inline(always)]
232 fn drop(&mut self) {
233 unsafe {
234 ManuallyDrop::take(&mut self.0)();
235 }
236 }
237 }
238
239 Defer(ManuallyDrop::new(f))
240}
241
242impl<T: Future> Future for Root<T> {
243 type Output = T::Output;
244
245 #[inline(never)]
246 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
247 unsafe {
250 let mut frame = Frame {
251 inner_addr: Self::poll as *const c_void,
252 parent: None,
253 };
254
255 Context::with_current_frame(|current| {
256 frame.parent = current.take();
257 current.set(Some(NonNull::from(&frame)));
258 });
259
260 let _restore = defer(|| {
261 Context::with_current_frame(|current| {
262 current.set(frame.parent);
263 });
264 });
265
266 let this = self.project();
267 this.future.poll(cx)
268 }
269 }
270}
271
272pub(in crate::runtime) fn trace_current_thread(
274 owned: &OwnedTasks<Arc<current_thread::Handle>>,
275 local: &mut VecDeque<Notified<Arc<current_thread::Handle>>>,
276 injection: &Inject<Arc<current_thread::Handle>>,
277) -> Vec<(Id, Trace)> {
278 let mut dequeued = Vec::new();
281
282 while let Some(task) = local.pop_back() {
283 dequeued.push(task);
284 }
285
286 while let Some(task) = injection.pop() {
287 dequeued.push(task);
288 }
289
290 trace_owned(owned, dequeued)
292}
293
294cfg_rt_multi_thread! {
295 use crate::loom::sync::Mutex;
296 use crate::runtime::scheduler::multi_thread;
297 use crate::runtime::scheduler::multi_thread::Synced;
298 use crate::runtime::scheduler::inject::Shared;
299
300 pub(in crate::runtime) unsafe fn trace_multi_thread(
306 owned: &OwnedTasks<Arc<multi_thread::Handle>>,
307 local: &mut multi_thread::queue::Local<Arc<multi_thread::Handle>>,
308 synced: &Mutex<Synced>,
309 injection: &Shared<Arc<multi_thread::Handle>>,
310 ) -> Vec<(Id, Trace)> {
311 let mut dequeued = Vec::new();
312
313 while let Some(notified) = local.pop() {
315 dequeued.push(notified);
316 }
317
318 let mut synced = synced.lock();
320 while let Some(notified) = unsafe { injection.pop(&mut synced.inject) } {
322 dequeued.push(notified);
323 }
324
325 drop(synced);
326
327 trace_owned(owned, dequeued)
330 }
331}
332
333fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>, dequeued: Vec<Notified<S>>) -> Vec<(Id, Trace)> {
340 let mut tasks = dequeued;
341 owned.for_each(|task| {
344 if let Some(notified) = task.notify_for_tracing() {
348 tasks.push(notified);
349 }
350 });
354
355 tasks
356 .into_iter()
357 .map(|task| {
358 let local_notified = owned.assert_owner(task);
359 let id = local_notified.task.id();
360 let ((), trace) = Trace::capture(|| local_notified.run());
361 (id, trace)
362 })
363 .collect()
364}