https://intoraw.xyz/blog/feed.xml

[Tokio] Blocking Pool

2025-03-19

这篇文章主要介绍 tokio 中的 BlockingPool 的运行机制。

Tokio Runtime 结构

Tokio 的 Runtime 结构如下

pub struct Runtime {
    /// Task scheduler
    scheduler: Scheduler,

    /// Handle to runtime, also contains driver handles
    handle: Handle,

    /// Blocking pool handle, used to signal shutdown
    blocking_pool: BlockingPool,
}

在 tokio runtime 启动的时候会首先创建 BlockingPool 线程池,后续所有 spawn_blocking的 task 都是运行在该线程池上。 同样,异步的 executor 也是运行在这个线程池上。

Blocking Pool

BlockingPool 结构如下

pub(crate) struct BlockingPool {
    spawner: Spawner,
    shutdown_rx: shutdown::Receiver,
}

BlockingPool 主要包含两个结构

  • Spawner: 用于 spawn task。
  • shutdown_rx: 用于处理 blocking pool 的关闭流程

BlockingPool只在 Runtime 创建的时候创建,创建之后,BlockingPool 内的 Spawner 会 clone 一份传递给 Runtime 的 handle,用于 spawn task。 BlockingPool 这个结构剩下的作用只是一个句柄,存储在 Runtime 结构中,用于runtime 关闭的时候优雅关闭线程池中的线程。

    impl Builder {
        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
            // ...

            // Create the blocking pool
            let blocking_pool =
                blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
            let blocking_spawner = blocking_pool.spawner().clone();

            let (scheduler, handle, launch) = MultiThread::new(
                // ...
                blocking_spawner,
                // ...
            );

            let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };

            // Spawn the thread pool workers
            let _enter = handle.enter();
            launch.launch();

            Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
        }

Spawner

其中 Spanner 是 Clone 的,内部结构如下:

#[derive(Clone)]
pub(crate) struct Spawner {
    inner: Arc<Inner>,
}

struct Inner {
    /// State shared between worker threads.
    shared: Mutex<Shared>,

    /// Pool threads wait on this.
    condvar: Condvar,

    /// Spawned threads use this name.
    thread_name: ThreadNameFn,

    /// Spawned thread stack size.
    stack_size: Option<usize>,

    /// Call after a thread starts.
    after_start: Option<Callback>,

    /// Call before a thread stops.
    before_stop: Option<Callback>,

    // Maximum number of threads.
    thread_cap: usize,

    // Customizable wait timeout.
    keep_alive: Duration,

    // Metrics about the pool.
    metrics: SpawnerMetrics,
}

其中 Inner 结构进一步的将多个 worker 线程会出现读写冲突的部分放到了 Shared 中,并用 Mutex 保护。Inner 中剩下的结构都是被 worker 线程只读。

struct Shared {
    queue: VecDeque<Task>,
    num_notify: u32,
    shutdown: bool,
    shutdown_tx: Option<shutdown::Sender>,
    /// Prior to shutdown, we clean up `JoinHandles` by having each timed-out
    /// thread join on the previous timed-out thread. This is not strictly
    /// necessary but helps avoid Valgrind false positives, see
    /// <https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666>
    /// for more information.
    last_exiting_thread: Option<thread::JoinHandle<()>>,
    /// This holds the `JoinHandles` for all running threads; on shutdown, the thread
    /// calling shutdown handles joining on these.
    worker_threads: HashMap<usize, thread::JoinHandle<()>>,
    /// This is a counter used to iterate `worker_threads` in a consistent order (for loom's
    /// benefit).
    worker_thread_index: usize,
}

Shared 的部分包含

  • queue: 所有 worker 共享的全局队列
  • num_notify: 为了避免condvar 虚假唤醒,num_notify 为 0 表示是虚假唤醒。
  • shutdown: 当 BlockingPool要 shutdown 的时候,标记为 true,worker 在代码逻辑中穿插着检查 shutdown 的逻辑,如果为 true 则主动 shutdown _
  • shutdown_tx : 是一个 Arc<oneshot::Sender<()>>,类似 RAII ,每个启动的 worker 线程会clone 一份,当 worker 线程执行结束之后会调用drop,当所有的 shutdown_tx 都被 drop 完的时候,_shutdown_rx_调用 recv 会立刻返回。

关于 shutdown::Sender 和 shutdown::Receiver

先看代码

//! A shutdown channel.
//!
//! Each worker holds the `Sender` half. When all the `Sender` halves are
//! dropped, the `Receiver` receives a notification.
#[derive(Debug, Clone)]
pub(super) struct Sender {
    _tx: Arc<oneshot::Sender<()>>,
}

#[derive(Debug)]
pub(super) struct Receiver {
    rx: oneshot::Receiver<()>,
}

shutdown::Sender 和 shutdown::Receiver 里面其实报了一个 tokio 的 oneshot channel. 这里有个问题,当需要等待 shutdown 结束,也就是调用 Receiver::wait,是一个同步的调用,但是里面的 channel 是一个异步的 future,如何在一个同步函数中调用一个异步函数?

这是一个经典问题。一般方法就是runtime::block_on,但 tokio 本身就是 runtime,怎么搞?

可以看下代码

impl Receiver {
    /// Blocks the current thread until all `Sender` handles drop.
    ///
    /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
    /// duration. If `timeout` is `None`, then the thread is blocked until the
    /// shutdown signal is received.
    ///
    /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
    pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
        use crate::runtime::context::try_enter_blocking_region;

        if timeout == Some(Duration::from_nanos(0)) {
            return false;
        }

        let mut e = match try_enter_blocking_region() {
            Some(enter) => enter,
            _ => {
                if std::thread::panicking() {
                    // Don't panic in a panic
                    return false;
                } else {
                    panic!(
                        "Cannot drop a runtime in a context where blocking is not allowed. \
                        This happens when a runtime is dropped from within an asynchronous context."
                    );
                }
            }
        };

        // The oneshot completes with an Err
        //
        // If blocking fails to wait, this indicates a problem parking the
        // current thread (usually, shutting down a runtime stored in a
        // thread-local).
        if let Some(timeout) = timeout {
            e.block_on_timeout(&mut self.rx, timeout).is_ok()
        } else {
            let _ = e.block_on(&mut self.rx);
            true
        }
    }
}

可以看到,创建一个 BlockingRegionGuard,作用是保证 block_on_timeout 这个操作的调用不能出现在一个Runtime中,类似 runtime::block_on。因为该操作 会导致线程级别的挂起,阻塞当前线程。

其中block_on_timeout实现如下

impl BlockingRegionGuard {
    /// Blocks the thread on the specified future for **at most** `timeout`
    ///
    /// If the future completes before `timeout`, the result is returned. If
    /// `timeout` elapses, then `Err` is returned.
    pub(crate) fn block_on_timeout<F>(&mut self, f: F, timeout: Duration) -> Result<F::Output, ()>
    where
        F: std::future::Future,
    {
        use crate::runtime::park::CachedParkThread;
        use std::task::Context;
        use std::task::Poll::Ready;
        use std::time::Instant;

        let mut park = CachedParkThread::new();
        let waker = park.waker().map_err(|_| ())?;
        let mut cx = Context::from_waker(&waker);

        pin!(f);
        let when = Instant::now() + timeout;

        loop {
            if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
                return Ok(v);
            }

            let now = Instant::now();

            if now >= when {
                return Err(());
            }

            park.park_timeout(when - now);
        }
    }
}

注意这里的 f 是一个 Future。 这里先创建了一个 CachedParkThread,为了简化,可以理解为这个结构中包含了一个 condvar,然后 waker 就是调用condvar.notify_all() 最后通过 waker 创建了 cx,调用 future::Poll(cx),如果 poll 是 pending 状态,就调用park.park_timeout(when - now); 将当前 worker 线程挂起,直到 poll 返回 Ready 或者超时。

关于Spawner 中的 Mutex

可以看到所有的 Worker 都会通过 Mutex 访问 Shared,由于task 队列是放在 Shared 中,这样每次tokio::spawn_blocking(f)的时候都会获取锁。 这个问题在 tokio 上也有相关的 issue

解决思路其实也比较简单,类似 tokio 中 non_blocking task 的 executor,每个 worker 拥有一个自己的有限长度的队列,spawn 的 task 优先放到私有队列中,当私有队列满了,放到steal 队列(global,多个 worker 共享)。

但为什么社区没有跟进这个 issue 就不得而知了。

需要注意的是,tokio 的 fs 模块中的异步其实是假异步,它将同步的 fs 调用通过 tokio::spawn_blocking 提交到了BlockingPool 中,这样导致在高并发访问 fs 的情况下可能会出现由于 Mutex 导致的竞争问题。

Spawner 任务提交的流程

Spawner 提交任务流程也比较常规

  • 创建 Task 结构,同时返回 task 的 JoinHandle
  • Spawn task
    • 将 task 放到队列中
    • 如果当前有 idle 的worker,则通过 condvar唤醒
    • 如果当前没有 idle 的 worker
      • 如果当前 worker 数目小于 thread pool 的容量,则启动一个新的 worker
      • 如果当前 worker 数目已经到达 thread pool 的容量,则直接返回。

启动 worker 的流程:

    fn spawn_thread(
        &self,
        shutdown_tx: shutdown::Sender,
        rt: &Handle,
        id: usize,
    ) -> io::Result<thread::JoinHandle<()>> {
        let mut builder = thread::Builder::new().name((self.inner.thread_name)());

        if let Some(stack_size) = self.inner.stack_size {
            builder = builder.stack_size(stack_size);
        }

        let rt = rt.clone();

        builder.spawn(move || {
            // Only the reference should be moved into the closure
            // 进入 runtime:设置当前 worker 线程的 thread_local 的 handle 为给定的 rt
            let _enter = rt.enter();
            // worker loop 函数
            rt.inner.blocking_spawner().inner.run(id);
            // worker 执行结束后,释放 shutdown_tx 的引用计数
            drop(shutdown_tx);
        })
    }

worker 执行的函数如下,主要逻辑: 不断从队列中 pop task 执行,当没有 task 执行的时候进入休眠self.condvar.wait_timeout(shared, self.keep_alive).unwrap(); 默认休眠 10s,如果在休眠期间没有被唤醒,则休眠结束后结束 worker 的执行。

impl Inner {
    fn run(&self, worker_thread_id: usize) {
        if let Some(f) = &self.after_start {
            f();
        }

        let mut shared = self.shared.lock();
        let mut join_on_thread = None;

        'main: loop {
            // BUSY
            while let Some(task) = shared.queue.pop_front() {
                self.metrics.dec_queue_depth();
                drop(shared);
                task.run();

                shared = self.shared.lock();
            }

            // IDLE
            self.metrics.inc_num_idle_threads();

            while !shared.shutdown {
                let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();

                shared = lock_result.0;
                let timeout_result = lock_result.1;

                if shared.num_notify != 0 {
                    // We have received a legitimate wakeup,
                    // acknowledge it by decrementing the counter
                    // and transition to the BUSY state.
                    shared.num_notify -= 1;
                    break;
                }

                // Even if the condvar "timed out", if the pool is entering the
                // shutdown phase, we want to perform the cleanup logic.
                if !shared.shutdown && timeout_result.timed_out() {
                    // We'll join the prior timed-out thread's JoinHandle after dropping the lock.
                    // This isn't done when shutting down, because the thread calling shutdown will
                    // handle joining everything.
                    let my_handle = shared.worker_threads.remove(&worker_thread_id);
                    join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);

                    break 'main;
                }

                // Spurious wakeup detected, go back to sleep.
            }

            if shared.shutdown {
                // Drain the queue
                while let Some(task) = shared.queue.pop_front() {
                    self.metrics.dec_queue_depth();
                    drop(shared);

                    task.shutdown_or_run_if_mandatory();

                    shared = self.shared.lock();
                }

                // Work was produced, and we "took" it (by decrementing num_notify).
                // This means that num_idle was decremented once for our wakeup.
                // But, since we are exiting, we need to "undo" that, as we'll stay idle.
                self.metrics.inc_num_idle_threads();
                // NOTE: Technically we should also do num_notify++ and notify again,
                // but since we're shutting down anyway, that won't be necessary.
                break;
            }
        }

        // Thread exit
        self.metrics.dec_num_threads();

        // num_idle should now be tracked exactly, panic
        // with a descriptive message if it is not the
        // case.
        let prev_idle = self.metrics.dec_num_idle_threads();
        assert!(
            prev_idle >= self.metrics.num_idle_threads(),
            "num_idle_threads underflowed on thread exit"
        );

        if shared.shutdown && self.metrics.num_threads() == 0 {
            self.condvar.notify_one();
        }

        drop(shared);

        if let Some(f) = &self.before_stop {
            f();
        }

        if let Some(handle) = join_on_thread {
            let _ = handle.join();
        }
    }
}

BlockingPool 的优雅关闭

当 BlockingPool 创建出来之后,就分成了两部分, BlockingPool 结构作为 Handle,用于关闭线程池。一部分是 Worker 线程(被 thread::spawn 出来之后获取 Inner 的 Clone 执行,他们和 BlockingPool 结构共享 Inner 结构。

BlockingPool 的关闭分两部分:

  • 发送关闭信号:利用 shutdowncondvar
    • 直接将 Shared 结构中的 shutdown 标记为 true。在 worker 执行的 loop 中会穿插检查 shutdown 信号的逻辑,如果为 true 则进入关闭流程。
    • 对于处于休眠状态的 worker线程,则通过 condvar.notify_all的方式唤醒,worker 唤醒之后会检查 shutdown 来确认是否进入关闭流程
  • 等待所有 worker 执行结束
    • 调用 shutdown_rx.wait 等待 worker 的 fn 执行结束。
    • 调用 thread::JoinHandle::join 等待 worker 线程执行结束。

需要注意的问题

BlockingPool 的调度是 FIFO 的,并且每个 Task 执行是不可打断的,在创建 Task 的时候需要注意,Task 中是否存在获取锁没有释放的问题,如果存在,当 BlockingPool 的worker 消耗完之后,会造成死锁。