[Tokio] Blocking Pool
这篇文章主要介绍 tokio 中的 BlockingPool 的运行机制。
Tokio Runtime 结构#
Tokio 的 Runtime 结构如下
pub struct Runtime {
scheduler: Scheduler,
handle: Handle,
blocking_pool: BlockingPool,
}
在 tokio runtime 启动的时候会首先创建 BlockingPool 线程池,后续所有 spawn_blocking
的 task 都是运行在该线程池上。 同样,异步的 executor 也是运行在这个线程池上。
Blocking Pool#
BlockingPool 结构如下
pub(crate) struct BlockingPool {
spawner: Spawner,
shutdown_rx: shutdown::Receiver,
}
BlockingPool 主要包含两个结构
- Spawner: 用于 spawn task。
- shutdown_rx: 用于处理 blocking pool 的关闭流程
BlockingPool只在 Runtime 创建的时候创建,创建之后,BlockingPool 内的 Spawner 会 clone 一份传递给 Runtime 的 handle,用于 spawn task。 BlockingPool 这个结构剩下的作用只是一个句柄,存储在 Runtime 结构中,用于runtime 关闭的时候优雅关闭线程池中的线程。
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();
let (scheduler, handle, launch) = MultiThread::new(
blocking_spawner,
);
let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };
let _enter = handle.enter();
launch.launch();
Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}
Spawner#
其中 Spanner 是 Clone 的,内部结构如下:
#[derive(Clone)]
pub(crate) struct Spawner {
inner: Arc<Inner>,
}
struct Inner {
shared: Mutex<Shared>,
condvar: Condvar,
thread_name: ThreadNameFn,
stack_size: Option<usize>,
after_start: Option<Callback>,
before_stop: Option<Callback>,
thread_cap: usize,
keep_alive: Duration,
metrics: SpawnerMetrics,
}
其中 Inner 结构进一步的将多个 worker 线程会出现读写冲突的部分放到了 Shared 中,并用 Mutex 保护。Inner 中剩下的结构都是被 worker 线程只读。
struct Shared {
queue: VecDeque<Task>,
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
last_exiting_thread: Option<thread::JoinHandle<()>>,
worker_threads: HashMap<usize, thread::JoinHandle<()>>,
worker_thread_index: usize,
}
Shared 的部分包含
- queue: 所有 worker 共享的全局队列
- num_notify: 为了避免condvar 虚假唤醒,num_notify 为 0 表示是虚假唤醒。
- shutdown: 当 BlockingPool要 shutdown 的时候,标记为 true,worker 在代码逻辑中穿插着检查 shutdown 的逻辑,如果为 true 则主动 shutdown _
- shutdown_tx : 是一个
Arc<oneshot::Sender<()>>
,类似 RAII ,每个启动的 worker 线程会clone 一份,当 worker 线程执行结束之后会调用drop,当所有的 shutdown_tx 都被 drop 完的时候,_shutdown_rx_调用 recv 会立刻返回。
关于 shutdown::Sender 和 shutdown::Receiver#
先看代码
#[derive(Debug, Clone)]
pub(super) struct Sender {
_tx: Arc<oneshot::Sender<()>>,
}
#[derive(Debug)]
pub(super) struct Receiver {
rx: oneshot::Receiver<()>,
}
shutdown::Sender 和 shutdown::Receiver 里面其实报了一个 tokio 的 oneshot channel. 这里有个问题,当需要等待 shutdown 结束,也就是调用 Receiver::wait
,是一个同步的调用,但是里面的 channel 是一个异步的 future,如何在一个同步函数中调用一个异步函数?
这是一个经典问题。一般方法就是runtime::block_on
,但 tokio 本身就是 runtime,怎么搞?
可以看下代码
impl Receiver {
pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
use crate::runtime::context::try_enter_blocking_region;
if timeout == Some(Duration::from_nanos(0)) {
return false;
}
let mut e = match try_enter_blocking_region() {
Some(enter) => enter,
_ => {
if std::thread::panicking() {
return false;
} else {
panic!(
"Cannot drop a runtime in a context where blocking is not allowed. \
This happens when a runtime is dropped from within an asynchronous context."
);
}
}
};
if let Some(timeout) = timeout {
e.block_on_timeout(&mut self.rx, timeout).is_ok()
} else {
let _ = e.block_on(&mut self.rx);
true
}
}
}
可以看到,创建一个 BlockingRegionGuard,作用是保证 block_on_timeout
这个操作的调用不能出现在一个Runtime中,类似 runtime::block_on
。因为该操作 会导致线程级别的挂起,阻塞当前线程。
其中block_on_timeout
实现如下
impl BlockingRegionGuard {
pub(crate) fn block_on_timeout<F>(&mut self, f: F, timeout: Duration) -> Result<F::Output, ()>
where
F: std::future::Future,
{
use crate::runtime::park::CachedParkThread;
use std::task::Context;
use std::task::Poll::Ready;
use std::time::Instant;
let mut park = CachedParkThread::new();
let waker = park.waker().map_err(|_| ())?;
let mut cx = Context::from_waker(&waker);
pin!(f);
let when = Instant::now() + timeout;
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
let now = Instant::now();
if now >= when {
return Err(());
}
park.park_timeout(when - now);
}
}
}
注意这里的 f 是一个 Future。 这里先创建了一个 CachedParkThread,为了简化,可以理解为这个结构中包含了一个 condvar,然后 waker 就是调用condvar.notify_all()
最后通过 waker 创建了 cx,调用 future::Poll(cx)
,如果 poll 是 pending 状态,就调用park.park_timeout(when - now);
将当前 worker 线程挂起,直到 poll 返回 Ready 或者超时。
关于Spawner 中的 Mutex#
可以看到所有的 Worker 都会通过 Mutex 访问 Shared,由于task 队列是放在 Shared 中,这样每次tokio::spawn_blocking(f)
的时候都会获取锁。 这个问题在 tokio 上也有相关的 issue。
解决思路其实也比较简单,类似 tokio 中 non_blocking task 的 executor,每个 worker 拥有一个自己的有限长度的队列,spawn 的 task 优先放到私有队列中,当私有队列满了,放到steal 队列(global,多个 worker 共享)。
但为什么社区没有跟进这个 issue 就不得而知了。
需要注意的是,tokio 的 fs 模块中的异步其实是假异步,它将同步的 fs 调用通过 tokio::spawn_blocking 提交到了BlockingPool 中,这样导致在高并发访问 fs 的情况下可能会出现由于 Mutex 导致的竞争问题。
Spawner 任务提交的流程#
Spawner 提交任务流程也比较常规
- 创建 Task 结构,同时返回 task 的 JoinHandle
- Spawn task
- 将 task 放到队列中
- 如果当前有 idle 的worker,则通过 condvar唤醒
- 如果当前没有 idle 的 worker
- 如果当前 worker 数目小于 thread pool 的容量,则启动一个新的 worker
- 如果当前 worker 数目已经到达 thread pool 的容量,则直接返回。
启动 worker 的流程:
fn spawn_thread(
&self,
shutdown_tx: shutdown::Sender,
rt: &Handle,
id: usize,
) -> io::Result<thread::JoinHandle<()>> {
let mut builder = thread::Builder::new().name((self.inner.thread_name)());
if let Some(stack_size) = self.inner.stack_size {
builder = builder.stack_size(stack_size);
}
let rt = rt.clone();
builder.spawn(move || {
let _enter = rt.enter();
rt.inner.blocking_spawner().inner.run(id);
drop(shutdown_tx);
})
}
worker 执行的函数如下,主要逻辑: 不断从队列中 pop task 执行,当没有 task 执行的时候进入休眠self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
默认休眠 10s,如果在休眠期间没有被唤醒,则休眠结束后结束 worker 的执行。
impl Inner {
fn run(&self, worker_thread_id: usize) {
if let Some(f) = &self.after_start {
f();
}
let mut shared = self.shared.lock();
let mut join_on_thread = None;
'main: loop {
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.run();
shared = self.shared.lock();
}
self.metrics.inc_num_idle_threads();
while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
shared = lock_result.0;
let timeout_result = lock_result.1;
if shared.num_notify != 0 {
shared.num_notify -= 1;
break;
}
if !shared.shutdown && timeout_result.timed_out() {
let my_handle = shared.worker_threads.remove(&worker_thread_id);
join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
break 'main;
}
}
if shared.shutdown {
while let Some(task) = shared.queue.pop_front() {
self.metrics.dec_queue_depth();
drop(shared);
task.shutdown_or_run_if_mandatory();
shared = self.shared.lock();
}
self.metrics.inc_num_idle_threads();
break;
}
}
self.metrics.dec_num_threads();
let prev_idle = self.metrics.dec_num_idle_threads();
assert!(
prev_idle >= self.metrics.num_idle_threads(),
"num_idle_threads underflowed on thread exit"
);
if shared.shutdown && self.metrics.num_threads() == 0 {
self.condvar.notify_one();
}
drop(shared);
if let Some(f) = &self.before_stop {
f();
}
if let Some(handle) = join_on_thread {
let _ = handle.join();
}
}
}
BlockingPool 的优雅关闭#
当 BlockingPool 创建出来之后,就分成了两部分, BlockingPool 结构作为 Handle,用于关闭线程池。一部分是 Worker 线程(被 thread::spawn 出来之后获取 Inner 的 Clone 执行,他们和 BlockingPool 结构共享 Inner 结构。
BlockingPool 的关闭分两部分:
- 发送关闭信号:利用 shutdown 和 condvar
- 直接将 Shared 结构中的 shutdown 标记为 true。在 worker 执行的 loop 中会穿插检查 shutdown 信号的逻辑,如果为 true 则进入关闭流程。
- 对于处于休眠状态的 worker线程,则通过
condvar.notify_all
的方式唤醒,worker 唤醒之后会检查 shutdown 来确认是否进入关闭流程
- 等待所有 worker 执行结束
- 调用
shutdown_rx.wait
等待 worker 的 fn 执行结束。 - 调用 thread::JoinHandle::join 等待 worker 线程执行结束。
需要注意的问题#
BlockingPool 的调度是 FIFO 的,并且每个 Task 执行是不可打断的,在创建 Task 的时候需要注意,Task 中是否存在获取锁没有释放的问题,如果存在,当 BlockingPool 的worker 消耗完之后,会造成死锁。