tokio/runtime/runtime.rs
1use super::BOX_FUTURE_THRESHOLD;
2use crate::runtime::blocking::BlockingPool;
3use crate::runtime::scheduler::CurrentThread;
4use crate::runtime::{context, EnterGuard, Handle};
5use crate::task::JoinHandle;
6use crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR;
7use crate::util::trace::SpawnMeta;
8
9use std::future::Future;
10use std::io;
11use std::mem;
12use std::time::Duration;
13
14cfg_rt_multi_thread! {
15 use crate::runtime::Builder;
16 use crate::runtime::scheduler::MultiThread;
17}
18
19/// The Tokio runtime.
20///
21/// The runtime provides an I/O driver, task scheduler, [timer], and
22/// blocking pool, necessary for running asynchronous tasks.
23///
24/// Instances of `Runtime` can be created using [`new`], or [`Builder`].
25/// However, most users will use the [`#[tokio::main]`][main] annotation on
26/// their entry point instead.
27///
28/// See [module level][mod] documentation for more details.
29///
30/// # Shutdown
31///
32/// Shutting down the runtime is done by dropping the value, or calling
33/// [`shutdown_background`] or [`shutdown_timeout`].
34///
35/// Tasks spawned through [`Runtime::spawn`] keep running until they yield.
36/// Then they are dropped. They are not *guaranteed* to run to completion, but
37/// *might* do so if they do not yield until completion.
38///
39/// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running
40/// until they return.
41///
42/// The thread initiating the shutdown blocks until all spawned work has been
43/// stopped. This can take an indefinite amount of time. The `Drop`
44/// implementation waits forever for this.
45///
46/// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if
47/// waiting forever is undesired. When the timeout is reached, spawned work that
48/// did not stop in time and threads running it are leaked. The work continues
49/// to run until one of the stopping conditions is fulfilled, but the thread
50/// initiating the shutdown is unblocked.
51///
52/// Once the runtime has been dropped, any outstanding I/O resources bound to
53/// it will no longer function. Calling any method on them will result in an
54/// error.
55///
56/// # Sharing
57///
58/// There are several ways to establish shared access to a Tokio runtime:
59///
60/// * Using an <code>[Arc]\<Runtime></code>.
61/// * Using a [`Handle`].
62/// * Entering the runtime context.
63///
64/// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various
65/// things with the runtime such as spawning new tasks or entering the runtime
66/// context. Both types can be cloned to create a new handle that allows access
67/// to the same runtime. By passing clones into different tasks or threads, you
68/// will be able to access the runtime from those tasks or threads.
69///
70/// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that
71/// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down,
72/// whereas a [`Handle`] does not prevent that. This is because shutdown of the
73/// runtime happens when the destructor of the `Runtime` object runs.
74///
75/// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive
76/// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>,
77/// this can be achieved via [`Arc::try_unwrap`] when only one strong count
78/// reference is left over.
79///
80/// The runtime context is entered using the [`Runtime::enter`] or
81/// [`Handle::enter`] methods, which use a thread-local variable to store the
82/// current runtime. Whenever you are inside the runtime context, methods such
83/// as [`tokio::spawn`] will use the runtime whose context you are inside.
84///
85/// [timer]: crate::time
86/// [mod]: index.html
87/// [`new`]: method@Self::new
88/// [`Builder`]: struct@Builder
89/// [`Handle`]: struct@Handle
90/// [main]: macro@crate::main
91/// [`tokio::spawn`]: crate::spawn
92/// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap
93/// [Arc]: std::sync::Arc
94/// [`shutdown_background`]: method@Runtime::shutdown_background
95/// [`shutdown_timeout`]: method@Runtime::shutdown_timeout
#[derive(Debug)]
pub struct Runtime {
    /// Task scheduler. Its variant is matched on when blocking on a future
    /// and when the runtime is dropped, to pick the flavor-specific code path.
    scheduler: Scheduler,

    /// Handle to runtime, also contains driver handles. Spawning, entering
    /// the context and metrics are all delegated to this handle.
    handle: Handle,

    /// Blocking pool handle, used to signal shutdown
    blocking_pool: BlockingPool,
}
107
/// The flavor of a `Runtime`.
///
/// This is the return type for [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()).
///
/// The enum is marked `#[non_exhaustive]`, so code outside this crate that
/// matches on it must include a wildcard arm.
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum RuntimeFlavor {
    /// The flavor that executes all tasks on the current thread.
    CurrentThread,
    /// The flavor that executes tasks across multiple threads.
    MultiThread,
}
119
/// The runtime scheduler is either a multi-thread or a current-thread executor.
///
/// The `MultiThread` variant only exists when the `rt-multi-thread` feature is
/// enabled, so every `match` on this enum has to gate that arm with the same
/// `cfg` attribute.
#[derive(Debug)]
pub(super) enum Scheduler {
    /// Execute all tasks on the current-thread.
    CurrentThread(CurrentThread),

    /// Execute tasks across multiple threads.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(MultiThread),
}
130
131impl Runtime {
/// Assembles a `Runtime` from its already-constructed components.
///
/// The three values are stored as-is; nothing is started or validated here.
pub(super) fn from_parts(
    scheduler: Scheduler,
    handle: Handle,
    blocking_pool: BlockingPool,
) -> Runtime {
    Runtime {
        scheduler,
        handle,
        blocking_pool,
    }
}
143
144 /// Creates a new runtime instance with default configuration values.
145 ///
146 /// This results in the multi threaded scheduler, I/O driver, and time driver being
147 /// initialized.
148 ///
149 /// Most applications will not need to call this function directly. Instead,
150 /// they will use the [`#[tokio::main]` attribute][main]. When a more complex
151 /// configuration is necessary, the [runtime builder] may be used.
152 ///
153 /// See [module level][mod] documentation for more details.
154 ///
155 /// # Examples
156 ///
157 /// Creating a new `Runtime` with default configuration values.
158 ///
159 /// ```
160 /// use tokio::runtime::Runtime;
161 ///
162 /// let rt = Runtime::new()
163 /// .unwrap();
164 ///
165 /// // Use the runtime...
166 /// ```
167 ///
168 /// [mod]: index.html
169 /// [main]: ../attr.main.html
170 /// [threaded scheduler]: index.html#threaded-scheduler
171 /// [runtime builder]: crate::runtime::Builder
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new() -> std::io::Result<Runtime> {
    // The default configuration is a multi-thread builder with `enable_all`
    // applied; any build failure is returned to the caller as an `io::Error`.
    Builder::new_multi_thread().enable_all().build()
}
177
178 /// Returns a handle to the runtime's spawner.
179 ///
180 /// The returned handle can be used to spawn tasks that run on this runtime, and can
181 /// be cloned to allow moving the `Handle` to other threads.
182 ///
183 /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone.
184 /// Refer to the documentation of [`Handle::block_on`] for more.
185 ///
186 /// # Examples
187 ///
188 /// ```
189 /// # #[cfg(not(target_family = "wasm"))]
190 /// # {
191 /// use tokio::runtime::Runtime;
192 ///
193 /// let rt = Runtime::new()
194 /// .unwrap();
195 ///
196 /// let handle = rt.handle();
197 ///
198 /// // Use the handle...
199 /// # }
200 /// ```
pub fn handle(&self) -> &Handle {
    // Hands out a borrow of the stored handle; callers that need an owned
    // handle clone it themselves.
    &self.handle
}
204
205 /// Spawns a future onto the Tokio runtime.
206 ///
207 /// This spawns the given future onto the runtime's executor, usually a
208 /// thread pool. The thread pool is then responsible for polling the future
209 /// until it completes.
210 ///
211 /// The provided future will start running in the background immediately
212 /// when `spawn` is called, even if you don't await the returned
213 /// `JoinHandle`.
214 ///
215 /// See [module level][mod] documentation for more details.
216 ///
217 /// [mod]: index.html
218 ///
219 /// # Examples
220 ///
221 /// ```
222 /// # #[cfg(not(target_family = "wasm"))]
223 /// # {
224 /// use tokio::runtime::Runtime;
225 ///
226 /// # fn dox() {
227 /// // Create the runtime
228 /// let rt = Runtime::new().unwrap();
229 ///
230 /// // Spawn a future onto the runtime
231 /// rt.spawn(async {
232 /// println!("now running on a worker thread");
233 /// });
234 /// # }
235 /// # }
236 /// ```
237 #[track_caller]
238 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
239 where
240 F: Future + Send + 'static,
241 F::Output: Send + 'static,
242 {
243 let fut_size = mem::size_of::<F>();
244 if fut_size > BOX_FUTURE_THRESHOLD {
245 self.handle
246 .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
247 } else {
248 self.handle
249 .spawn_named(future, SpawnMeta::new_unnamed(fut_size))
250 }
251 }
252
253 /// Runs the provided function on an executor dedicated to blocking operations.
254 ///
255 /// # Examples
256 ///
257 /// ```
258 /// # #[cfg(not(target_family = "wasm"))]
259 /// # {
260 /// use tokio::runtime::Runtime;
261 ///
262 /// # fn dox() {
263 /// // Create the runtime
264 /// let rt = Runtime::new().unwrap();
265 ///
266 /// // Spawn a blocking function onto the runtime
267 /// rt.spawn_blocking(|| {
268 /// println!("now running on a worker thread");
269 /// });
270 /// # }
271 /// # }
272 /// ```
#[track_caller]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    // Pure delegation: the handle owns the logic for dispatching blocking work.
    self.handle.spawn_blocking(func)
}
281
282 /// Runs a future to completion on the Tokio runtime. This is the
283 /// runtime's entry point.
284 ///
285 /// This runs the given future on the current thread, blocking until it is
286 /// complete, and yielding its resolved result. Any tasks or timers
287 /// which the future spawns internally will be executed on the runtime.
288 ///
289 /// # Non-worker future
290 ///
291 /// Note that the future required by this function does not run as a
292 /// worker. The expectation is that other tasks are spawned by the future here.
293 /// Awaiting on other futures from the future provided here will not
294 /// perform as fast as those spawned as workers.
295 ///
296 /// # Multi thread scheduler
297 ///
298 /// When the multi thread scheduler is used this will allow futures
299 /// to run within the io driver and timer context of the overall runtime.
300 ///
301 /// Any spawned tasks will continue running after `block_on` returns.
302 ///
303 /// # Current thread scheduler
304 ///
305 /// When the current thread scheduler is enabled `block_on`
306 /// can be called concurrently from multiple threads. The first call
307 /// will take ownership of the io and timer drivers. This means
308 /// other threads which do not own the drivers will hook into that one.
309 /// When the first `block_on` completes, other threads will be able to
310 /// "steal" the driver to allow continued execution of their futures.
311 ///
312 /// Any spawned tasks will be suspended after `block_on` returns. Calling
313 /// `block_on` again will resume previously spawned tasks.
314 ///
315 /// # Panics
316 ///
317 /// This function panics if the provided future panics, or if called within an
318 /// asynchronous execution context.
319 ///
320 /// # Examples
321 ///
322 /// ```no_run
323 /// # #[cfg(not(target_family = "wasm"))]
324 /// # {
325 /// use tokio::runtime::Runtime;
326 ///
327 /// // Create the runtime
328 /// let rt = Runtime::new().unwrap();
329 ///
330 /// // Execute the future, blocking the current thread until completion
331 /// rt.block_on(async {
332 /// println!("hello");
333 /// });
334 /// # }
335 /// ```
336 ///
337 /// [handle]: fn@Handle::block_on
338 #[track_caller]
339 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
340 let fut_size = mem::size_of::<F>();
341 if fut_size > BOX_FUTURE_THRESHOLD {
342 self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
343 } else {
344 self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
345 }
346 }
347
/// Shared implementation behind `block_on`: optionally instruments `future`,
/// enters the runtime context, and hands the future to the active scheduler.
///
/// `_meta` is only consumed when the `tracing` instrumentation below is
/// compiled in, hence the leading underscore.
#[track_caller]
fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
    // With task dumps compiled in, wrap the future as a trace root. The
    // `future` binding is shadowed, so later statements see the wrapped value.
    #[cfg(all(
        tokio_unstable,
        feature = "taskdump",
        feature = "rt",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    let future = super::task::trace::Trace::root(future);

    // With tracing compiled in, instrument the future under the name
    // "block_on", tagged with a freshly allocated task id.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    let future = crate::util::trace::task(
        future,
        "block_on",
        _meta,
        crate::runtime::task::Id::next().as_u64(),
    );

    // Keep the runtime context entered until this function returns.
    let _enter = self.enter();

    // Both flavors expose the same `block_on` call shape.
    match &self.scheduler {
        Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
        #[cfg(feature = "rt-multi-thread")]
        Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
    }
}
375
376 /// Enters the runtime context.
377 ///
378 /// This allows you to construct types that must have an executor
379 /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
380 /// also allow you to call methods such as [`tokio::spawn`].
381 ///
382 /// [`Sleep`]: struct@crate::time::Sleep
383 /// [`TcpStream`]: struct@crate::net::TcpStream
384 /// [`tokio::spawn`]: fn@crate::spawn
385 ///
386 /// # Example
387 ///
388 /// ```
389 /// # #[cfg(not(target_family = "wasm"))]
390 /// # {
391 /// use tokio::runtime::Runtime;
392 /// use tokio::task::JoinHandle;
393 ///
394 /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
395 /// // Had we not used `rt.enter` below, this would panic.
396 /// tokio::spawn(async move {
397 /// println!("{}", msg);
398 /// })
399 /// }
400 ///
401 /// fn main() {
402 /// let rt = Runtime::new().unwrap();
403 ///
404 /// let s = "Hello World!".to_string();
405 ///
406 /// // By entering the context, we tie `tokio::spawn` to this executor.
407 /// let _guard = rt.enter();
408 /// let handle = function_that_spawns(s);
409 ///
410 /// // Wait for the task before we end the test.
411 /// rt.block_on(handle).unwrap();
412 /// }
413 /// # }
414 /// ```
pub fn enter(&self) -> EnterGuard<'_> {
    // Delegates to the handle; the returned guard borrows from `self`.
    self.handle.enter()
}
418
419 /// Shuts down the runtime, waiting for at most `duration` for all spawned
420 /// work to stop.
421 ///
422 /// See the [struct level documentation](Runtime#shutdown) for more details.
423 ///
424 /// # Examples
425 ///
426 /// ```
427 /// # #[cfg(not(target_family = "wasm"))]
428 /// # {
429 /// use tokio::runtime::Runtime;
430 /// use tokio::task;
431 ///
432 /// use std::thread;
433 /// use std::time::Duration;
434 ///
435 /// fn main() {
436 /// let runtime = Runtime::new().unwrap();
437 ///
438 /// runtime.block_on(async move {
439 /// task::spawn_blocking(move || {
440 /// thread::sleep(Duration::from_secs(10_000));
441 /// });
442 /// });
443 ///
444 /// runtime.shutdown_timeout(Duration::from_millis(100));
445 /// }
446 /// # }
447 /// ```
pub fn shutdown_timeout(mut self, duration: Duration) {
    // Wakeup and shutdown all the worker threads
    self.handle.inner.shutdown();
    // Shut the blocking pool down, passing `duration` as the timeout.
    self.blocking_pool.shutdown(Some(duration));
    // `self` was taken by value, so it is dropped on return and the `Drop`
    // impl for `Runtime` performs the scheduler-specific shutdown as well.
}
453
454 /// Shuts down the runtime, without waiting for any spawned work to stop.
455 ///
456 /// This can be useful if you want to drop a runtime from within another runtime.
457 /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
458 /// to complete, which would normally not be permitted within an asynchronous context.
459 /// By calling `shutdown_background()`, you can drop the runtime from such a context.
460 ///
/// Note however, that because we do not wait for any blocking tasks to complete, this
/// may result in a resource leak (in that any blocking tasks are still running until they
/// return).
464 ///
465 /// See the [struct level documentation](Runtime#shutdown) for more details.
466 ///
467 /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
468 ///
469 /// ```
470 /// # #[cfg(not(target_family = "wasm"))]
471 /// # {
472 /// use tokio::runtime::Runtime;
473 ///
474 /// fn main() {
475 /// let runtime = Runtime::new().unwrap();
476 ///
477 /// runtime.block_on(async move {
478 /// let inner_runtime = Runtime::new().unwrap();
479 /// // ...
480 /// inner_runtime.shutdown_background();
481 /// });
482 /// }
483 /// # }
484 /// ```
pub fn shutdown_background(self) {
    // A zero-length timeout: reuse the timed shutdown path without waiting.
    self.shutdown_timeout(Duration::from_nanos(0));
}
488
489 /// Returns a view that lets you get information about how the runtime
490 /// is performing.
pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
    // The metrics view is produced by the handle; nothing is computed here.
    self.handle.metrics()
}
494}
495
/// Dropping the runtime shuts down whichever scheduler flavor it owns.
impl Drop for Runtime {
    fn drop(&mut self) {
        match &mut self.scheduler {
            Scheduler::CurrentThread(current_thread) => {
                // This ensures that tasks spawned on the current-thread
                // runtime are dropped inside the runtime's context.
                // The guard must stay alive across the `shutdown` call, so it
                // is bound to a named variable rather than discarded with `_`.
                let _guard = context::try_set_current(&self.handle.inner);
                current_thread.shutdown(&self.handle.inner);
            }
            #[cfg(feature = "rt-multi-thread")]
            Scheduler::MultiThread(multi_thread) => {
                // The threaded scheduler drops its tasks on its worker threads, which is
                // already in the runtime's context.
                multi_thread.shutdown(&self.handle.inner);
            }
        }
    }
}
514
// Explicit marker impls: `Runtime` is declared usable across a
// `catch_unwind` boundary, both by value and behind a shared reference.
// NOTE(review): these are manual assertions rather than compiler-derived
// auto-trait impls; confirm the fields uphold them when adding new ones.
impl std::panic::UnwindSafe for Runtime {}

impl std::panic::RefUnwindSafe for Runtime {}
518
/// Returns `true` when the `Display` output of `d` is exactly `s`.
///
/// The comparison happens piece by piece as the formatter emits text, so no
/// intermediate `String` is built.
fn display_eq(d: impl std::fmt::Display, s: &str) -> bool {
    use std::fmt::Write;

    /// Sink that checks everything written to it against an expected string.
    ///
    /// `pending` holds the part of the expected text not matched yet, or
    /// `None` once any written piece has failed to match.
    struct Expect<'e> {
        pending: Option<&'e str>,
    }

    impl<'e> Write for Expect<'e> {
        fn write_str(&mut self, piece: &str) -> std::fmt::Result {
            // A mismatch is remembered as `None` and is sticky: every later
            // piece is ignored. No error is ever reported to the formatter.
            self.pending = self.pending.and_then(|rest| rest.strip_prefix(piece));
            Ok(())
        }
    }

    let mut sink = Expect { pending: Some(s) };
    // Formatting errors are deliberately ignored; only the sink state counts.
    let _ = write!(sink, "{}", d);

    // Equal only if nothing mismatched and the expected text was used up.
    sink.pending == Some("")
}
547
548/// Checks whether the given error was emitted by Tokio when shutting down its runtime.
549///
550/// # Examples
551///
552/// ```
553/// # #[cfg(not(target_family = "wasm"))]
554/// # {
555/// use tokio::runtime::Runtime;
556/// use tokio::net::TcpListener;
557///
558/// fn main() {
559/// let rt1 = Runtime::new().unwrap();
560/// let rt2 = Runtime::new().unwrap();
561///
562/// let listener = rt1.block_on(async {
563/// TcpListener::bind("127.0.0.1:0").await.unwrap()
564/// });
565///
566/// drop(rt1);
567///
568/// rt2.block_on(async {
569/// let res = listener.accept().await;
570/// assert!(res.is_err());
571/// assert!(tokio::runtime::is_rt_shutdown_err(res.as_ref().unwrap_err()));
572/// });
573/// }
574/// # }
575/// ```
576pub fn is_rt_shutdown_err(err: &io::Error) -> bool {
577 if let Some(inner) = err.get_ref() {
578 err.kind() == io::ErrorKind::Other
579 && inner.source().is_none()
580 && display_eq(inner, RUNTIME_SHUTTING_DOWN_ERROR)
581 } else {
582 false
583 }
584}