《C++ Concurrentcy in Action 2nd》 第四章阅读笔记。
在多线程编程中,不仅需要知道处理数据的一致性问题,还需要熟悉线程之间的同步机制。
1. 等待某个事件/条件
当一个线程需要等待其他线程完成某些任务时,可以使用 C++ 提供的条件变量 (condition variable)。
1.1. 使用条件变量
C++ 在 condition_variable
库中实现了两种条件变量:
std::condition_variable
: 仅限于和 std::mutex
一起使用,性能比 std::condition_variable_any
好
std::condition_variable_any
:可以和与 std::mutex
类似的对象一起使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
// Listing 4.1 Waiting for data to process with std::condition_variable
std::mutex mut;
std::queue<data_chunk> data_queue; // phase 1
std::condition_variable data_cond;
void data_preparation_thread()
{
while (more_data_to_prepare())
{
data_chunk const data = prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data); // phase 2
data_cond.notify_one(); // phase 3
}
}
void data_processing_thread()
{
while (true)
{
std::unique_lock<std::mutex> lk(mut); // phase 4
data_cond.wait(lk, []{return !data_queue.empty();}); // phase 5
data_chunk data = data_queue.front();
data_queue.pop();
lk.unlock(); // phase 6
process(data);
if (is_last_chunk(data)) break;
}
}
|
我们首先分析 data_preparation_thread()
,如果 data
准备完成,则会对 mut
上锁,然后 data
压入队列 (phase 2),随后调用条件变量实例 data_cond
的成员函数 notify_one()
通知等待该条件变量的线程,这里我们使用了 std::lock_guard
,保证在通知等待线程后便释放互斥锁,可以避免在等待线程被唤醒后还需等待互斥锁的性能损失。
在 data_processing_thread()
中,在 phase 4 使用 std::unique_lock
而不使用 std::lock_guard
是因为我们可能需要在 phase 5 执行 unlock
和 lock
操作,可以从 wait()
的一个简单实现(如下)看出这点,当然还有一个好处是,我们可以在对数据进行处理前主动释放锁 (phase 6)。
1
2
3
4
5
6
7
|
template<typename Predicate>
void minimal_wait(std::unique_lock<std::mutex>& lk, Predicate pred) {
while (!pred()) {
lk.unlock();
lk.lock();
}
}
|
注意:上面 wait()
的简单实现没有考虑与 notify_one()
和 notify_all()
的协作,仅为示意用。
1.2. 带有条件变量的线程安全队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// Listing 4.2 std::unique interface
template <class T, class Container = std::deque<T>>
class queue {
public:
explicit queue(const Container&);
explicit queue(Container&& = Container());
template <class Alloc> explicit queue(const Alloc&);
template <class Alloc> queue(const Container&, const Alloc&);
template <class Alloc> queue(Container&& const Alloc&);
template <class Alloc> queue(queue&&, const Alloc&);
void swap(queue& q);
bool empty() const;
size_type size() const;
T& front();
const T& front() const;
T& back();
const T& back() const;
void push(const T& x);
void pop();
template <class... Args> void emplace(Args... args);
};
|
1
2
3
4
5
6
7
8
9
10
11
12
|
// Listing 4.3 The interface of your threadsafe_queue
#include <memory>
template<typename T>
class threadsafe_queue
{
public:
threadsafe_queue();
threadsafe_queue(const threadsafe_queue&);
threadsafe_queue& operator=(const threadsafe_queue&) = delete;
}
|