共享数据
共享数据
本章节主要内容:
多线程共享数据的问题
使用互斥量保护共享数据
保护共享数据的其它方案
有关线程安全的其它问题
在上一章内容,我们对于线程的基本使用和管理,可以说已经比较了解了,甚至深入阅读了部分的 std::thread
源码。所以如果你好好学习了上一章,本章也完全不用担心。
我们本章,就要开始聊共享数据的那些事。
条件竞争
在多线程的情况下,每个线程都抢着完成自己的任务。在大多数情况下,即使会改变执行顺序,也是良性竞争,这是无所谓的。比如两个线程都要往标准输出输出一段字符,谁先谁后并不会有什么太大影响。
void f() { std::cout << "❤️\n"; }
void f2() { std::cout << "😢\n"; }
int main(){
std::thread t{ f };
std::thread t2{ f2 };
t.join();
t2.join();
}
std::cout
的 operator<< 调用是线程安全的,不会被打断。即:同步的 C++ 流保证是线程安全的(从多个线程输出的单独字符可能交错,但无数据竞争)
只有在涉及多线程读写相同共享数据的时候,才会导致“恶性的条件竞争”。
std::vector<int>v;
void f() { v.emplace_back(1); }
void f2() { v.erase(v.begin()); }
int main() {
std::thread t{ f };
std::thread t2{ f2 };
t.join();
t2.join();
std::cout << v.size() << '\n';
}
比如这段代码就是典型的恶性条件竞争,两个线程共享一个 vector
,并对它进行修改。可能导致许多问题,比如 f2
先执行,此时 vector
还没有元素,导致抛出异常。又或者 f
执行了一半,调用了 f2()
,等等。
当然了,也有可能先执行 f,然后执行 f2,最后打印了 0,程序老老实实执行完毕。
但是我们显然不能寄希望于这种操作系统的调度。
而且即使不是一个添加元素,一个删除元素,全是 emplace_back
添加元素,也一样会有问题,由于 std::vector 不是线程安全的容器,因此当多个线程同时访问并修改 v 时,可能会发生未定义的行为。具体来说,当两个线程同时尝试向 v 中添加元素时,但是 emplace_back
函数却是可以被打断的,执行了一半,又去执行另一个线程。可能会导致数据竞争,从而引发未定义的结果。
当某个表达式的求值写入某个内存位置,而另一求值读或修改同一内存位置时,称这些表达式冲突。拥有两个冲突的求值的程序就有数据竞争,除非
- 两个求值都在同一线程上,或者在同一信号处理函数中执行,或
- 两个冲突的求值都是原子操作(见 std::atomic),或
- 一个冲突的求值发生早于 另一个(见 std::memory_order)
如果出现数据竞争,那么程序的行为未定义。
标量类型等都同理,有数据竞争,未定义行为:
int cnt = 0;
auto f = [&]{cnt++;};
std::thread t1{f}, t2{f}, t3{f}; // 未定义行为
使用互斥量
互斥量(Mutex),又常被称为互斥锁、互斥体(或者直接被称作“锁”),是一种用来保护临界区[1]的特殊对象,其相当于实现了一个公共的“标志位”。它可以处于锁定(locked)状态,也可以处于解锁(unlocked)状态:
如果互斥量是锁定的,通常说某个特定的线程正持有这个锁。
如果没有线程持有这个互斥量,那么这个互斥量就处于解锁状态。
概念从来不是我们的重点,用一段对比代码为你直观的展示互斥量的作用:
void f() {
std::cout << std::this_thread::get_id() << '\n';
}
int main() {
std::vector<std::thread> threads;
for (std::size_t i = 0; i < 10; ++i)
threads.emplace_back(f);
for (auto& thread : threads)
thread.join();
}
这段代码你多次运行它会得到毫无规律和格式的结果,我们可以使用互斥量解决这个问题:
#include <mutex> // 必要标头
std::mutex m;
void f() {
m.lock();
std::cout << std::this_thread::get_id() << '\n';
m.unlock();
}
int main() {
std::vector<std::thread>threads;
for (std::size_t i = 0; i < 10; ++i)
threads.emplace_back(f);
for (auto& thread : threads)
thread.join();
}
当多个线程执行函数 f
的时候,只有一个线程能成功调用 lock()
给互斥量上锁,其他所有的线程 lock()
的调用将阻塞执行,直至获得锁。第一个调用 lock()
的线程得以继续往下执行,执行我们的 std::cout
输出语句,不会有任何其他的线程打断这个操作。直到线程执行 unlock()
,就解锁了互斥量。
那么其他线程此时也就能再有一个成功调用 lock
...
至于到底哪个线程才会成功调用,这个是由操作系统调度决定的。
看一遍描述就可以了,简而言之,被 lock()
和 unlock()
包含在其中的代码是线程安全的,同一时间只有一个线程执行,不会被其它线程的执行所打断。
std::lock_guard
不过一般不推荐这样显式的 lock()
与 unlock()
,我们可以使用 C++11 标准库引入的“管理类” std::lock_guard
:
void f() {
std::lock_guard<std::mutex> lc{ m };
std::cout << std::this_thread::get_id() << '\n';
}
那么问题来了,std::lock_guard
是如何做到的呢?它是怎么实现的呢?首先顾名思义,这是一个“管理类”模板,用来管理互斥量的上锁与解锁,我们来看它在 MSVC STL 的实现:
_EXPORT_STD template <class _Mutex>
class _NODISCARD_LOCK lock_guard { // class with destructor that unlocks a mutex
public:
using mutex_type = _Mutex;
explicit lock_guard(_Mutex& _Mtx) : _MyMutex(_Mtx) { // construct and lock
_MyMutex.lock();
}
lock_guard(_Mutex& _Mtx, adopt_lock_t) noexcept // strengthened
: _MyMutex(_Mtx) {} // construct but don't lock
~lock_guard() noexcept {
_MyMutex.unlock();
}
lock_guard(const lock_guard&) = delete;
lock_guard& operator=(const lock_guard&) = delete;
private:
_Mutex& _MyMutex;
};
这段代码极其简单,首先管理类,自然不可移动不可复制,我们定义复制构造与复制赋值为弃置函数,同时阻止了移动等函数的隐式定义。
它只保有一个私有数据成员,一个引用,用来引用互斥量。
构造函数中初始化这个引用,同时上锁,析构函数中解锁,这是一个非常典型的 RAII
式的管理。
同时它还提供一个有额外std::adopt_lock_t
参数的构造函数 ,如果使用这个构造函数,则构造函数不会上锁。
所以有的时候你可能会看到一些这样的代码:
void f(){
//code..
{
std::lock_guard<std::mutex> lc{ m };
// 涉及共享资源的修改的代码...
}
//code..
}
使用 {}
创建了一个块作用域,限制了对象 lc
的生存期,进入作用域构造 lock_guard
的时候上锁(lock),离开作用域析构的时候解锁(unlock)。
- 我们要尽可能的让互斥量上锁的粒度小,只用来确保必须的共享资源的线程安全。
“粒度”通常用于描述锁定的范围大小,较小的粒度意味着锁定的范围更小,因此有更好的性能和更少的竞争。
我们举一个例子:
std::mutex m;
void add_to_list(int n, std::list<int>& list) {
std::vector<int> numbers(n + 1);
std::iota(numbers.begin(), numbers.end(), 0);
int sum = std::accumulate(numbers.begin(), numbers.end(), 0);
{
std::lock_guard<std::mutex> lc{ m };
list.push_back(sum);
}
}
void print_list(const std::list<int>& list){
std::lock_guard<std::mutex> lc{ m };
for(const auto& i : list){
std::cout << i << ' ';
}
std::cout << '\n';
}
std::list<int> list;
std::thread t1{ add_to_list,i,std::ref(list) };
std::thread t2{ add_to_list,i,std::ref(list) };
std::thread t3{ print_list,std::cref(list) };
std::thread t4{ print_list,std::cref(list) };
t1.join();
t2.join();
t3.join();
t4.join();
完整代码测试。
先看 add_to_list
,只有 list.push_back(sum)
涉及到了对共享数据的修改,需要进行保护,我们用 {}
包起来了。
假设有线程 A、B执行函数 add_to_list()
:线程 A 中的 numbers、sum 与线程 B 中的,不是同一个,希望大家分清楚,自然不存在数据竞争,也不需要上锁。线程 A、B执行完了前面求 0-n
的计算,只有一个线程能在 lock_guard
的构造函数中成功调用 lock() 给互斥量上锁。假设线程 A 成功调用 lock(),那么线程 B 的 lock() 调用就阻塞了,必须等待线程 A 执行完里面的代码,然后作用域结束,调用 lock_guard
的析构函数,解锁 unlock(),此时线程 B 就可以进去执行了,避免了数据竞争,不存在一个对象同时被多个线程修改。
函数 print_list()
就更简单了,打印 list
,给整个函数上锁,同一时刻只能有一个线程执行。
我们的使用代码是多个线程执行这两个函数,两个函数共享了一个锁,这样确保了当执行函数 print_list()
打印的时候,list 的状态是确定的。打印函数 print_list
和 add_to_list
函数的修改操作同一时间只能有一个线程在执行。print_list()
不可能看到正在被add_to_list()
修改的 list。
至于到底哪个函数哪个线程会先执行,执行多少次,这些都由操作系统调度决定,也完全有可能连续 4 次都是执行函数 print_list
的线程成功调用 lock
,会打印出了一样的值,这都很正常。
C++17 添加了一个新的特性,类模板实参推导, std::lock_guard
可以根据传入的参数自行推导,而不需要写明模板类型参数:
std::mutex m;
std::lock_guard lc{ m }; // std::lock_guard<std::mutex>
并且 C++17 还引入了一个新的“管理类”:std::scoped_lock
,它相较于 lock_guard
的区别在于,它可以管理多个互斥量。不过对于处理一个互斥量的情况,它和 lock_guard
几乎完全相同。
std::mutex m;
std::scoped_lock lc{ m }; // std::scoped_lock<std::mutex>
我们在后续管理多个互斥量,会详细了解这个类。
try_lock
try_lock
是互斥量中的一种尝试上锁的方式。与常规的 lock
不同,try_lock
会尝试上锁,但如果锁已经被其他线程占用,则不会阻塞当前线程,而是立即返回。
它的返回类型是 bool
,如果上锁成功就返回 true
,失败就返回 false
。
这种方法在多线程编程中很有用,特别是在需要保护临界区的同时,又不想线程因为等待锁而阻塞的情况下。
std::mutex mtx;
void thread_function(int id) {
// 尝试加锁
if (mtx.try_lock()) {
std::cout << "线程:" << id << " 获得锁" << std::endl;
// 临界区代码
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟临界区操作
mtx.unlock(); // 解锁
std::cout << "线程:" << id << " 释放锁" << std::endl;
} else {
std::cout << "线程:" << id << " 获取锁失败 处理步骤" << std::endl;
}
}
如果有两个线程运行这段代码,必然有一个线程无法成功上锁,要走 else 的分支。
std::thread t1(thread_function, 1);
std::thread t2(thread_function, 2);
t1.join();
t2.join();
可能的运行结果:
线程:1 获得锁
线程:2 获取锁失败 处理步骤
线程:1 释放锁
保护共享数据
互斥量主要也就是为了保护共享数据,上一节的使用互斥量也已经为各位展示了一些。
然而使用互斥量来保护共享数据也并不是在函数中加上一个 std::lock_guard
就万事大吉了。有的时候只需要一个指针或者引用,就能让这种保护形同虚设。
class Data{
int a{};
std::string b{};
public:
void do_something(){
// 修改数据成员等...
}
};
class Data_wrapper{
Data data;
std::mutex m;
public:
template<class Func>
void process_data(Func func){
std::lock_guard<std::mutex> lc{m};
func(data); // 受保护数据传递给函数
}
};
Data* p = nullptr;
void malicious_function(Data& protected_data){
p = &protected_data; // 受保护的数据被传递到外部
}
Data_wrapper d;
void foo(){
d.process_data(malicious_function); // 传递了一个恶意的函数
p->do_something(); // 在无保护的情况下访问保护数据
}
成员函数模板 process_data
看起来一点问题也没有,使用 std::lock_guard
对数据做了保护,但是调用方传递了 malicious_function
这样一个恶意的函数,使受保护数据传递给外部,可以在没有被互斥量保护的情况下调用 do_something()
。
我们传递的函数就不该是涉及外部副作用的,就应该是单纯的在受互斥量保护的情况下老老实实调用 do_something()
操作受保护的数据。
- 简而言之:切勿将受保护数据的指针或引用传递到互斥量作用域之外,不然保护将形同虚设。
process_data
的确算是没问题,用户非要做这些事情也是防不住的,我们只是告诉各位可能的情况。
死锁:问题与解决
试想一下,有一个玩具,这个玩具有两个部分,必须同时拿到两部分才能玩。比如一个遥控汽车,需要遥控器和玩具车才能玩。有两个小孩,他们都想玩这个玩具。当其中一个小孩拿到了遥控器和玩具车时,就可以尽情玩耍。当另一个小孩也想玩,他就得等待另一个小孩玩完才行。再试想,遥控器和玩具车被放在两个不同的地方,并且两个小孩都想要玩,并且一个拿到了遥控器,另一个拿到了玩具车。问题就出现了,除非其中一个孩子决定让另一个先玩,他把自己的那个部分给另一个小孩。但如果他们都不愿意,那么这个遥控汽车就谁都没有办法玩。
我们当然不在乎小孩抢玩具,我们要聊的是线程对锁的竞争:两个线程需要对它们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个线程的互斥量解锁。因为它们都在等待对方释放互斥量,没有线程工作。 这种情况就是死锁。
- 多个互斥量才可能遇到死锁问题。
避免死锁的一般建议是让两个互斥量以相同的顺序上锁,总在互斥量 B 之前锁住互斥量 A,就通常不会死锁。反面示例
std::mutex m1,m2;
std::size_t n{};
void f(){
std::lock_guard<std::mutex> lc1{ m1 };
std::lock_guard<std::mutex> lc2{ m2 };
++n;
}
void f2() {
std::lock_guard<std::mutex> lc1{ m2 };
std::lock_guard<std::mutex> lc2{ m1 };
++n;
}
f
与 f2
因为互斥量上锁顺序不同,就有死锁风险。函数 f
先锁定 m1
,然后再尝试锁定 m2
,而函数 f2
先锁定 m2
再锁定 m1
。如果两个线程同时运行,它们就可能会彼此等待对方释放其所需的锁,从而造成死锁。
简而言之,有可能函数 f 锁定了 m1,函数 f2 锁定了 m2,函数 f 要往下执行,给 m2 上锁,所以在等待 f2 解锁 m2,然而函数 f2 也在等待函数 f 解锁 m1 它才能往下执行。所以死锁。测试代码。
但是有的时候即使固定锁顺序,依旧会产生问题。当有多个互斥量保护同一个类的对象时,对于相同类型的两个不同对象进行数据的交换操作,为了保证数据交换的正确性,就要避免其它线程修改,确保每个对象的互斥量都锁住自己要保护的区域。如果按照前面的的选择一个固定的顺序上锁解锁,则毫无意义,比如:
struct X{
X(const std::string& str) :object{ str } {}
friend void swap(X& lhs, X& rhs);
private:
std::string object;
std::mutex m;
};
void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
std::lock_guard<std::mutex> lock1{ lhs.m };
std::lock_guard<std::mutex> lock2{ rhs.m };
swap(lhs.object, rhs.object);
}
考虑用户调用的时候将参数交换,就会产生死锁:
X a{ "🤣" }, b{ "😅" };
std::thread t{ [&] {swap(a, b); } }; // 1
std::thread t2{ [&] {swap(b, a); } }; // 2
1
执行的时候,先上锁 a 的互斥量,再上锁 b 的互斥量。
2
执行的时候,先上锁 b 的互斥量,再上锁 a 的互斥量。
完全可能线程 A 执行 1 的时候上锁了 a 的互斥量,线程 B 执行
2
上锁了 b 的互斥量。线程 A 往下执行需要上锁 b 的互斥量,线程 B 则要上锁 a 的互斥量执行完毕才能解锁,哪个都没办法往下执行,死锁。测试代码。
其实也就是回到了第一个示例的问题。
C++ 标准库有很多办法解决这个问题,可以使用 std::lock
,它能一次性锁住多个互斥量,并且没有死锁风险。修改 swap 代码后如下:
void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
std::lock(lhs.m, rhs.m); // 给两个互斥量上锁
std::lock_guard<std::mutex> lock1{ lhs.m,std::adopt_lock };
std::lock_guard<std::mutex> lock2{ rhs.m,std::adopt_lock };
swap(lhs.object, rhs.object);
}
因为前面已经使用了 std::lock
上锁,所以后的 std::lock_guard
构造都额外传递了一个 std::adopt_lock
参数,让其选择到不上锁的构造函数。函数退出也能正常解锁。
std::lock
给 lhs.m
或 rhs.m
上锁时若抛出异常,则在重抛前对任何已锁的对象调用 unlock()
解锁,也就是 std::lock
要么将互斥量都上锁,要么一个都不锁。
C++17 新增了 std::scoped_lock
,提供此函数的 RAII 包装,通常它比裸调用 std::lock
更好。
所以我们前面的代码可以改写为:
void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
std::scoped_lock guard{ lhs.m,rhs.m };
swap(lhs.object, rhs.object);
}
对此类有兴趣或任何疑问,建议阅读std::scoped_lock
的源码实现与解析
使用 std::scoped_lock
可以将所有 std::lock
替换掉,减少错误发生。
然而它们的帮助都是有限的,一切最终都是依靠开发者使用与管理。
死锁是多线程编程中令人相当头疼的问题,并且死锁经常是不可预见,甚至难以复现,因为在大部分时间里,程序都能正常完成工作。我们可以通过一些简单的规则,约束开发者的行为,帮助写出“无死锁”的代码。
避免嵌套锁
线程获取一个锁时,就别再获取第二个锁。每个线程只持有一个锁,自然不会产生死锁。如果必须要获取多个锁,使用
std::lock
。避免在持有锁时调用外部代码
这个建议是很简单的:因为代码是外部提供的,所以没办法确定外部要做什么。外部程序可能做任何事情,包括获取锁。在持有锁的情况下,如果用外部代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时这是无法避免的)。当写通用代码时(比如保护共享数据中的
Date
类)。这不是接口设计者可以处理的,只能寄希望于调用方传递的代码是能正常执行的。使用固定顺序获取锁
如同第一个示例那样,固定的顺序上锁就不存在问题。
std::unique_lock
灵活的锁
std::unique_lock
是 C++11 引入的一种通用互斥包装器,它相比于 std::lock_guard
更加的灵活。当然,它也更加的复杂,尤其它还可以与我们下一章要讲的条件变量一起使用。使用它可以将之前使用 std::lock_guard
的 swap
改写一下:
void swap(X& lhs, X& rhs) {
if (&lhs == &rhs) return;
std::unique_lock<std::mutex> lock1{ lhs.m, std::defer_lock };
std::unique_lock<std::mutex> lock2{ rhs.m, std::defer_lock };
std::lock(lock1, lock2);
swap(lhs.object, rhs.object);
}
解释这段代码最简单的方式就是直接展示标准库的源码,首先,我们要了解 std::defer_lock
是“不获得互斥体的所有权”。没有所有权自然构造函数就不会上锁,但不止如此。我们还要先知道 std::unique_lock 保有的数据成员(都以 MSVC STL 为例):
private:
_Mutex* _Pmtx = nullptr;
bool _Owns = false;
如你所见很简单,一个互斥量的指针,还有一个就是表示对象是否拥有互斥量所有权的 bool 类型的对象 _Owns
了。我们前面代码会调用构造函数:
unique_lock(_Mutex& _Mtx, defer_lock_t) noexcept
: _Pmtx(_STD addressof(_Mtx)), _Owns(false) {} // construct but don't lock
如你所见,只是初始化了数据成员而已,注意,这个构造函数没有给互斥量上锁,且 _Owns
为 false
表示没有互斥量所有权。并且 std::unique_lock
是有 lock()
、try_lock()
、unlock()
成员函数的,所以可以直接传递给 std::lock
、 进行调用。这里还需要提一下 lock()
成员函数的代码:
void lock() { // lock the mutex
_Validate();
_Pmtx->lock();
_Owns = true;
}
如你所见,正常上锁,并且把 _Owns
设置为 true
,即表示当前对象拥有互斥量的所有权。那么接下来看析构函数:
~unique_lock() noexcept {
if (_Owns) {
_Pmtx->unlock();
}
}
必须得是当前对象拥有互斥量的所有权析构函数才会调用 unlock() 解锁互斥量。我们的代码因为调用了 lock
,所以 _Owns
设置为 true
,函数结束对象析构的时候会解锁互斥量。
设计挺奇怪的对吧,这个所有权语义。其实上面的代码还不够简单直接,我们再举个例子:
std::mutex m;
int main() {
std::unique_lock<std::mutex> lock{ m,std::adopt_lock };
lock.lock();
}
这段代码运行会抛出异常,原因很简单,因为 std::adopt_lock
只是不上锁,但是有所有权,即 _Owns
设置为 true
了,当运行 lock()
成员函数的时候,调用了 _Validate()
进行检测,也就是:
void _Validate() const { // check if the mutex can be locked
if (!_Pmtx) {
_Throw_system_error(errc::operation_not_permitted);
}
if (_Owns) {
_Throw_system_error(errc::resource_deadlock_would_occur);
}
}
满足第二个 if,因为 _Owns
为 true
所以抛出异常,别的标准库也都有类似设计。很诡异的设计对吧,正常。除非我们写成:
lock.mutex()->lock();
也就是说 std::unique_lock
要想调用 lock()
成员函数,必须是当前没有所有权。
所以正常的用法其实是,先上锁了互斥量,然后传递 std::adopt_lock
构造 std::unique_lock
对象表示拥有互斥量的所有权,即可在析构的时候正常解锁。如下:
std::mutex m;
int main() {
m.lock();
std::unique_lock<std::mutex> lock { m,std::adopt_lock };
}
简而言之:
- 使用
std::defer_lock
构造函数不上锁,要求构造之后上锁 - 使用
std::adopt_lock
构造函数不上锁,要求在构造之前互斥量上锁 - 默认构造会上锁,要求构造函数之前和构造函数之后都不能再次上锁
我们前面提到了 std::unique_lock
更加灵活,那么灵活在哪?很简单,它拥有 lock()
和 unlock()
成员函数,所以我们能写出如下代码:
void f() {
//code..
std::unique_lock<std::mutex> lock{ m };
// 涉及共享资源的修改的代码...
lock.unlock(); // 解锁并释放所有权,析构函数不会再 unlock()
//code..
}
而不是像之前 std::lock_guard
一样使用 {}
。
另外再聊一聊开销吧,其实倒也还好,多了一个 bool
,内存对齐,x64 环境也就是 16
字节。这都不是最重要的,主要是复杂性和需求,通常建议优先 std::lock_guard
,当它无法满足你的需求或者显得代码非常繁琐,那么可以考虑使用 std::unique_lock
。
在不同作用域传递互斥量
首先我们要明白,互斥量满足互斥体 (Mutex)的要求,不可复制不可移动。所谓的在不同作用域传递互斥量,其实只是传递了它们的指针或者引用罢了。可以利用各种类来进行传递,比如前面提到的 std::unique_lock
。
std::unique_lock
可以获取互斥量的所有权,而互斥量的所有权可以通过移动操作转移给其他的 std::unique_lock
对象。有些时候,这种转移(就是调用移动构造)是自动发生的,比如当函数返回 std::unique_lock
对象。另一种情况就是得显式使用 std::move
。
请勿对移动语义和转移所有权抱有错误的幻想,我们说的无非是调用
std::unique_lock
的移动构造罢了:_NODISCARD_CTOR_LOCK unique_lock(unique_lock&& _Other) noexcept : _Pmtx(_Other._Pmtx), _Owns(_Other._Owns) { _Other._Pmtx = nullptr; _Other._Owns = false; }
将数据成员赋给新对象,原来的置空,这就是所谓的 “所有权”转移,切勿被词语迷惑。
std::unique_lock
是只能移动不可复制的类,它移动即标志其管理的互斥量的所有权转移了。
一种可能的使用是允许函数去锁住一个互斥量,并将互斥量的所有权转移到调用者上,所以调用者可以在这个锁保护的范围内执行代码。
std::unique_lock<std::mutex> get_lock(){
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk{ some_mutex };
return lk;
}
void process_data(){
std::unique_lock<std::mutex> lk{ get_lock() };
// 执行一些任务...
}
return lk
这里会调用移动构造,将互斥量的所有权转移给调用方, process_data
函数结束的时候会解锁互斥量。
我相信你可能对 extern std::mutex some_mutex
有疑问,其实不用感到奇怪,这是一个互斥量的声明,可能别的翻译单元(或 dll 等)有它的定义,成功链接上。我们前面也说了:“所谓的在不同作用域传递互斥量,其实只是传递了它们的指针或者引用罢了”,所以要特别注意互斥量的生存期。
extern 说明符只能搭配变量声明和函数声明(除了类成员或函数形参)。它指定外部链接,而且技术上不影响存储期,但它不能用来定义自动存储期的对象,故所有 extern 对象都具有静态或线程存储期。
如果你简单写一个 std::mutex some_mutex
那么函数 process_data
中的 lk
会持有一个悬垂指针。
举一个使用
extern std::mutex
的完整运行示例。当然,其实理论上你new std::mutex
也是完全可行...... 🤣🤣
std::unique_lock
是灵活的,同样允许在对象销毁之前就解锁互斥量,调用 unlock()
成员函数即可,不再强调。
保护共享数据的初始化过程
保护共享数据并非必须使用互斥量,互斥量只是其中一种常见的方式而已,对于一些特殊的场景,也有专门的保护方式,比如对于共享数据的初始化过程的保护。我们通常就不会用互斥量,这会造成很多的额外开销。
我们不想为各位介绍其它乱七八糟的各种保护初始化的方式,我们只介绍三种:双检锁(错误)、使用 std::call_once
、静态局部变量初始化从 C++11 开始是线程安全。
双检锁(错误)线程不安全
void f(){ if(!ptr){ // 1 std::lock_guard<std::mutex> lk{ m }; if(!ptr){ // 2 ptr.reset(new some); // 3 } } ptr->do_something(); // 4 }
① 是查看指针是否为空,空才需要初始化,才需要获取锁。指针为空,当获取锁后会再检查一次指针②(这就是双重检查),避免另一线程在第一次检查后再做初始化,并且让当前线程获取锁。
然而这显然没用,因为有潜在的条件竞争。未被锁保护的读取操作①没有与其他线程里被锁保护的写入操作③进行同步,因此就会产生条件竞争。
简而言之:一个线程知道另一个线程已经在执行③,但是此时还没有创建 some 对象,而只是分配内存对指针写入。那么这个线程在①的时候就不会进入,直接执行了
ptr->do_something()
④,得不到正确的结果,因为对象还没构造。如果你觉得难以理解,那就记住
ptr.reset(new some);
并非是不可打断不可交换的固定指令。这种错误写法在一些单例中也非常的常见。如果你的同事或上司写出此代码,一般不建议指出,因为不见得你能教会他们,不要“没事找事”,只要不影响自己即可。
C++ 标准委员会也认为处理此问题很重要,所以标准库提供了
std::call_once
和std::once_flag
来处理这种情况。比起锁住互斥量并显式检查指针,每个线程只需要使用std::call_once
就可以。使用std::call_once
比显式使用互斥量消耗的资源更少,特别是当初始化完成之后。std::shared_ptr<some> ptr; std::once_flag resource_flag; void init_resource(){ ptr.reset(new some); } void foo(){ std::call_once(resource_flag, init_resource); // 线程安全的一次初始化 ptr->do_something(); }
以上代码
std::once_flag
对象是全局命名空间作用域声明,如果你有需要,它也可以是类的成员。用于搭配std::call_once
使用,保证线程安全的一次初始化。std::call_once
只需要接受可调用 (Callable)对象即可,也不要求一定是函数。“初始化”,那自然是只有一次。但是
std::call_once
也有一些例外情况(比如异常)会让传入的可调用对象被多次调用,即“多次”初始化:std::once_flag flag; int n = 0; void f(){ std::call_once(flag, [] { ++n; std::cout << "第" << n << "次调用\n"; throw std::runtime_error("异常"); }); } int main(){ try{ f(); } catch (std::exception&){} try{ f(); } catch (std::exception&){} }
测试链接。正常情况会保证传入的可调用对象只调用一次,即初始化只有一次。异常之类的是例外。
这种行为很合理,因为异常代表操作失败,需要进行回溯和重置状态,符合语义和设计。
静态局部变量初始化在 C++11 是线程安全
class my_class; inline my_class& get_my_class_instance(){ static my_class instance; // 线程安全的初始化过程 初始化严格发生一次 return instance; }
即使多个线程同时访问
get_my_class_instance
函数,也只有一个线程会执行 instance 的初始化,其它线程会等待初始化完成。这种实现方式是线程安全的,不用担心数据竞争。此方式也在单例中多见,被称作“Meyers Singleton”单例,是简单合理的做法。
其实还有不少其他的做法或者反例,但是觉得没必要再聊了,这些已经足够了,再多下去就冗余了。且本文不是详尽的文档,而是“教程”。
保护不常更新的数据结构
试想一下,你有一个数据结构存储了用户的设置信息,每次用户打开程序的时候,都要进行读取,且运行时很多地方都依赖这个数据结构需要读取,所以为了效率,我们使用了多线程读写。这个数据结构很少进行改变,而我们知道,多线程读取,是没有数据竞争的,是安全的,但是有些时候又不可避免的有修改和读取都要工作的时候,所以依然必须得使用互斥量进行保护。
然而使用 std::mutex
的开销是过大的,它不管有没有发生数据竞争(也就是就算全是读的情况)也必须是老老实实上锁解锁,只有一个线程可以运行。如果你学过其它语言或者操作系统,相信这个时候就已经想到了:“读写锁”。
C++ 标准库自然为我们提供了: std::shared_timed_mutex
(C++14)、 std::shared_mutex
(C++17)。它们的区别简单来说,前者支持更多的操作方式,后者有更高的性能优势。
std::shared_mutex
同样支持 std::lock_guard
、std::unique_lock
。和 std::mutex
做的一样,保证写线程的独占访问。而那些无需修改数据结构的读线程,可以使用 std::shared_lock<std::shared_mutex>
获取访问权,多个线程可以一起读取。
class Settings {
private:
std::map<std::string, std::string> data_;
mutable std::shared_mutex mutex_; // “M&M 规则”:mutable 与 mutex 一起出现
public:
void set(const std::string& key, const std::string& value) {
std::lock_guard<std::shared_mutex> lock{ mutex_ };
data_[key] = value;
}
std::string get(const std::string& key) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = data_.find(key);
return (it != data_.end()) ? it->second : ""; // 如果没有找到键返回空字符串
}
};
std::shared_timed_mutex
具有 std::shared_mutex
的所有功能,并且额外支持超时功能。所以以上代码可以随意更换这两个互斥量。
std::recursive_mutex
线程对已经上锁的 std::mutex
再次上锁是错误的,这是未定义行为。然而在某些情况下,一个线程会尝试在释放一个互斥量前多次获取,所以提供了std::recursive_mutex
。
std::recursive_mutex
是 C++ 标准库提供的一种互斥量类型,它允许同一线程多次锁定同一个互斥量,而不会造成死锁。当同一线程多次对同一个 std::recursive_mutex
进行锁定时,只有在解锁与锁定次数相匹配时,互斥量才会真正释放。但它并不影响不同线程对同一个互斥量进行锁定的情况。不同线程对同一个互斥量进行锁定时,会按照互斥量的规则进行阻塞,
#include <iostream>
#include <thread>
#include <mutex>
std::recursive_mutex mtx;
void recursive_function(int count) {
// 递归函数,每次递归都会锁定互斥量
mtx.lock();
std::cout << "Locked by thread: " << std::this_thread::get_id() << ", count: " << count << std::endl;
if (count > 0) {
recursive_function(count - 1); // 递归调用
}
mtx.unlock(); // 解锁互斥量
}
int main() {
std::thread t1(recursive_function, 3);
std::thread t2(recursive_function, 2);
t1.join();
t2.join();
}
运行测试。
lock
:线程可以在递归互斥体上重复调用lock
。在线程调用unlock
匹配次数后,所有权才会得到释放。unlock
:若所有权层数为 1(此线程对 lock() 的调用恰好比unlock()
多一次 )则解锁互斥量,否则将所有权层数减少 1。
我们重点的强调了一下这两个成员函数的这个概念,其实也很简单,总而言之就是 unlock
必须和 lock
的调用次数一样,才会真正解锁互斥量。
同样的,我们也可以使用 std::lock_guard
、std::unique_lock
帮我们管理 std::recursive_mutex
,而非显式调用 lock
与 unlock
:
void recursive_function(int count) {
std::lock_guard<std::recursive_mutex> lc{ mtx };
std::cout << "Locked by thread: " << std::this_thread::get_id() << ", count: " << count << std::endl;
if (count > 0) {
recursive_function(count - 1);
}
}
运行测试。
new
、delete
是线程安全的吗?
如果你的标准达到 C++11,要求下列函数是线程安全的:
new
运算符和delete
运算符的库版本- 全局
new
运算符和delete
运算符的用户替换版本 - std::calloc、std::malloc、std::realloc、std::aligned_alloc (C++17 起)、std::free
所以以下函数在多线程运行是线程安全的:
void f(){
T* p = new T{};
delete p;
}
内存分配、释放操作是线程安全,构造和析构不涉及共享资源。而局部对象 p
对于每个线程来说是独立的。换句话说,每个线程都有其自己的 p
对象实例,因此它们不会共享同一个对象,自然没有数据竞争。
如果 p
是全局对象(或者外部的,只要可被多个线程读写),多个线程同时对其进行访问和修改时,就可能会导致数据竞争和未定义行为。因此,确保全局对象的线程安全访问通常需要额外的同步措施,比如互斥量或原子操作。
T* p = nullptr;
void f(){
p = new T{}; // 存在数据竞争
delete p;
}
即使 p
是局部对象,如果构造函数(析构同理)涉及读写共享资源,那么一样存在数据竞争,需要进行额外的同步措施进行保护。
int n = 1;
struct X{
X(int v){
::n += v;
}
};
void f(){
X* p = new X{ 1 }; // 存在数据竞争
delete p;
}
一个直观的展示是,我们可以在构造函数中使用
std::cout
,看到无序的输出,例子。
值得注意的是,如果是自己重载 operator new
、operator delete
替换了库的全局版本,那么它的线程安全就要我们来保证。
// 全局的 new 运算符,替换了库的版本
void* operator new (std::size_t count){
return ::operator new(count);
}
以上代码是线程安全的,因为 C++11 保证了 new 运算符的库版本,即 ::operator new
是线程安全的,我们直接调用它自然不成问题。如果你需要更多的操作,就得使用互斥量之类的方式保护了。
总而言之,new
表达式线程安全要考虑三方面:operator new
、构造函数、修改指针。
delete
表达式线程安全考虑两方面:operator delete
、析构函数。
C++ 只保证了 operator new
、operator delete
这两个方面的线程安全(不包括用户定义的),其它方面就得自己保证了。前面的内容也都提到了。
线程存储期
线程存储期(也有人喜欢称作“线程局部存储”)的概念源自操作系统,是一种非常古老的机制,广泛应用于各种编程语言。线程存储期的对象在线程开始时分配,并在线程结束时释放。每个线程拥有自己独立的对象实例,互不干扰。在 C++11中,引入了thread_local
关键字,用于声明具有线程存储期的对象。不少开发者喜欢直接将声明为线程存储期的对象称为:线程变量;也与全局变量、CPU 变量,在一起讨论。
以下是一段示例代码,展示了 thread_local
关键字的使用:
int global_counter = 0;
thread_local int thread_local_counter = 0;
void print_counters(){
std::cout << "global:" << global_counter++ << '\n';
std::cout << "thread_local:" << thread_local_counter++ << '\n';
}
int main(){
std::thread{ print_counters }.join();
std::thread{ print_counters }.join();
}
运行结果:
global:0
thread_local:0
global:1
thread_local:0
这段代码很好的展示了 thread_local
关键字的使用以及它的作用。每一个线程都有独立的 thread_local_counter
对象,它们不是同一个。
我知道你会有问题:“那么 C++11 之前呢?”那时开发者通常使用 POSIX 线程(Pthreads)或 Win32 线程的接口,或者依赖各家编译器的扩展。例如:
- POSIX:使用
pthread_key_t
和相关的函数(pthread_key_create
、pthread_setspecific
、pthread_getspecific
和pthread_key_delete
)来管理线程局部存储。 - Win32:使用 TLS(Thread Local Storage)机制,通过函数
TlsAlloc
、TlsSetValue
、TlsGetValue
和TlsFree
来实现线程局部存储。 - GCC:使用
__thread
。 - MSVC:使用
__declspec(thread)
。
POSIX 与 Win32 接口的就不再介绍了,有兴趣参见我们的链接即可。我们就拿先前的代码改成使用 GCC 与 MSVC 的编译器扩展即可。
__thread int thread_local_counter = 0; // GCC
__declspec(thread) int thread_local_counter = 0; // MSVC
MSVC 无法使用 GCC 的编译器扩展,GCC 也肯定无法使用 MSVC 的扩展,不过 Clang 编译器可以,它支持 __thread
与 __declspec(thread)
两种。Clang 默认情况就支持 GCC 的编译器扩展,如果要支持 MSVC,需要设置 -fms-extensions
编译选项。
要注意的是,这些扩展并不是标准的 C++ 语言特性,它们的跨平台性和可移植性较差,我们应当使用 C++ 标准的 thread_local
。
了解其它 API 以及编译器扩展有助于理解历史上线程存储期的演进。同时扩展知识面。
注意事项
需要注意的是,在 MSVC 的实现中,如果
std::async
策略为launch::async
,但却并不是每次都创建一个新的线程,而是从线程池获取线程。这意味着无法保证线程局部变量在任务完成时会被销毁。如果线程被回收并用于新的std::async
调用,则旧的线程局部变量仍然存在。因此,建议不要将线程局部变量与std::async
一起使用。文档。虽然还没有讲
std::async
,不过还是可以注意一下这个问题,我们用一个简单的示例为你展示:int n = 0; struct X { ~X() { ++n; } }; thread_local X x{}; void use_thread_local_x() { // 如果不写此弃值表达式,那么在 Gcc 与 Clang 编译此代码,会优化掉 x (void)x; } int main() { std::cout << "使用 std::thread: \n"; std::thread{ use_thread_local_x }.join(); std::cout << n << '\n'; std::cout << "使用 std::async: \n"; std::async(std::launch::async, use_thread_local_x); std::cout << n << '\n'; }
在不同编译器上的输出结果:
- Linux/Windows GCC、Clang:会输出
1
、2
,因为线程变量x
在每个任务中被正确销毁析构。- Windows MSVC:会输出
1
、1
,因为线程被线程池复用,线程依然活跃,线程变量 x 也就还未释放。
CPU 变量
CPU 变量的概念很好理解。就像线程变量为每个线程提供独立的对象实例,互不干扰一样,CPU 变量也是如此。在创建 CPU 变量时,系统上的每个 CPU [2] 都会获得该变量的一个副本。
在 Linux 内核中,从 2.6[3] 版本开始引入了 Per-CPU 变量(Per-CPU variables)功能。Per-CPU 变量是为每个处理器单独分配的变量副本,旨在减少多处理器访问共享数据时的同步开销,提升性能。每个处理器只访问自己的变量副本,不需要进行同步操作,避免了数据竞争,增强了并行处理能力。
在 Windows 内核中,没有直接对应的 Per-CPU 变量机制。
本节是偏向概念的认识,而非实际进行内核编程,C++ 语言层面也并未提供此抽象。理解 CPU 变量的概念对于系统编程和内核开发非常重要。这些概念在面试和技术讨论中常常出现,掌握这些知识不仅有助于应对面试问题,也能提升对多处理器系统性能优化的理解。
局部、全局、线程、CPU 变量的对比与使用
在并发编程中,不同的变量有不同的使用场景和特点。以下是局部变量、全局变量、线程变量、CPU变量的对比:
局部变量(不考虑静态局部)
- 它拥有自动存储期,随作用域开始分配,结束时释放。每个线程、每次调用都有独立实例,完全独立,几乎无需同步。
全局变量
线程变量
- 拥有 线程(thread) 存储期。它的存储在线程开始时分配,并在线程结束时解分配。每个线程拥有它自身的对象实例。只有声明为 thread_local 的对象拥有此存储期(不考虑非标准用法)。它的初始化需要考虑局部与非局部两种情况:
CPU变量
- 它在标准 C++ 中无对应抽象实现,是操作系统内核功能。它主要依赖于当前系统内核来进行使用,也无法跨平台。基本概念与线程变量类似:CPU 变量是为每个处理器单独分配的变量副本。
- 局部变量适合临时数据,作用域结束自动释放,几乎[5]无需同步。
- 全局变量适合整个程序的共享状态,但需要使用同步设施进行保护。
- 线程变量适合线程的独立状态,通常[6]无需同步。
- CPU 变量的使用是少见的,主要用于内核开发和追求极致性能的高并发场景,减少 CPU 同步开销。
总而言之,结合实际使用即可,把这四种变量拿出来进行对比,增进理解,加深印象。
总结
本章讨论了多线程的共享数据引发的恶性条件竞争会带来的问题。并说明了可以使用互斥量(std::mutex
)保护共享数据,并且要注意互斥量上锁的“粒度”。C++标准库提供了很多工具,包括管理互斥量的管理类(std::lock_guard
),但是互斥量只能解决它能解决的问题,并且它有自己的问题(死锁)。同时我们讲述了一些避免死锁的方法和技术。还讲了一下互斥量所有权转移。然后讨论了面对不同情况保护共享数据的不同方式,使用 std::call_once()
保护共享数据的初始化过程,使用读写锁(std::shared_mutex
)保护不常更新的数据结构。以及特殊情况可能用到的互斥量 recursive_mutex
,有些人可能喜欢称作:递归锁。然后聊了一下 new
、delete
运算符的库函数实际是线程安全的。最后介绍了一下线程存储期、CPU 变量,和各种变量进行了一个对比。
下一章,我们将开始讲述同步操作,会使用到 Futures、条件变量等设施。
"临界区"指的是一个访问共享资源的程序片段,而这些共享资源又无法同时被多个线程访问的特性。在临界区中,通常会使用同步机制,比如我们要讲的互斥量(Mutex)。 ↩︎
“每个 CPU”,指的是系统中的每个物理处理器或每个逻辑处理器(如果超线程被启用)。 ↩︎
之所以说是“几乎”,是因为局部对象的构造、析构,或其它成员函数也可能修改共享数据、全局状态。如果它们不是线程安全的,同样可能产生数据竞争,例如,某类型
X
的构造函数会自增一个全局变量n
,那么即使局部对象本身是独立的,但由于构造函数修改了共享数据,依然会产生数据竞争。不过实践中这种情况较少,即使涉及到全局的状态,通常其本身也是线程安全的,例如前文提到的new
、delete
线程安全的问题。 ↩︎之所以说“通常”而不是一定,是因为理论上线程变量一样可能产生数据竞争(例如有一个全局的指针指向了一个线程局部变量,其它线程通过这个指针读写线程局部变量而不附加同步,自然会产生数据竞争),只不过实践中通常不会那样做,所以我们不额外提及。 ↩︎