Tokio Cancellation Token
CancellationToken的 API 设计
Tokio的 CancellationToken 是用来取消一个 Tokio Task 执行的数据结构,区别于使用channel来取消 task 执行的方式,cancellation token 可以提供一种结构化的 cancel 方式。例如在一个 tokio task A 中 spawn 了若干个子 task,那么希望在 taskA cancel 的同时也 cancel 所有的子 task ,这时候就可以用 CancellationToken。
CancellationToken 的 基本 API 如下
/// 创建一个 child token
;
/// cancel 自己以及所有自己的 children
;
/// 返回一个 future,等待自己被 cancelled 。
/// 自己被 cancel 可以是自己的 token 调用了 cancel 或者自己的 parent 调用了 cancel
;
/// 消费掉自己,返回一个带所有权的 future,这样能避免一些生命周期的问题
;
/// 将自己的所有权转移给 DropGuard,当 DropGuard 的时候调用 cancel 同时 cancel所有的 children
;
所以 CancellationToken的 API 主要做三件事:
- 创建 children 的 token
- 当调用 cancel 的时候cancel 自己和 children,这里应该涉及到tree 的后序遍历
- 被 spawn 出去的 task 可以拿到 token,调用 cancelled 来等待被 cancel 。
区别于 Channel 的方法,把 sender 和 receiver 独立开,CancellationToken 本身是 Send + Sync 的。多个 Task 可以同时 cancel 一个 token 或者同时.await一个 Token 被 cancel。
所以 CancellationToken 的设计需要是
- 维护一个 tree 的结构来记录token 之间的关系
- CancellationToken 是 Send 和 Sync 的,多个 task可以同时对tree进行修改,所以内部需要是一个 Arc<Mutex<>>的结构
在这里有两个疑问,
- 如何维护这个 tree 的结构,除了记录 children 是否要记录 parent,如果记录 parent,同时有 Arc,是否会带来循环引用?
- 如果 token 被 drop 了,那么这个 token 的 children 是否会出现 orphaned 的情况,永远不会被 cancel?
- tree结构,同时有 Mutex,如何保证不死锁?
- 如果 cancelled 可以被多个 task 同时 poll,那么 cancelled 的返回的类型需要是 Send + Sync 的,这里如何实现?
带着这几个问题我们看下 CancellationToken 的代码。
结构定义
CancellationToken里面是一个 Arc 对同一个 TreeNode 的 cancel 和 cancelled 需要有唤醒机制,因此TreeNode中需要有一个 waker 。 如何维护 tree 的结构? 对 treenode 的修改,例如添加children,删除 treenode 节点等主要是 Inner 数据结构来维护。 这里有个疑问,为什么不把 waker 放到 inner 中,而是单独提出来?放到 inner 中就有一个 mutex 包着的问题。 另外一个问题,Inner 中为什么需要保留对 Parent 的 Arc?这样不会有Arc 的循环引用么? 众所周知在 Rust 中实现一个支持 Remove 功能的 Tree 是非常困难的 先看下 TreeNode 如何解决循环引用的问题。 这里本质上是要实现一个 concurrent 的 tree 结构 一般对于这种有环状依赖的关系,例如自引用或者其他,在 rust 中都要保持某种不变性,invariants, 例如数据访问或者其他,来约束数据的访问和修改,将这种环状依赖 break 掉。 CancellationToken 的 ownership? lib 的设计是,CancellationToken 是可以被 Clone 的,但要保证指向相同的 TreeNode,所以要有一个引用计数。 但是当 CancellationToken 在被 Drop 的时候,如果引用计数为 0,则需要 DropTreeNode Drop TreeNode 希望能够保证 TreeNode 的 Children 依然可以被当前 node 的 parent cancel 掉, 在 CancellationToken 中的做法是,将 children 挂在当前 node 的 parent 上。 问题来了,为什么在 Inner 中要维护一个 handle_count,而不是直接实现 TreeNode 的 Drop? TreeNode只有当 Arc 的计数为 0 的时候才会触发。这里有个问题, 如果是 CancellationHandle 的 Arc 主要是 Arc 我们来看下 CancellationToken 的做法 再看下 decrease_handle_refcount函数 为了避免死锁,规定所有的lock 的 order 都是先拿到 parent 的 lock 再去拿 node 的 lock 。但是在 detach node 的时候,可能会有另外一个线程也在 detach,为了拿到一个正确的 parent,需要先拿 node 的 lock,再从 parent 上 detach,在这里拿了 node lock 再去拿 parent 的 lock 的时候可能会有死锁,所以要 trylock,如果有死锁的 Error,则把 node unlock,然后再去 lock parent。 unlock node 的时候 node 的 parent 可能被其他线程修改,所以再用 Arc::ptr_eq确认是否中间有其他线程修改 parent 。 说实话感觉这里写的有些复杂,容易出错,不知道有没有更好的写法。 CancellationToken 使用tokio::sync::Notify作为通知机制,Notify 本身是 Send + Sync,并且Notified<'_> 也是 Send + Sync的,所以notify 是可以被多个future poll 的,具体可以参考 Notify 的实现,里面使用了 Atomic 和 Mutex 来保证状态的线程安全。
如何解决 TreeNode 的循环引用?
pub
死锁避免
如何做到同时被多个 Task poll
总结