什么是 RCU
RCU (Read-Copy-Update) 是一种高效的同步机制,主要用于解决读多写少场景下的并发访问问题。它是 Linux 内核中广泛使用的同步技术,由 Paul McKenney 在 2001 年引入。RCU 允许多个读者无锁并发访问共享数据,同时写者可以并发修改数据,而不会导致数据不一致。
RCU 的核心思想
RCU 的核心思想可以概括为三个步骤:
- 读取 (Read): 读者通过 RCU 保护的指针访问数据结构,无需获取锁
- 复制 (Copy): 当需要修改数据时,写者先创建数据的一个副本并修改这个副本
- 更新 (Update): 写者通过原子操作更新指向数据的指针,使其指向修改后的副本
RCU 的特点与优势
- 读者零开销: 读操作不需要锁、内存屏障或原子操作,性能接近于无同步的单线程代码
- 读写并发: 读者和写者可以并发执行,没有互斥问题
- 可扩展性: 随着CPU核心数增加,RCU的性能扩展性良好
- 实时性: 实时系统中特别有用,因为读操作不会被阻塞
RCU 在读多写少场景表现优异,读者开销极低,同时允许与写操作并发执行。
RCU 更新过程
整体工作流程
图表组成部分
Reader1 和 Reader2:表示两个不同时间进入的读者线程
Writer:表示执行更新操作的写者线程
Memory:表示内存/共享数据区域
初始状态:
内存中有一个全局指针 ptr 指向数据对象 data_v1
工作流程详解
- 读取阶段:
Reader1 首先调用 rcu_read_lock() 进入读临界区
然后通过 rcu_dereference(ptr) 安全地获取指针副本
接着读取指针指向的数据内容
- 复制阶段:
与此同时,Writer 创建数据对象 data_v1 的副本 data_v2
Writer 修改 data_v2 的内容
此时 Reader1 仍在使用原始数据 data_v1
- 第二读者进入:
Reader2 调用 rcu_read_lock() 进入读临界区
同样通过 rcu_dereference(ptr) 获取指针副本
此时它看到的仍然是原始数据 data_v1
- 指针更新:
Writer 调用 rcu_assign_pointer(ptr, data_v2) 原子地更新指针
此时内存中的 ptr 已经指向新数据 data_v2
但已经获取了旧指针的 Reader1 和 Reader2 仍然可以继续访问 data_v1
- 宽限期开始:
Reader1 调用 rcu_read_unlock() 离开读临界区
Reader2 仍然在使用 data_v1 数据
Writer 调用 synchronize_rcu() 开始等待所有读者完成
- 宽限期结束:
Reader2 调用 rcu_read_unlock() 离开读临界区
所有使用旧数据的读者都已完成操作
Writer 确认宽限期结束
- 安全清理:
Writer 安全地调用 free(data_v1) 释放旧数据
此时不会有任何读者再使用 data_v1,避免了内存安全问题
内存状态
- 初始状态: 所有读者通过全局指针访问原始数据对象
在下图中,全局指针 ptr
指向数据对象 v1,读者通过这个指针访问数据。
- 创建新版本: 写者创建数据的副本并进行修改
- 发布新版本: 写者原子地更新全局指针指向新数据对象
- 等待宽限期: 写者等待所有现有读者完成访问
- 回收旧版本: 旧版本的数据对象被安全释放
RCU API 在 Linux 内核中的使用
Linux 内核中的 RCU API 主要包括:
读者接口
/* 进入 RCU 读临界区 */
rcu_read_lock();
/* 安全地访问 RCU 保护的指针 */
p = rcu_dereference(ptr);
/* 在读临界区内可以安全访问 p 指向的数据 */
/* 退出 RCU 读临界区 */
rcu_read_unlock();
/* 其他读者接口变种 */
rcu_read_lock_bh(); /* 禁止底半部并进入读临界区 */
rcu_read_unlock_bh(); /* 退出读临界区并启用底半部 */
rcu_read_lock_sched(); /* 禁止抢占并进入读临界区 */
rcu_read_unlock_sched(); /* 退出读临界区并启用抢占 */
写者接口
/* 原子地更新 RCU 保护的指针 */
rcu_assign_pointer(ptr, new_data);
/* 同步 RCU - 阻塞等待所有读临界区完成 */
synchronize_rcu();
/* 异步回调释放 - 不会阻塞 */
call_rcu(&old_data->rcu_head, callback_function);
/* 延迟释放内存 (异步, call_rcu 的便捷封装) */
kfree_rcu(old_data, rcu_head);
/* 替换 RCU 保护的指针并返回旧指针 */
old_p = rcu_replace_pointer(ptr, new_data, lockdep_is_held(&lock));
使用示例
展开查看示例代码
struct foo {
int a;
char b;
struct rcu_head rcu; /* 用于 RCU 宽限期回调 */
};
/* RCU 保护的全局指针 - 带 __rcu 注解 */
struct foo __rcu *glob_ptr = NULL;
/* 读者函数 - 安全读取数据 */
void reader_function(void)
{
struct foo *p;
int a;
/* 进入 RCU 读临界区 */
rcu_read_lock();
/* 安全地获取指针 - 使用 rcu_dereference 进行内存屏障和编译器优化控制 */
p = rcu_dereference(glob_ptr);
if (p) {
/* 可以安全地访问 p 指向的数据 */
a = p->a;
/* 使用 a 做一些操作... */
printk(KERN_INFO "Read value: %d\n", a);
}
/* 退出 RCU 读临界区 */
rcu_read_unlock();
/* 在这里不能再访问 p 指向的数据 */
}
/* 仅进行指针检查,不访问数据 */
bool has_data(void)
{
/* 可以直接使用 rcu_access_pointer 进行简单检查而不访问数据 */
return rcu_access_pointer(glob_ptr) != NULL;
}
/* 写者回调函数 - 用于异步释放 */
static void rcu_free_callback(struct rcu_head *rcu)
{
/* container_of 宏获取包含 rcu_head 的 foo 结构体指针 */
struct foo *fp = container_of(rcu, struct foo, rcu);
/* 安全释放内存,此时宽限期已结束 */
kfree(fp);
}
/* 创建并发布新数据 */
int writer_update_function(int new_a, char new_b)
{
struct foo *new_fp, *old_fp;
/* 创建新数据结构 */
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
if (!new_fp)
return -ENOMEM; /* 内核风格的错误处理 */
/* 更新新结构 */
new_fp->a = new_a;
new_fp->b = new_b;
/*
* 对于多个写者场景,需要写者互斥锁
* 但读者不需要获取此锁
*/
spin_lock(&writer_lock);
/* 获取旧指针 */
old_fp = rcu_dereference_protected(glob_ptr,
lockdep_is_held(&writer_lock));
/* 原子地更新全局指针,使其指向新数据 */
rcu_assign_pointer(glob_ptr, new_fp);
spin_unlock(&writer_lock);
/*
* 此时, 新读者会看到 new_fp,
* 而老读者会继续使用 old_fp
*/
/* 方法1: 同步等待所有读者完成 (阻塞) */
if (old_fp) {
/* 此函数会阻塞直到所有已存在的读临界区结束 */
synchronize_rcu();
/* 所有读临界区结束后,安全释放旧数据 */
kfree(old_fp);
}
return 0;
}
/* 使用异步释放的更新函数变体 */
int writer_update_async(int new_a, char new_b)
{
struct foo *new_fp, *old_fp;
/* 创建新数据结构 */
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
if (!new_fp)
return -ENOMEM;
/* 更新新结构 */
new_fp->a = new_a;
new_fp->b = new_b;
spin_lock(&writer_lock);
old_fp = rcu_dereference_protected(glob_ptr,
lockdep_is_held(&writer_lock));
rcu_assign_pointer(glob_ptr, new_fp);
spin_unlock(&writer_lock);
/* 方法2: 异步释放 (非阻塞) */
if (old_fp)
call_rcu(&old_fp->rcu, rcu_free_callback);
return 0;
}
/* 使用 kfree_rcu 的简化版本 */
int writer_update_simple(int new_a, char new_b)
{
struct foo *new_fp, *old_fp;
/* 创建新数据结构 */
new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
if (!new_fp)
return -ENOMEM;
/* 更新新结构 */
new_fp->a = new_a;
new_fp->b = new_b;
spin_lock(&writer_lock);
old_fp = rcu_dereference_protected(glob_ptr,
lockdep_is_held(&writer_lock));
rcu_assign_pointer(glob_ptr, new_fp);
spin_unlock(&writer_lock);
/* 方法3: 便捷的异步释放方式 */
if (old_fp)
kfree_rcu(old_fp, rcu);
return 0;
}
/* 删除数据示例 */
void delete_function(void)
{
struct foo *old_fp;
spin_lock(&writer_lock);
old_fp = rcu_dereference_protected(glob_ptr,
lockdep_is_held(&writer_lock));
if (!old_fp) {
spin_unlock(&writer_lock);
return;
}
/* 将指针设为 NULL */
rcu_assign_pointer(glob_ptr, NULL);
spin_unlock(&writer_lock);
/* 等待宽限期结束后再释放内存 */
synchronize_rcu();
kfree(old_fp);
}
RCU 与其他机制的比较
同步机制 | 读开销 | 写开销 | 读写并发 | 可扩展性 | 适用场景 |
---|---|---|---|---|---|
RCU | 极低 | 中等 | 支持 | 极好 | 读多写少 |
读写锁 | 低 | 高 | 读读并发 | 一般 | 读多写少场景 |
自旋锁 | 中 | 中 | 不支持 | 较差 | 短时间保护 |
互斥锁 | 高 | 高 | 不支持 | 较差 | 需要阻塞处理场景 |