C++ - Multithread

Table of Contents

背景

C++98/03不支持多线程编程, 所以必须借助第三方库或目标操作系统中的多线程API. 这使得编写跨平台的多线程程序很困难. C++11引入了标准的多线程库, 使得多线程程序跨平台更加容易. 目前的C++11标准仅针对CPU, 不支持GPU.

多线程编程的难点之一是将算法并行化, 这个过程和算法的类型高度相关. 此外, 要防止竞争条件和死锁, 并考虑缓存的一致性.

竞争条件
当多个线程要读/写共享内存位置时, 可能会发生竞争条件.
死锁
线程因为等待另一个阻塞线程锁定的资源而造成的无限阻塞.
撕裂
部分数据已写入内存, 但还有部分数据没有写入. 如果另一个线程在此时读取数据, 就会得到不一致的数据.
缓存的一致性
如果一个线程写入了一段数据, 该线程会立即看到新数据, 但这并不意味着所有线程都会立即看到新数据. 所以, 在读写多个线程时, 即使是最简单的数据类型, 也需要同步.

线程

C++线程库定义在头文件 <thread> 中.

通过函数指针创建线程

标准C++的 thread 类的构造函数是一个可变参数模板, 可以接受任意数目的参数. 第一个参数为函数名称, 其余参数为函数的参数. 如

#include <thread>

using namespace std;

void f(int i, double j, bool k);

int main(int argc, char *argv[])
{
    thread t(f, 1, 2.0, true);
    t.join();
    return 0;
}

默认情况下, 从不同线程中访问cout是线程安全的, 没有任何数据竞争, 除非在第一个输出或输入操作之前调用了 cout.sync_with_stdio(false).

通过函数对象创建线程

使用函数指针技术, 向线程传递信息的唯一方式是向函数传递参数. 而使用函数对象, 可向函数对象类添加成员变量, 并可以采用任何方式初始化和使用这些变量. 如

#include <thread>

using namespace std;

class A
{
public:
    A(int i, double j, bool k);
    void operator()(int i);
};

int main(int argc, char *argv[])
{
    thread t(A(1));
    t.join();
    return 0;
}

函数对象总是复制到线程的某个内部存储中. 如果要在函数对象的某个特定实例上执行 operator() 而非进行复制, 那么应该使用头文件 <functional> 中的 std::ref()std::cref() 传入该实例的引用或常引用.

通过lambda表达式创建线程

#include <thread>

using namespace std;

int main(int argc, char *argv[])
{
    int i = 3;
    thread t([&i]{i += 1;});
    t.join();
    return 0;
}

通过成员函数创建线程

第一个参数为成员函数指针, 第二个参数为类对象指针, 其余参数为成员函数的参数.

#include <thread>

using namespace std;

class A
{
public:
    A(int i, double j, bool k);
    void process(int i);
};

int main(int argc, char *argv[])
{
    A a(1, 2.0, true);
    thread t{&A::process, &a, 3};
    return 0;
}

线程本地存储

通过关键字 thread_local, 可将任何变量标记为线程本地数据, 即每个线程都有这个变量的独立副本, 而且这个变量能在线程的整个生命周期中持续存在. 对于每个线程, 该变量正好初始化一次.

如果 thread_local 变量在函数作用域中声明, 那么这个变量的行为和声明为静态变量是一致的, 只不过每个线程都有自己独立的副本, 而且不论这个函数在线程中调用多少次, 每个线程只初始化一次这个变量.

以下代码中, 多个线程共享唯一的j副本, 而每个线程都有自己的i副本.

thread_local int i;
int j;
void process();

取消线程

标准没有在一个线程中取消另一个线程的机制. 实现这一目标的最好方法是提供两个线程都支持的某种通信机制. 最简单的机制是有一个共享变量, 目标线程定期检查这个变量, 判断是否应该终止. 其他线程可以设置这个共享变量, 间接指示线程关闭. 应避免读写这个共享变量带来的竞争条件和缓存一致性问题.

从线程获得结果

通过 future 能够方便地从线程获得结果, 也能方便地处理线程中发生的错误.

复制和重新抛出异常

每个线程都可以抛出自己的异常, 但它们必须在自己的线程内捕获异常. 一个线程抛出的异常不能在另一个线程中捕获. 使用 future 可以自动在线程之间传递异常.

原子操作

原子类型允许原子访问, 这意味着不需要额外的同步机制就可以执行并发的读写操作. 使用原子类型需要包含头文件 <atomic>. 标准为所有基本类型定义了相应的原子类型.

定义的原子类型 等效原子类型
atomic_bool atomic<bool>
atomic_char atomic<char>
atomic_schar atomic<signed char>
atomic_uchar atomic<unsigned char>
atomic_int atomic<int>
atomic_uint atomic<unsigned int>
atomic_long atomic<long>
atomic_ulong atomic<unsigned long>
atomic_llong atomic<long long>
atomic_ullong atomic<unsigned long long>
atomic_wchar_t atomic<wchar_t>

在多线程中访问一段数据时, 原子也可以解决缓存一致性, 内存排序, 编译器优化等问题. 基本上, 不使用原子或显式的同步机制, 就不可能安全地在多线程中读写同一段数据.

  • atomic<T>::fetch_add() 获取一个原子类型的当前值, 将给定的递增值添加到这个原子值, 然后返回原始的未递增的值.
  • 原子类型支持原子操作: fetch_add(), fetch_sub(), fetch_and(), fetch_or(), fetch_xor(), ++, --, +=, -=, &=, ^=|=.
  • 原子指针类型支持原子操作: fetch_add(), fetch_sub(), ++, --, +=, -=.

互斥

互斥体类

  • 与互斥相关的类都定义在头文件 <mutex> 中, 而且都在 std 名字空间中.
  • 不要在任何互斥体类上手动调用下列锁定和解锁方法. 与所有资源一样, 互斥锁是资源, 它们几乎总是应该使用 resource acquisition is initialization, RAII 范例获得. 标准定义了一些RAII锁定类. 使用它们对避免死锁很重要. 锁对象离开作用域时, 它们会自动释放互斥体, 所以不需要手动调用 unlock().

非定时的互斥体类

标准库中有两个互斥体类:

std::mutex
标准的具有排他所有权语义的互斥体类.
std::recursive_mutex
已经获得递归互斥体所有权的线程可以再次调用 lock()try_lock(). 调用线程调用 unlock() 的次数应该等于获得这个递归互斥体锁的次数.

这两个互斥体类都支持下列方法:

lock()
调用线程将尝试获得锁, 并且阻塞直到获得锁. 这个方法会无限期地阻塞. 如果希望设置线程阻塞的最长时间, 应该使用定时的互斥体类.
try_lock()
调用线程将尝试获得锁. 如果当前锁被其他线程持有, 这个调用会立即返回. 如果成功获得锁, 返回true, 否则返回false.
unlock()
释放由调用线程持有的锁, 使另一个线程能够获得锁.

定时的互斥体类

标准库提供了三个定时的互斥体类:

std::timed_mutex
已经拥有timedmutex的线程不允许再次调用这个互斥体上的 try_lock_for(rel_time)try_lock_until(abs_time). 否则可能会导致死锁.
std::recursive_timed_mutex
已经拥有递归互斥体的线程允许再次对同一个互斥体调用锁定 try_lock_for(rel_time)try_lock_until(abs_time). 调用线程调用 unlock() 的次数应该等于获得这个递归互斥体锁的次数.
std::shared_timed_mutex
支持共享锁(也叫读写锁)拥有权的概念. 线程可以获得锁的独占拥有权和共享拥有权. 独占拥有权也称为写入锁, 只有在其他线程都没有独占或共享拥有权的情况下, 才能获得独占拥有权. 共享拥有权也成为读取锁, 如果其他线程都没有独占拥有权, 但允许其他线程获得共享拥有权, 就可以获得共享拥有权. 已经拥有 shared_timed_mutex 的线程不允许再次尝试获得这个互斥体的锁. 否则可能会导致死锁. 特别地, 该互斥体类还拥有如下与共享拥有权相关的方法:
lock_shared()
调用线程尝试获得共享拥有权锁, 并且阻塞直到获得锁.
try_lock_shared()
调用线程尝试获得共享拥有权锁. 如果独占锁当前被其他线程持有, 这个调用会立即返回false; 如果成功获得锁, 这个调用会返回true.
try_lock_shared_for(rel_time)
调用线程尝试在给定的相对时间内获得共享拥有权锁. 如果在给定超时之前不能获得这个锁, 这个调用失败并且返回false; 如果在给定超时之前获得了这个锁, 这个调用成功并且返回true.
try_lock_shared_until(abs_time)
调用线程将尝试获得共享拥有权锁, 直到系统时间等于或超过指定的绝对时间. 如果可以在此时间之前获得这个锁, 调用返回true. 如果系统时间超过了给定的绝对时间, 则函数不再尝试获得锁, 并返回false.
unlock_shared()
释放共享拥有权.

这三个互斥体类除了支持普通 lock(), try_lock()unlock() 方法外, 还支持以下方法:

try_lock_for(rel_time)
调用线程尝试在给定的相对时间内获得这个锁. 如果在给定超时之前不能获得这个锁, 这个调用失败并且返回false; 如果在给定超时之前获得了这个锁, 这个调用成功并且返回true.
try_lock_until(abs_time)
调用线程将尝试获得锁, 直到系统时间等于或超过指定的绝对时间. 如果可以在此时间之前获得这个锁, 调用返回true. 如果系统时间超过了给定的绝对时间, 则函数不再尝试获得锁, 并返回false.

锁类是一个RAII类, 可以更方便正确地获得和释放互斥体上的锁. 锁类的构造函数会自动获得给定的互斥体; 锁类的析构函数会自动释放关联的互斥体.

lock_guard

简单的锁, 其构造函数接受一个互斥体引用, 尝试获得互斥体的锁, 并且阻塞直到获得锁.

unique_lock

较复杂的锁, 它允许将获得锁的时间延迟到计算需要时, 远在声明之后.

shared_lock

在底层的共享互斥体上调用与共享拥有权相关的方法, 其构造函数和方法与 unique_lock 相同, 但获得是共享锁, 而不是独占锁.

获得多个锁

C++有两个锁泛型函数, 定义在名字空间std中, 可用于同时获得多个互斥体对象上的锁, 而不会出现死锁.

template <class L1, class L2, class... L3> void lock(L1&, L2&, L3&...);
按未指定的顺序锁定所有给定的互斥体对象, 而不会出现死锁. 如果一个互斥锁调用抛出异常, 那么会对所有已经获得的锁调用 unlock().
template <class L1, class L2, class... L3> int try_lock(L1&, L2&, L3&...);
按照顺序调用每个给定互斥体对象的 try_lock(), 试图获得所有互斥体对象的锁. 如果所有互斥体对象的 try_lock() 调用都成功, 那么这个函数返回-1. 如果任何互斥体对象的 try_lock() 调用失败, 该函数返回调用失败互斥体的位置索引 (从0开始), 并对所有已经获得的锁调用 unlock().

std::call_once()

结合使用 std::call_once()std::once_flag 可以确保某个函数或方法正好只调用一次, 不论有多少个线程试图调用 call_once() 都是如此. 只有一个 call_once() 调用能真正调用给定函数或方法; 这个调用称为有效的 call_once() 调用. 某个特定的 once_flag 实例的有效调用在对同一个 once_flag 实例的所有后续调用之前完成. 在同一个 once_flag 实例上调用 call_once() 的其他线程都会阻塞, 直到有效调用结束.

条件变量

条件变量允许一个线程阻塞, 直到另一个线程设置了某个条件或系统时间到达了某个指定的时间. 条件变量允许显式的线程间通信. 头文件 <condition_variable> 中定义了如下两类条件变量. 等待条件变量的线程可以在另一个线程调用 notify_one()notify_all() 时醒过来, 或者在系统时间超过给定的时间时醒过来, 也可能不合时宜地醒过来. 这意味着即使没有其他线程调用任何通知方法, 线程也会醒过来. 因此, 当线程等待一个条件变量并醒过来时, 就需要检查它是否是因为获得通知而醒过来. 一种检查方法是使用接受谓词参数的 wait(), wait_for(), 或 wait_until().

std::condition_variable

只能等待 unique_lock<mutex> 的条件变量. 支持以下方法:

notify_one()
唤醒等待这个条件变量的线程之一.
notify_all()
唤醒所有等待这个条件变量的线程.
wait(unique_lock<mutex> &lk)
调用 wait() 的线程应该已经获得了lk的锁. 调用 wait() 的效果是以原子方式调用 lk.unlock() 并阻塞线程, 等待通知. 当线程被另一个线程中的 notify_one()notify_all() 调用解除阻塞时, 这个函数会再次调用 lk.lock(), 可能会被这个锁阻塞后返回. 该函数还可以额外接受一个谓词参数.
wait_for(unique_lock<mutex> &lk, const chrono::duration<Rep, Period> &rel_time)
类似于 wait() 方法, 区别在于这个线程会被 notify_one(), notify_all() 调用解除阻塞, 也可能在给定超时时间到达后解除阻塞. 该函数还可以额外接受一个谓词参数.
wait_until(unique_lock<mutex> &lk, const chrono::time_point<Clock, Duration> &abs_time)
类似于 wait() 方法, 区别在于这个线程会被 notify_one(), notify_all() 调用解除阻塞, 也可能在系统时间超过给定的绝对时间时解除阻塞. 该函数还可以额外接受一个谓词参数.

std::condition_variable_any

可以等待任何对象的条件变量, 包括自定义的锁类型. 该类支持的方法和 condition_variable 类相同, 区别在于 condition_variable_any 可以接受任何类型的锁类, 而不只是 unique_lock<mutex>. 锁类应有 lock()unlock() 方法.

future

当然, 应该总是尝试在线程本身中处理异常, 不应该让异常离开线程. std::future 能够将未捕获到的异常转移到另一个线程中, 然后另一个线程可以任意处置这个异常.

结合使用 std::futurestd::promise 更容易取得同一个线程中或另一个线程中的函数返回结果. 一旦在同一线程中或在另一个线程中运行的函数计算出希望返回的结果, 就把这个结果放在promise中. 然后可以通过future来获取这个值. 可将future/promise对想象为线程间传递结果的通信信道: promise是结果的输入端; future是输出端. 使用 std::packaged_task 可以简化这个任务, std::packaged_task 自动将future和promise联系在一起, 其接受一个要执行的函数, 并返回可以用于检索结果的future.

如果想让C++运行时更多地控制是否创建一个线程进行某个计算, 可以使用 std::async(). 调用async()锁返回的future会在其析构函数中阻塞, 知道结果可用为止. async可以通过两种方法调用提供的函数:

  • 创建一个新的线程, 异步运行提供的函数.
  • 在返回的future上调用 get() 方法时运行提供的函数.

如果没有通过额外参数来调用async(), 运行时库会根据一些因素(如系统中处理器的数目和并行数目)从两种方法中自动选择一种方法. 也可以指定 launch::async (创建新线程)或 launch::deferred (使用当前线程)策略参数, 强行运行时分别选择第一种方法或第二种方法.