https://intoraw.xyz/blog/feed.xml

Tokio Cancellation Token

2024-05-05

CancellationToken的 API 设计

Tokio的 CancellationToken 是用来取消一个 Tokio Task 执行的数据结构,区别于使用channel来取消 task 执行的方式,cancellation token 可以提供一种结构化的 cancel 方式。例如在一个 tokio task A 中 spawn 了若干个子 task,那么希望在 taskA cancel 的同时也 cancel 所有的子 task ,这时候就可以用 CancellationToken。

CancellationToken 的 基本 API 如下

/// 创建一个 child token
pub fn child_token(&self) -> CancellationToken;

/// cancel 自己以及所有自己的 children
pub fn cancel(&self);

/// 返回一个 future,等待自己被 cancelled 。
/// 自己被 cancel 可以是自己的 token 调用了 cancel 或者自己的 parent 调用了 cancel
pub fn cancelled(&self) -> WaitForCancellationFuture<'_>;

/// 消费掉自己,返回一个带所有权的 future,这样能避免一些生命周期的问题
pub fn cancelled_owned(self) -> WaitForCancellationFutureOwned;

/// 将自己的所有权转移给 DropGuard,当 DropGuard 的时候调用 cancel 同时 cancel所有的 children
pub fn drop_guard(self) -> DropGuard;

所以 CancellationToken的 API 主要做三件事:

  • 创建 children 的 token
  • 当调用 cancel 的时候cancel 自己和 children,这里应该涉及到tree 的后序遍历
  • 被 spawn 出去的 task 可以拿到 token,调用 cancelled 来等待被 cancel 。

区别于 Channel 的方法,把 sender 和 receiver 独立开,CancellationToken 本身是 Send + Sync 的。多个 Task 可以同时 cancel 一个 token 或者同时.await一个 Token 被 cancel。

所以 CancellationToken 的设计需要是

  • 维护一个 tree 的结构来记录token 之间的关系
  • CancellationToken 是 Send 和 Sync 的,多个 task可以同时对tree进行修改,所以内部需要是一个 Arc<Mutex<>>的结构

在这里有两个疑问,

  1. 如何维护这个 tree 的结构,除了记录 children 是否要记录 parent,如果记录 parent,同时有 Arc,是否会带来循环引用?
  2. 如果 token 被 drop 了,那么这个 token 的 children 是否会出现 orphaned 的情况,永远不会被 cancel?
  3. tree结构,同时有 Mutex,如何保证不死锁?
  4. 如果 cancelled 可以被多个 task 同时 poll,那么 cancelled 的返回的类型需要是 Send + Sync 的,这里如何实现?

带着这几个问题我们看下 CancellationToken 的代码。

结构定义

pub struct CancellationToken {
    inner: Arc<tree_node::TreeNode>,
}

CancellationToken里面是一个 Arc TreeNode的结构

pub struct TreeNode {
    inner: Mutex<Inner>,
    waker: tokio::sync::Notify, // Notify : Send + Sync
}

对同一个 TreeNode 的 cancel 和 cancelled 需要有唤醒机制,因此TreeNode中需要有一个 waker 。

如何维护 tree 的结构? 对 treenode 的修改,例如添加children,删除 treenode 节点等主要是 Inner 数据结构来维护。

struct Inner {
    parent: Option<Arc<TreeNode>>, //这里是 TreeNode,而不是 inner,因为在 wake 的时候也要便利 tree 的结构
    parent_idx: usize, // node在 parent 的第几个 children
    children: Vec<Arc<TreeNode>>, // 自己的 children
    is_cancelled: bool,
    num_handles: usize, // CancellationToken对该 TreeNode 的引用计数
}

如何解决 TreeNode 的循环引用?

这里有个疑问,为什么不把 waker 放到 inner 中,而是单独提出来?放到 inner 中就有一个 mutex 包着的问题。

另外一个问题,Inner 中为什么需要保留对 Parent 的 Arc?这样不会有Arc 的循环引用么? 众所周知在 Rust 中实现一个支持 Remove 功能的 Tree 是非常困难的

先看下 TreeNode 如何解决循环引用的问题。

这里本质上是要实现一个 concurrent 的 tree 结构

一般对于这种有环状依赖的关系,例如自引用或者其他,在 rust 中都要保持某种不变性,invariants, 例如数据访问或者其他,来约束数据的访问和修改,将这种环状依赖 break 掉。

CancellationToken 的 ownership? lib 的设计是,CancellationToken 是可以被 Clone 的,但要保证指向相同的 TreeNode,所以要有一个引用计数。 但是当 CancellationToken 在被 Drop 的时候,如果引用计数为 0,则需要 DropTreeNode Drop TreeNode 希望能够保证 TreeNode 的 Children 依然可以被当前 node 的 parent cancel 掉, 在 CancellationToken 中的做法是,将 children 挂在当前 node 的 parent 上。 问题来了,为什么在 Inner 中要维护一个 handle_count,而不是直接实现 TreeNode 的 Drop?

TreeNode只有当 Arc 的计数为 0 的时候才会触发。这里有个问题,

如果是 CancellationHandle 的 Arc的 引用计数为 0 的时候,触发 TreeNode 的 drop,但是现在的情况下因为有循环引用问题,所以引用技术永远不会是 0。 所以要先破坏掉这个循环引用。只通过 Arc来看引用计数是不行的。 因为 Arc 这里,children 会把自己放到 parent 上。 但是 CancellationToken 的 Inner 的handle_count表示在用户的代码层面,有多少个 CancellationToken,这里不存在循环引用的状态,所以当 handle_count 为 0 的时候表示在用户的代码层面已经没有对这个 CancellationToken 的引用了,这时候就要触发 TreeNode 的 Drop 。 CancellationToken不会在 parent 层面多记录一次。

主要是 Arc 因为有循环引用的存在,无法真正的表示一个 TreeNode 在 CancellationToken 层面的所有权表示,所以需要有 handle_count 这个来记录。

我们来看下 CancellationToken 的做法

impl Clone for CancellationToken {
    fn clone(&self) -> Self {
        // 这个 tree node 被 cancellationtoken 引用计数 +  1
        tree_node::increase_handle_refcount(&self.inner);
        // 这个 cancellationToken 也引用 inner 的 Arc<TreeNode>
        CancellationToken {
            inner: self.inner.clone(),
        }
    }
}

impl Drop for CancellationToken {
    fn drop(&mut self) {
        // tree node 的 handle_count - 1
        tree_node::decrease_handle_refcount(&self.inner);
    }
}

再看下 decrease_handle_refcount函数

  • 如何释放 Arc 的资源:Arc 的资源只要破坏掉循环引用,自己就会回收
  1. 将 children 到 parent 的引用去掉,减少 parent 的引用:node.parent = None
  2. 将 parent 到 children 的引用去掉
  • 如何解决 orphan 的问题?
  1. 将 node 的 children 链接到 parent 上。这样 parent 在 drop 的时候,node 的 children 也会被释放
pub(crate) fn decrease_handle_refcount(node: &Arc<TreeNode>) {
    let num_handles = {
        let mut locked_node = node.inner.lock().unwrap();
        locked_node.num_handles -= 1;
        locked_node.num_handles
    };

    if num_handles == 0 {
        with_locked_node_and_parent(node, |mut node, parent| {
            // Remove the node from the tree
            match parent {
                Some(mut parent) => {
                    // As we want to remove ourselves from the tree,
                    // we have to move the children to the parent, so that
                    // they still receive the cancellation event without us.
                    // Moving them does not violate invariant #1.
                    move_children_to_parent(&mut node, &mut parent);

                    // Remove the node from the parent
                    remove_child(&mut parent, node);
                }
                None => {
                    // Due to invariant #1, we can assume that our
                    // children can no longer be cancelled through us.
                    // (as we now have neither a parent nor handles)
                    // Therefore we can disconnect them.
                    disconnect_children(&mut node);
                }
            }
        });
    }
}

死锁避免

为了避免死锁,规定所有的lock 的 order 都是先拿到 parent 的 lock 再去拿 node 的 lock 。但是在 detach node 的时候,可能会有另外一个线程也在 detach,为了拿到一个正确的 parent,需要先拿 node 的 lock,再从 parent 上 detach,在这里拿了 node lock 再去拿 parent 的 lock 的时候可能会有死锁,所以要 trylock,如果有死锁的 Error,则把 node unlock,然后再去 lock parent。 unlock node 的时候 node 的 parent 可能被其他线程修改,所以再用 Arc::ptr_eq确认是否中间有其他线程修改 parent 。 说实话感觉这里写的有些复杂,容易出错,不知道有没有更好的写法。

fn with_locked_node_and_parent<F, Ret>(node: &Arc<TreeNode>, func: F) -> Ret
where
    F: FnOnce(MutexGuard<'_, Inner>, Option<MutexGuard<'_, Inner>>) -> Ret,
{
    use std::sync::TryLockError;

    let mut locked_node = node.inner.lock().unwrap();

    // Every time this fails, the number of ancestors of the node decreases,
    // so the loop must succeed after a finite number of iterations.
    loop {
        // Look up the parent of the currently locked node.
        let potential_parent = match locked_node.parent.as_ref() {
            Some(potential_parent) => potential_parent.clone(),
            None => return func(locked_node, None),
        };

        // Lock the parent. This may require unlocking the child first.
        let locked_parent = match potential_parent.inner.try_lock() {
            Ok(locked_parent) => locked_parent,
            Err(TryLockError::WouldBlock) => {
                drop(locked_node);
                // Deadlock safety:
                //
                // Due to invariant #2, the potential parent must come before
                // the child in the creation order. Therefore, we can safely
                // lock the child while holding the parent lock.
                let locked_parent = potential_parent.inner.lock().unwrap();
                locked_node = node.inner.lock().unwrap();
                locked_parent
            }
            // https://github.com/tokio-rs/tokio/pull/6273#discussion_r1443752911
            #[allow(clippy::unnecessary_literal_unwrap)]
            Err(TryLockError::Poisoned(err)) => Err(err).unwrap(),
        };

        // If we unlocked the child, then the parent may have changed. Check
        // that we still have the right parent.
        if let Some(actual_parent) = locked_node.parent.as_ref() {
            if Arc::ptr_eq(actual_parent, &potential_parent) {
                return func(locked_node, Some(locked_parent));
            }
        }
    }
}

如何做到同时被多个 Task poll

CancellationToken 使用tokio::sync::Notify作为通知机制,Notify 本身是 Send + Sync,并且Notified<'_> 也是 Send + Sync的,所以notify 是可以被多个future poll 的,具体可以参考 Notify 的实现,里面使用了 Atomic 和 Mutex 来保证状态的线程安全。

总结

  1. 如何解决循环引用带来的 Arc 的问题,常用的解法是通过把 Arc::downgrad 的方式,但 CancellationToken 的设计带来一个新的思路,TreeNode 中记录一个真正标记用户代码层面的引用计数,当引用技术为 0 的时候,从 tree 中删除parent->node的引用和 node->parent的引用,破坏掉循环引用之后,剩下的交给 Arc来释放。
  2. 设计Invariant 对于解决 Lock 的循环依赖问题,按照某个顺序访问依赖图,没有环,就没有死锁。
  3. CancellationToken的实现之所以复杂,是因为他的 API 设计,CancellationToken 是 Send 和 Sync 的,任何线程都可以并发的调用 CancellationToken的 API。导致每个 对 Tree 修改的 API 都需要加锁。