多线程编程

最后更新于:2023年2月24日 下午

在 C++11 之前,C++ 本身是不支持多线程的,这也导致了现在很多 stl 并不保证线程安全。在此之前要想写多进程/线程 编程,基本就是平台相关,根据宏定义,调用对应的操作系统的多线程支持(当然了 C++ 中也提供 native_handler() 来支持平台相关的操作)。之前写过一点计算密集型的 cpp 多线程代码,后来工作了,才知道事物密集型多线程更为常用

以下内容为《C++-Concurrency-In-Action-2ed-zh-v0.2》笔记,在线电子书:C++ 并发编程实战 第二版 (C++ Concurrency in Action - SECOND EDITION)

多线程的弊端

  • 空间:每个线程是独立的堆栈,操作一般会给每个线程分配 1M 的空间
  • 复杂:写多线程需要考虑 线程安全,锁,等问题,带来了算法复杂性,导致代码维护程度变大
  • 滥用变慢:线程是独立的操作系统时间分配单元,因此滥用多线程,反而可能导致性能下降

当你决定用多线程的时候,一定要设计的特别细心,思路清晰,否则 bug 无穷无尽。很多单线程安全的代码,一到多线程马上不安全了。

多线程的好处

  • 关注点分离:在事件密集型的程序中,不同的事务不同的进程处理,有效了降低了工程复杂度
  • IO 阻塞:IO 处理一般都很难,而且等待时间不可预测,如果一直等待必然是不合算的。此时可以做很多初始化的预处理操作,等价 IO 操作的进程等到了用户的操作,再处理用户的操作才是合理的做法
  • 充分利用 CPU:在真实的事务中应该尽量避免使用计算密集型的多线程处理,但是编译器编译,矩阵乘法等一系列系统级别和数学计算的,确实可以利用多线程

如果涉及超级计算的,应该使用多进程,多机互联,进程间通信的方式处理(我不会)

多线程的方式

  • 转线程到统一线程执行
  • RCU(read-copy-update)
  • atomic 下标分流

进程,线程,协程

进程是资源分配的最小单位,线程是 CPU 调度的最小单位。知乎 上有人把进程比做动车,线程比做车厢挺合适的。它们都可以做到真正意思下的并行。

协程并不能并行,正如它的名字一样,协助程序罢了。它获取了该线程的资源,处理完后会把所有权返回给它所在的线程,已经加入 C++20,但是是给库开发者使用的,要 C++23 才适合一般开发者使用,到那时就有类似于 Python yield 表达式一样的生成器(generator) 了

这么看携程也可以做到关注点分离呀,是不是在工程中更加适合

入门

由于公司所用版本问题,很多时候还是要区分一下哪些是 C++11, C++14, C++17 有的。

以下内容默认包含头文件 #include <thread>,标准为 C++11,若是 C++14,或 C++17 特有的,特别标注

线程启动

普通方式

1
2
void doSomeWork();
std::thread myTheard(doSomeWork);

类方式

1
2
3
4
5
6
7
8
class BackgroundTask {
public:
void operator()() const {
doSomething();
}
};
BackgroundTask f;
std::thread myThread(f);

注意上述代码最后两行不能写成 std::thread myThread(BackgroundTask());,否则编译器会以为你是在声明一个叫 myThread 的函数,它的输入为 BackgoundTask 的函数,输出为一个线程。

可以用 std::thread myThread(BackgroundTask{}); 或者再多打个括号(不推荐)解决

Lambda 方式

1
2
3
std::thread myThread([]() {
doSomethingElse();
});

新线程启动之后,需要主线程决定要不要等它结束。当 std::thread 对象销毁之前还没做出决定,程序就会 crash

汇入(join)或分离(detach)

我们可以选择汇入(join)等待 或者分离(detach)不等待,也可以通过条件来控制是否等待,但是一定不要忘了做出选择,为了避免遗忘或者异常导致这种情况,可以使用 RAII(Resource Acquisition is initialization) 写在析构函数里,例如 自己手写 或用 boost 中的 thread_guard 和 scoped_thread 或用 gsl::joining_thread 或 C++20 std::jthread(也可以自己手写,或参考书中的写法). 但是如果主函数在某个位置需要等某个线程,那么必然要写在这个位置之前。

detach 的程序千万不要是用原函数中的局部变量,否则大概率会 crash

join 和 detach 加起来调用的次数必须为 1,可用 joinable 查看是否为 1

thread_local 变量

书中关于此有错误,再次添加一个可以直接运行的例子说明一切

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <bits/stdc++.h>
#define cerr(x) std::cerr << (#x) << " is " << (x) << '\n'

thread_local int x;
class X {
public:
static thread_local std::string s;
static thread_local int y;
};
thread_local int X::y = 2;
thread_local std::string X::s;
void foo() {
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
cerr(X::y);
x = 3;
cerr(x);
X::y= 123;
X::s = "123";
cerr(X::y);
cerr(X::s);
}

int main() {
std::thread t(foo);
cerr(X::y);
x = 4;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
t.join();
cerr(x);
X::y = 456;
X::s = "456";
cerr(X::y);
cerr(X::s);
return 0;
}

线程数量

可以用 std::thread::hardware_concurrency() 获取硬件线程数,返回 0 表示获取失败,可以通过这个值确定在计算密集型的的程序中如何选择线程开启个数

线程 ID

可以通过 std::this_thread::get_id() 返回 std::thread::id,然后确定某个内容在某个线程下执行

设置线程优先级

进程优先级,线程优先级,libevent 优先级

共享数据

涉及到共享的内容,如果不修改就不会麻烦发生,一旦涉及到修改,就要小心翼翼

如何共享数据?

  • 全局变量
  • 类静态变量
  • 局部变量用 lambda 来创建线程

条件竞争(race condition)

一旦 bug 出现,很难复现从而定位问题,这也是多线程难于维护的原因之一

  • 恶性竞争: 在一个线程修改到一半的时候,另一个线程突然打断进来修改,这样会导致数据的永久性损坏(例如删除双向链表的一个节点),必须要避免
  • 良性竞争:两个线程可能会争夺执行的顺序,但是不会形成恶性竞争,这种无害的竞争,一般不用管

互斥量

std::mutex 创造互斥量,用成员函数 lock(), unlock() 上锁解锁,但是手动搞的话,就每次要加锁解锁很烦,所以还是用 RAII 的思想用 std::lock_guard,例如

1
2
3
4
5
6
7
8
9
10
11
12
13
class TestMutex {
static std::mutex mutex_;
static std::list<int> list_;
public:
void add(int value) {
std::lock_guard<std::mutex> guard(mutex_);
list_.emplace_back(value);
}
bool contains(int value) {
std::lock_guard<std::mutex> guard(mutex_);
return std::find(list_.begin(), list_.end(), value) != list_.end();
}
};

C++17 有模板类型推导可以省去不少内容

注意虽然加了互斥量可以避免不少恶性竞争,但是还是不完美:当某个成员函数返回的是保护数据的指针或者引用时,也会破坏数据。因此我们需要对接口慎重设计来确保互斥量能锁住数访问,不留后门(书中有例子)

当然了可以通过作用域让 std::lock_guard 提前析构或者使用更加灵活的 std::unique_lock,但是 std::unique_lock 会占用比较多的空间,并且比 std::lock_guard 稍慢一些。保证灵活性要付出代价 另外 std::unique_lock 实例没有与自身相关的互斥量,一个互斥量的所有权可以通过移动操作,在不同的实例中进行传递

C++17对这种情况提供了支持,std::scoped_lock<> 一种新的RAII类型模板类型,与 std::lock_guard<> 的功能等价,只是这个新类型能接受不定数量的互斥量类型作为模板参数,以及相应的互斥量(数量和类型)作为构造参数

避免死锁

  • 避免嵌套锁
  • 避免在持有锁时调用用户提供的代码
  • 使用固定顺序获取锁:但是如果出现 swap 可能还是会死锁,因此可以用 std::lock 同时对多个互斥量上锁
  • 使用锁的层次结构(需要自己写一下简单实现,或者直接用书中的实现)

锁的粒度

在满足需求的前提下,越小越好

保护共享数据的初始化过程

假设你有一个共享源,构建代价很昂贵,它可能会打开一个数据库连接或分配出很多的内存。

C++标准库提供了std::once_flagstd::call_once() 来处理这种情况。比起锁住互斥量并显式的检查指针,每个线程只需要使用 std::call_once() 就可以,在 std::call_once() 的结束时,就能安全的知道指针已经被其他的线程初始化了。使用 std::call_once() 比显式使用互斥量消耗的资源更少,特别是当初始化完成后

值得注意的是,std::mutexstd::once_flag 的实例不能拷贝和移动,需要通过显式定义相应的成员函数,对这些类成员进行操作,std::call_once() 的原理猜测是,进入时设置一个已经有线程正在初始化的 flag(此时回去等),等这个线程结束了又一个初始化结束的(此时不用等)

保护不常更新的数据结构

“读者-作者锁”,因为其允许两种不同的使用方式:一个“作者”线程独占访问和共享访问,让多个“读者”线程并发访问。对此 C++17 标准库提供了两种非常好的互斥量:std::shared_mutexstd::shared_timed_mutex。C++14只提供了 std::shared_timed_mutexstd::shared_mutex 有更高的性能优势,自然的不支持更多的操作)

唯一的限制:当任一线程拥有一个共享锁时,这个线程就会尝试获取一个独占锁,直到其他线程放弃他们的锁;同样的,当任一线程拥有一个独占锁时,其他线程就无法获得共享锁或独占锁,直到第一个线程放弃其拥有的锁。

  • 锁使用 mutable std::shared_mutex
  • 单一作者使用 std::lock_guard<>
  • 多读者使用 std::shared_lock<>

嵌套锁

不推荐使用,应该想办法换一种方式来避免

C++标准库提供了 std::recursive_mutex 类,可以搞嵌套锁,这是因为一个成员函数可能会调用其他的成员函数,这就导致可能出现嵌套锁。但是这种做法过于草率,并且不合理。

同步

有事我们需要等待一个较长时间的任务,比如 IO 操作。此时我们可以隔一段时间尝试一下看 OI 操作有没有完成,但是更优的方式是使用系统提供的条件变量

条件变量

C++标准库对条件变量有两套实现:std::condition_variablestd::condition_variable_any 前者仅限于与 std::mutex 一起工作,而后者可以和任何满足最低标准的互斥量一起工作,当然了任何灵活性都是有代价的。

举例

线程获取返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <bits/stdc++.h>
#define watch(x) std::cout << (#x) << " is " << (x) << std::endl
using LL = long long;

// 多线程函数获取返回值的方式
class A {
public:
int operator() (const int &a, const int & b, std::promise<int> &promiseObj) const {
promiseObj.set_value(a + b);
return a + b;
}
};

int main() {
std::ios::sync_with_stdio(false);
std::cin.tie(nullptr);
A a;
std::promise<int> promiseObj;
std::future<int> futureObj = promiseObj.get_future();
std::thread th1(a, 1, 2, std::ref(promiseObj));
th1.join();
watch(futureObj.get()); // 注意只能 get 一次。
return 0;
}

上述为最基本的做法,使用 std::packaged_taskstd::future 是更优雅的做法,下面有例子。

线程交互之消费者问题

根据 此文章 修改而来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <bits/stdc++.h>
#include <unistd.h>

// 生产者消费者问题
void solve() {
std::queue<int> q;
std::mutex mu;
std::condition_variable cond;
const int C = 3; // 最大产品量
const int ST = 2; // 休眠时间
auto producer = [&]() {
std::srand(std::time(nullptr));
while (1) {
if(q.size() < C) { // 限流
int data = std::rand();
std::unique_lock locker(mu);
q.push(data);
std::cout << "Saving " << data << std::endl;
cond.notify_one(); // 通知取
}
sleep(ST);
}
};
auto consumer = [&]() {
while (1) {
std::unique_lock locker(mu);
while (q.size() < C) cond.wait(locker); // 满了之后一次取完
while (!q.empty()) {
int data = q.front();
q.pop();
std::cout << "withdraw " << data << std::endl;
}
sleep(ST);
}
};
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
}

int main() {
solve();
return 0;
}

以下为计算密集型的多线程使用举例

累加函数的并行算法

STL 中 std::accumulate 并不是并行的,可能是考虑到对于一般的类的加法并不一定满足结合律,从而没法并行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <bits/stdc++.h>
#define watch(x) std::cout << (#x) << " is " << (x) << std::endl

// https://www.bookstack.cn/read/CPP-Concurrency-In-Action-2ed-2019/content-chapter8-8.4-chinese.md
template<typename Iterator, typename T>
class AccumulateBlock {
public:
void operator()(Iterator start, Iterator end, T &r) {
r = std::accumulate(start, end, r);
}
};

template<typename Iterator, typename T>
T accumulateParallel(Iterator start, Iterator end, T init) {
int n = std::distance(start, end);
if (n == 0) return init;
const int minPerThread = 25; // 单个线程最小处理数据长度。
const int maxThread = (n + minPerThread - 1) / minPerThread;
const int hardwareThread = std::thread::hardware_concurrency();
int threadsNum = std::min(hardwareThread == 0 ? 1 : hardwareThread, maxThread);
watch(threadsNum);
int blockSize = n / threadsNum;
std::vector<T> results(threadsNum);
std::vector<std::thread> threads(threadsNum - 1);
Iterator blockStart = start;
for (int i = 0; i < threads.size(); ++i) {
Iterator blockEnd = blockStart;
std::advance(blockEnd, blockSize);
threads[i] = std::thread(AccumulateBlock<Iterator, T>(), blockStart, blockEnd, std::ref(results[i]));
blockStart = blockEnd;
}
AccumulateBlock<Iterator,T>()(blockStart, end, results.back());
std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
return std::accumulate(results.begin(), results.end(), init);
}

int main() {
// freopen("in", "r", stdin);
std::ios::sync_with_stdio(false);
std::cin.tie(nullptr);
int n = 5e7 + 2;
std::vector<int> a(n);
// watch(RAND_MAX);
std::srand(std::time(nullptr));
for (auto &x : a) x = rand();

auto start2 = std::clock();
auto r2 = std::accumulate(a.begin(), a.end(), 0LL);
watch(r2);
std::cout << "Time used: " << (std::clock() - start2) << "ms" << std::endl;

auto start = std::clock();
auto r = accumulateParallel(a.begin(), a.end(), 0LL);
watch(r);
std::cout << "Time used: " << (std::clock() - start) << "ms" << std::endl;

return 0;
}

还真能变很多(我电脑 6 核 6 线程的,时间运行为 1/4),不过上述代码是不安全的,因为可能 result 的结果并没有更新好就使用了!以下是安全的做法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <bits/stdc++.h>
#define watch(x) std::cout << (#x) << " is " << (x) << std::endl

// https://www.bookstack.cn/read/CPP-Concurrency-In-Action-2ed-2019/content-chapter8-8.4-chinese.md
template<typename Iterator, typename T>
class AccumulateBlock {
public:
T operator()(Iterator start, Iterator end) {
return std::accumulate(start, end, T());
}
};

template<typename Iterator, typename T>
T accumulateParallel(Iterator start, Iterator end, T init) {
int n = std::distance(start, end);
if (n == 0) return init;
const int minPerThread = 25; // 单个线程最小处理数据长度。
const int maxThread = (n + minPerThread - 1) / minPerThread;
const int hardwareThread = std::thread::hardware_concurrency();
int threadsNum = std::min(hardwareThread == 0 ? 1 : hardwareThread, maxThread);
// watch(threadsNum);
int blockSize = n / threadsNum;
std::vector<std::future<T>> futures(threadsNum - 1);
std::vector<std::thread> threads(threadsNum - 1);
auto blockStart = start;
for (int i = 0; i < threads.size(); ++i) {
Iterator blockEnd = blockStart;
std::advance(blockEnd, blockSize);
std::packaged_task<T(Iterator, Iterator)> task{AccumulateBlock<Iterator, T>()};
futures[i] = task.get_future();
threads[i] = std::thread(std::move(task), blockStart, blockEnd);
blockStart = blockEnd;
}
T r = AccumulateBlock<Iterator,T>()(blockStart, end) + init;
// 最后在加入线程貌似会更快,我也不懂为啥
std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
for (auto &x : futures) r += x.get();
return r;
}

int main() {
// freopen("in", "r", stdin);
std::ios::sync_with_stdio(false);
std::cin.tie(nullptr);
int n = 5e7 + 2;
std::vector<int> a(n);
// watch(RAND_MAX);
std::srand(std::time(nullptr));
for (auto &x : a) x = rand();

auto start = std::clock();
auto r = accumulateParallel(a.begin(), a.end(), 0LL);
watch(r);
std::cout << "Time used: " << (std::clock() - start) << "ms" << std::endl;

auto start2 = std::clock();
auto r2 = std::accumulate(a.begin(), a.end(), 0LL);
watch(r2);
std::cout << "Time used: " << (std::clock() - start2) << "ms" << std::endl;
return 0;
}

一起查找,有线程找到就全部结束

用到了递归中调用多线程,async 会自动根据当前空闲线程判断是否生成子线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <bits/stdc++.h>
#define watch(x) std::cout << (#x) << " is " << (x) << std::endl

// https://www.bookstack.cn/read/CPP-Concurrency-In-Action-2ed-2019/content-chapter8-8.5-chinese.md
template<typename Iterator, typename MatchType>
Iterator findParallelCore(Iterator start, Iterator end, MatchType match, std::atomic<bool> &done) {
try {
const int n = std::distance(start, end);
const int minPerThread = 25;
if (n < minPerThread * 2) {
while (start != end && !done.load()) {
if (*start == match) {
done = true;
return start;
}
++start;
}
return end;
} else {
auto mid = start;
std::advance(mid, n / 2);
std::future<Iterator> asyncResult =
std::async(findParallelCore<Iterator, MatchType>, mid, end, match, std::ref(done));
Iterator directResult = findParallelCore(start, mid, match, done);
return directResult == mid ? asyncResult.get() : directResult;
}
} catch (...) {
done = true;
throw;
}
}
template<typename Iterator, typename MatchType>
Iterator findParallel(Iterator start, Iterator end, MatchType match) {
std::atomic<bool> done(false);
return findParallelCore(start, end, match, done);
}

int main() {
// freopen("in", "r", stdin);
std::ios::sync_with_stdio(false);
std::cin.tie(nullptr);
int n = 1e3 + 2;
std::vector<int> a(n);
// watch(RAND_MAX);
std::srand(std::time(nullptr));
for (auto &x : a) x = rand();
auto start = std::clock();
auto r = findParallel(a.begin(), a.end(), 123);
watch(std::distance(a.begin(), r));
if (r != a.end()) watch(*r);
std::cout << "Time used: " << (std::clock() - start) << "ms" << std::endl;
return 0;
}

由于 std::find 本身是并行的,因此没必要和它比较效率。

并行版的 Karatsuba 算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <bits/stdc++.h>
#define watch(x) std::cout << (#x) << " is " << (x) << std::endl
using LL = long long;

// 任意模数多项式乘法 O(n^{\log_2 3})
using VL = std::vector<LL>;
VL KaratsubaParallel(VL a, VL b, LL p) {
if (a.size() < b.size()) std::swap(a, b);
auto mulS = [&](VL a, VL b) {
int n = a.size(), m = b.size(), sz = n + m - 1;
std::vector<__int128> c(sz);
for (int i = 0; i < n; ++i) {
for (int j = 0; j < m; ++j) {
c[i + j] += a[i] * b[j];
}
}
VL r(sz);
for (int i = 0; i < sz; ++i) r[i] = c[i] % p;
return r;
};
const int N = 65;
std::function<VL(VL, VL, int)> mul = [&](VL a, VL b, int n) -> VL {
if (n < N) return mulS(a, b);
int n2 = n / 2, n1 = n - 1;
VL a2 = VL(a.begin() + n2, a.end());
VL b2 = VL(b.begin() + n2, b.end());
a.resize(n2); b.resize(n2);
VL ap = a2, bp = b2;
for (int i = 0; i < n2; ++i) if ((ap[i] += a[i]) >= p) ap[i] -= p;
for (int i = 0; i < n2; ++i) if ((bp[i] += b[i]) >= p) bp[i] -= p;
std::future<VL> abThread = std::async(mul, a, b, n2);
VL a2b = mul(ap, bp, n2);
VL ab = abThread.get();
VL a2b2 = mul(a2, b2, n2);
for (int i = 0; i < n1; ++i) {
if ((a2b[i] -= ab[i]) < 0) a2b[i] += p;
if ((a2b[i] -= a2b2[i]) < 0) a2b[i] += p;
}
auto r = ab;
r.emplace_back(0);
r.insert(r.end(), a2b2.begin(), a2b2.end());
for (int i = 0; i < n1; ++i) if ((r[i + n2] += a2b[i]) >= p) r[i + n2] -= p;
return r;
};
int n = a.size(), m = b.size(), tot = n + m - 1, sz = 1;
if (m < N || n / m * 2 > m) return mulS(a, b);
while (sz < n) sz *= 2;
a.resize(sz), b.resize(sz);
auto r = mul(a, b, sz);
r.resize(tot);
return r;
} // 模板例题:https://www.luogu.com.cn/problem/P4245

int main() {
// freopen("in", "r", stdin);
// freopen("out", "w", stdout);
std::ios::sync_with_stdio(false);
std::cin.tie(nullptr);
int n, m;
LL p;
std::cin >> n >> m >> p;
VL a(n + 1), b(m + 1);
for (auto &x : a) {
std::cin >> x;
x %= p;
}
for (auto &x : b) {
std::cin >> x;
x %= p;
}
VL c = KaratsubaParallel(a, b, p);
for (auto &x : c) std::cout << x << " ";
std::cout << "\n";
return 0;
}