Rust 并发编程 (二)
原子
在 Rust 中,原子操作可以作为标准原子类型的方法使用,这些原子类型存在于 std::sync::atomic 中。它们的名称都以 Atomic 开头,例如 AtomicI32 或 AtomicUsize
每个原子操作都有一个 std::sync::atomic::Ordering 类型的参数,它决定了我们对操作的相对顺序有什么保证。保证最少的最简单的变体是 Relaxed 。 Relaxed 仍然保证单个原子变量的一致性,但不保证不同变量之间的相对操作顺序。
原子加载和存储操作
我们要看的前两个原子操作是最基本的: load 和 store 。它们的函数签名如下,以 AtomicI32 为例:
impl AtomicI32 {
pub fn load(&self, ordering: Ordering) -> i32;
pub fn store(&self, value: i32, ordering: Ordering);
}
load 方法以原子方式加载存储在原子变量中的值, store 方法以原子方式将新值存储在其中。请注意 store 方法是如何采用共享引用 ( &T ) 而不是独占引用 ( &mut T ),即使它修改了值。
示例:停止标志
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
fn main() {
static STOP: AtomicBool = AtomicBool::new(false);
// Spawn a thread to do the work.
let background_thread = thread::spawn(|| {
while !STOP.load(Relaxed) {
some_work();
}
});
// Use the main thread to listen for user input.
for line in std::io::stdin().lines() {
match line.unwrap().as_str() {
"help" => println!("commands: help, stop"),
"stop" => break,
cmd => println!("unknown command: {cmd:?}"),
}
}
// Inform the background thread it needs to stop.
STOP.store(true, Relaxed);
// Wait until the background thread finishes.
background_thread.join().unwrap();
}
示例:进度报告
use std::sync::atomic::AtomicUsize;
fn main() {
let num_done = AtomicUsize::new(0);
thread::scope(|s| {
// A background thread to process all 100 items.
s.spawn(|| {
for i in 0..100 {
process_item(i); // Assuming this takes some time.
num_done.store(i + 1, Relaxed);
}
});
// The main thread shows status updates, every second.
loop {
let n = num_done.load(Relaxed);
if n == 100 { break; }
println!("Working.. {n}/100 done");
thread::sleep(Duration::from_secs(1));
}
});
println!("Done!");
}
同步化(Synchronization)
fn main() {
let num_done = AtomicUsize::new(0);
let main_thread = thread::current();
thread::scope(|s| {
// A background thread to process all 100 items.
s.spawn(|| {
for i in 0..100 {
process_item(i); // Assuming this takes some time.
num_done.store(i + 1, Relaxed);
main_thread.unpark(); // Wake up the main thread.
}
});
// The main thread shows status updates.
loop {
let n = num_done.load(Relaxed);
if n == 100 { break; }
println!("Working.. {n}/100 done");
thread::park_timeout(Duration::from_secs(1));
}
});
println!("Done!");
}
我们已经通过 thread::current() 获得了主线程的句柄,后台线程在每次状态更新后都会使用这个句柄来取消主线程的停放。主线程现在使用 park_timeout 而不是 sleep ,这样它就可以被中断。
示例:惰性初始化(Lazy Initialization)
为了简单起见,我们假设 x 永远不会为零,这样我们就可以在计算之前使用零作为占位符。
use std::sync::atomic::AtomicU64;
fn get_x() -> u64 {
static X: AtomicU64 = AtomicU64::new(0);
let mut x = X.load(Relaxed);
if x == 0 {
x = calculate_x();
X.store(x, Relaxed);
}
x
}
获取和修改操作
现在我们已经看到了基本的 load 和 store 操作的一些用例,让我们继续进行更有趣的操作:获取和修改(fetch-and-modify)操作。这些操作修改原子变量,但也加载(获取)原始值,作为单个原子操作
impl AtomicI32 {
pub fn fetch_add(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_sub(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_or(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_and(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_nand(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_xor(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_max(&self, v: i32, ordering: Ordering) -> i32;
pub fn fetch_min(&self, v: i32, ordering: Ordering) -> i32;
pub fn swap(&self, v: i32, ordering: Ordering) -> i32; // "fetch_store"
}
下面是一个快速演示,展示了 fetch_add 如何在操作之前返回值:
use std::sync::atomic::AtomicI32;
let a = AtomicI32::new(100);
let b = a.fetch_add(23, Relaxed);
let c = a.load(Relaxed);
assert_eq!(b, 100);
assert_eq!(c, 123);
这些操作的返回值并不总是相关的。如果您只需要将操作应用于原子值,但对值本身不感兴趣,则完全可以忽略返回值。
示例:来自多个线程的进度报告
我们可以为每个线程使用单独的 AtomicUsize ,并将它们全部加载到主线程中,然后将它们相加,但更简单的解决方案是使用单个 AtomicUsize 来跟踪所有线程中已处理项目的总数。
fn main() {
let num_done = &AtomicUsize::new(0);
thread::scope(|s| {
// Four background threads to process all 100 items, 25 each.
for t in 0..4 {
s.spawn(move || {
for i in 0..25 {
process_item(t * 25 + i); // Assuming this takes some time.
num_done.fetch_add(1, Relaxed);
}
});
}
// The main thread shows status updates, every second.
loop {
let n = num_done.load(Relaxed);
if n == 100 { break; }
println!("Working.. {n}/100 done");
thread::sleep(Duration::from_secs(1));
}
});
println!("Done!");
}
示例:统计
继续这个通过原子报告其他线程正在做什么的概念,让我们扩展我们的示例,以收集和报告一些关于处理一个项目所花费的时间的统计数据。
fn main() {
let num_done = &AtomicUsize::new(0);
let total_time = &AtomicU64::new(0);
let max_time = &AtomicU64::new(0);
thread::scope(|s| {
// 四个后台线程处理 100 个项目,每个 25 个。
for t in 0..4 {
s.spawn(move || {
for i in 0..25 {
let start = Instant::now();
process_item(t * 25 + i); // 假设这需要一些时间。
let time_taken = start.elapsed().as_micros() as u64;
num_done.fetch_add(1, Relaxed);
total_time.fetch_add(time_taken, Relaxed);
max_time.fetch_max(time_taken, Relaxed);
}
});
}
// 主线程每秒显示一次状态更新。
loop {
let total_time = Duration::from_micros(total_time.load(Relaxed));
let max_time = Duration::from_micros(max_time.load(Relaxed));
let n = num_done.load(Relaxed);
if n == 100 { break; }
if n == 0 {
println!("Working.. nothing done yet.");
} else {
println!(
"Working.. {n}/100 done, {:?} average, {:?} peak",
total_time / n as u32,
max_time,
);
}
thread::sleep(Duration::from_secs(1));
}
});
println!("Done!");
}
比较和交换操作
最先进和灵活的原子操作是比较和交换操作。此操作检查原子值是否等于给定值,只有在这种情况下,它才会用新值替换它,所有操作都以原子方式进行。它会返回之前的值并告诉我们它是否替换了它。
它的签名比我们目前看到的要复杂一些。以 AtomicI32 为例,它看起来是这样的:
impl AtomicI32 {
pub fn compare_exchange(
&self,
expected: i32,
new: i32,
success_order: Ordering,
failure_order: Ordering
) -> Result<i32, i32>;
}
暂时忽略内存顺序,它与以下实现基本相同,除了它都是作为单个不可分割的原子操作发生的:
impl AtomicI32 {
pub fn compare_exchange(&self, expected: i32, new: i32) -> Result<i32, i32> {
// 实际上,加载、比较和存储,
// 所有这些都是作为单个原子操作发生的。
let v = self.load();
if v == expected {
// 值符合预期。
// 替换它并报告成功。
self.store(new);
Ok(v)
} else {
// 该值不符合预期。
// 保持不变并报告失败。
Err(v)
}
}
}
使用它,我们可以从原子变量加载一个值,执行我们喜欢的任何计算,然后如果原子变量在此期间没有改变,则只存储新计算的值。如果我们把它放在一个循环中以在它确实发生变化时重试,我们可以使用它来实现所有其他原子操作,使它成为最通用的操作。
fn increment(a: &AtomicU32) {
let mut current = a.load(Relaxed); // 1
loop {
let new = current + 1; // 2
match a.compare_exchange(current, new, Relaxed, Relaxed) { // 3
Ok(_) => return, // 4
Err(v) => current = v, // 5
}
}
}
总结
原子操作是不可分割的;他们要么已经完全完成,要么尚未发生。
Rust 中的原子操作是通过 std::sync::atomic 中的原子类型完成的,例如 AtomicI32 。
并非所有原子类型都适用于所有平台。
当涉及多个变量时,原子操作的相对顺序很棘手。第 3 章中有更多内容。
简单的加载和存储非常适合非常基本的线程间通信,例如停止标志和状态报告。
惰性初始化可以作为一种竞争来完成,而不会导致数据竞争。
获取和修改操作允许进行一小组基本的原子修改,这在多个线程修改同一个原子变量时特别有用。
原子加法和减法在溢出时默默地环绕。
比较和交换操作是最灵活和通用的,并且是进行任何其他原子操作的构建块。
弱的比较和交换操作可能会稍微更有效率。