4.6.并发

并发与并行是计算机科学中异常重要的两个主题,并且在当今生产环境中也十分热门。计算机正拥有越来越多的核心,然而很多程序员还没有准备好去完全的利用它们。

Rust的内存安全功能也适用于并发环境。甚至并发的Rust程序也会是内存安全的,并且没有数据竞争。Rust的类型系统也能胜任,并给你强大的能力在编译时推论出并发代码。

在我们讨论Rust提供的并发功能之前,理解一些问题是很重要的:Rust非常底层以至于所有这些都是由标准库,而不是由语言提供的。这意味着如果你不喜欢一些Rust处理并发的方面,你可以自己实现一个。mio是现实生活中这个原则的实践。

背景:SendSync

并发难以推理。在Rust中,我们有一个强大,静态类型系统来帮助我们推理我们的代码。Rust自身提供了两个特性来帮助我们理解可能是并发的代码的意思。

Send

第一个我们要谈到的特性是Send。当一个T类型实现了Send,它向编译器表明这个类型的所有权可以在进程间安全的转移。

强制这个限制是很重要的。例如,我们有一个连接两个线程的通道,我们想要能够向通道发送些数据到另一个线程。因此,我们要确保这个类型实现了Send

想反,如果我们通过FFI封装了一个不是线程安全的库,我们并不想实现Send,并且因此编译器会帮助我们强制它不会离开当前线程。

Sync

第二个特性是Sync。当一个类型T实现了Sync,它向编译器表明这个类型在多线程并发时没有导致内存不安全的可能性。

例如,使用一个原子引用来共享可变数据是线程安全的。Rust提供了一个这样的类型,Arc,并且它实现了Sync,所以它可以安全的在线程间共享。

这两个特性允许你使用类型系统来确保你代码在并发环境的特性。在我们演示为什么之前,我们需要先学会如何创建一个并发Rust程序!

线程

Rust标准库提供了一个“线程”库,它允许你并行的执行Rust代码。这是一个使用std::thread的基本例子:

use std::thread;

fn main() {
    thread::scoped(|| {
        println!("Hello from a thread!");
    });
}

Thread::scoped()方法接受一个闭包,它将会在一个新线程中执行。它叫做scoped因为这个线程返回一个联合守护(join guard):

use std::thread;

fn main() {
    let guard = thread::scoped(|| {
        println!("Hello from a thread!");
    });

    // guard goes out of scope here
}

guard离开作用域,它会阻塞执行直到线程结束。如果我们不想要这个行为,我们可以使用thread::spawn()

use std::thread;
use std::old_io::timer;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        println!("Hello from a thread!");
    });

    timer::sleep(Duration::milliseconds(50));
}

我们需要在这里sleep是因为当main()结束,它会杀死所有正在执行的线程。

scoped有一个有意思的类型标记:

fn scoped<"a, T, F>(self, f: F) -> JoinGuard<"a, T>
    where T: Send + "a,
          F: FnOnce() -> T,
          F: Send + "a

特别的,F,那个我们传递到新线程执行的闭包。它有两个限制:它必须是一个从()TFnOnceFnOnce允许这个闭包从父线程获取任何它提到的数据。另一个限制是F必须实现Send。我们不能传递这个所有权除非这个类型认为这是可以的。

很多语言有多线程执行的能力,不过是很不安全的。这里有完整的书籍关于如何避免在共享可变状态下出现错误。Rust也用它的类型系统帮助我们,通过在编译时避免数据竞争。让我们具体讨论下如何在线程间共享数据。

安全的共享可变状态(Safe Shared Mutable State)

根据Rust的类型系统,我们有个听起来很假的概念叫做:“安全的共享可变状态”。很多程序猿都同意共享可变状态是灰常,灰常坏的。

有人曾说道:

共享可变状态是一切罪恶的根源。大部分语言尝试解决这个问题的“可变”部分,而Rust则尝试解决“共享”部分。

同样所有权系统也通过防止不当的使用指针来帮助我们排除数据竞争,最糟糕的并发bug之一。

作为一个例子,这是一个在很多语言中会产生数据竞争的Rust版本程序。它不能编译:

use std::thread;

fn main() {
    let mut data = vec![1u32, 2, 3];

    for i in 0..3 {
        thread::spawn(move || {
            data[i] += 1;
        });
    }

    thread::sleep_ms(50);
}

这会给我们一个错误:

8:17 error: capture of moved value: `data`
        data[i] += 1;
        ^~~~

在这个例子中:我们知道我们的代码_应该_是安全的,不过Rust并不确定。并且它确实不安全:如果每个线程中都有一个data的引用,并且这些线程获取了引用的所有权,我们就有了3个所有者!这是很糟糕的。我们可以用Arc<T>类型来修复它,它是一个原子引用计数的指针。“原子”部分是指它可以安全的跨线程共享。

Arc<T>假设它的内容有另一个属性来确保它可以跨线程共享:它假设它的内容实现了Sync。不过在我们的例子中,我们想要可以改变它的值。我们需要一个同一时间只有一个人可以修改它的值的类型。为此,我们可以使用Mutex<T>类型。下面是我们代码的第二版。它还是不能工作,不过是因为另外的原因:

use std::thread;
use std::sync::Mutex;

fn main() {
    let mut data = Mutex::new(vec![1u32, 2, 3]);

    for i in 0..3 {
        let data = data.lock().unwrap();
        thread::spawn(move || {
            data[i] += 1;
        });
    }

    thread::sleep_ms(50);
}

下面是错误:

:9:9: 9:22 error: the trait `core::marker::Send` is not implemented for the type `std::sync::mutex::MutexGuard>` [E0277]
:11         thread::spawn(move || {
                  ^~~~~~~~~~~~~
:9:9: 9:22 note: `std::sync::mutex::MutexGuard>` cannot be sent between threads safely
:11         thread::spawn(move || {
                  ^~~~~~~~~~~~~

你看,Mutex有一个有如下标记的lock方法:

fn lock(&self) -> LockResult>

因为MutexGuard<T>没有实现Send,我们不能传送守护穿过线程边界,它给了我们这个错误。

我们可以用Arc<T>修复这个问题。下面是一个可以工作的版本:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let data = Arc::new(Mutex::new(vec![1u32, 2, 3]));

    for i in 0..3 {
        let data = data.clone();
        thread::spawn(move || {
            let mut data = data.lock().unwrap();
            data[i] += 1;
        });
    }

    thread::sleep_ms(50);
}

我们现在在Arc上调用clone(),它增加了其内部计数。这个句柄接着被移动到新线程。让我们更仔细的检查一个线程代码:

thread::spawn(move || {
    let mut data = data.lock().unwrap();
    data[i] += 1;
});

首先,我们调用lock(),它获取了互斥锁。因为这可能失败,它返回一个Result<T, E>,并且因为这仅仅是一个例子,我们unwrap()结果来获得一个数据的引用。现实代码在这里应该有更健壮的错误处理。下面我们可以随意修改它,因为我们持有锁。

然而,定时器部分显得有点奇怪。我们选择等待了一个合理的时间,不过这也完全有可能我们等了太久,我们可以等更少的时间。也有可能我们等了太短,这样我们并没有实际完成计算。

Rust的标准库提供了更多同步两个线程的机制。让我们看看其中一个:通道。

通道(Channels)

下面是我们代码使用通道同步的版本,而不是等待特定时间:

use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;

fn main() {
    let data = Arc::new(Mutex::new(0u32));

    let (tx, rx) = mpsc::channel();

    for _ in 0..10 {
        let (data, tx) = (data.clone(), tx.clone());

        thread::spawn(move || {
            let mut data = data.lock().unwrap();
            *data += 1;

            tx.send(());
        });
    }

    for _ in 0..10 {
        rx.recv();
    }
}

我们使用mpsc::channel()方法创建了一个新的通道。我们仅仅向通道中send了一个简单的(),然后等待它们10个都返回。

因为这个通道只是发送了一个通用信号,我们也可以通过通道发送任何实现了Send的数据!

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    for _ in 0..10 {
        let tx = tx.clone();

        thread::spawn(move || {
            let answer = 42u32;

            tx.send(answer);
        });
    }

   rx.recv().ok().expect("Could not receive answer");
}

u32实现了Send因为我们可以拷贝它。所以我们创建了一个线程,让它计算结果,然后通过通道send()给我们结果。

恐慌(Panics)

panic!会使当前执行线程崩溃。你可以使用Rust的线程来作为一个简单的隔离机制:

use std::thread;

let result = thread::spawn(move || {
    panic!("oops!");
}).join();

assert!(result.is_err());

我们的Thread返回一个Result,它允许我们检查我们的线程是否发生了恐慌。

文章导航