ふわふわ時間

0%

CCIA-线程间同步

《C++ Concurrentcy in Action 2nd》 第四章阅读笔记。


在多线程编程中,不仅需要知道处理数据的一致性问题,还需要熟悉线程之间的同步机制。

等待某个事件/条件

当一个线程需要等待其他线程完成某些任务时,可以使用 C++ 提供的条件变量 (condition variable)。

使用条件变量

C++ 在 condition_variable 库中实现了两种条件变量:

  • std::condition_variable: 仅限于和 std::mutex 一起使用,性能比 std::condition_variable_any
  • std::condition_variable_any:可以和与 std::mutex 类似的对象一起使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Listing 4.1 Waiting for data to process with std::condition_variable

std::mutex mut;
std::queue<data_chunk> data_queue; // phase 1
std::condition_variable data_cond;

void data_preparation_thread()
{
while (more_data_to_prepare())
{
data_chunk const data = prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data); // phase 2
data_cond.notify_one(); // phase 3
}
}

void data_processing_thread()
{
while (true)
{
std::unique_lock<std::mutex> lk(mut); // phase 4
data_cond.wait(lk, []{return !data_queue.empty();}); // phase 5
data_chunk data = data_queue.front();
data_queue.pop();
lk.unlock(); // phase 6
process(data);
if (is_last_chunk(data)) break;
}
}

我们首先分析 data_preparation_thread(),如果 data 准备完成,则会对 mut 上锁,然后 data 压入队列 (phase 2),随后调用条件变量实例 data_cond 的成员函数 notify_one() 通知等待该条件变量的线程,这里我们使用了 std::lock_guard,保证在通知等待线程后便释放互斥锁,可以避免在等待线程被唤醒后还需等待互斥锁的性能损失。

data_processing_thread() 中,在 phase 4 使用 std::unique_lock 而不使用 std::lock_guard 是因为我们可能需要在 phase 5 执行 unlocklock 操作,可以从 wait() 的一个简单实现(如下)看出这点,当然还有一个好处是,我们可以在对数据进行处理前主动释放锁 (phase 6)。

1
2
3
4
5
6
7
template<typename Predicate>
void minimal_wait(std::unique_lock<std::mutex>& lk, Predicate pred) {
while (!pred()) {
lk.unlock();
lk.lock();
}
}

注意:上面 wait() 的简单实现没有考虑与 notify_one()notify_all() 的协作,仅为示意用。

带有条件变量的线程安全队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Listing 4.2 std::unique interface

template <class T, class Container = std::deque<T>>
class queue {
public:
explicit queue(const Container&);
explicit queue(Container&& = Container());
template <class Alloc> explicit queue(const Alloc&);
template <class Alloc> queue(const Container&, const Alloc&);
template <class Alloc> queue(Container&& const Alloc&);
template <class Alloc> queue(queue&&, const Alloc&);
void swap(queue& q);
bool empty() const;
size_type size() const;
T& front();
const T& front() const;
T& back();
const T& back() const;
void push(const T& x);
void pop();
template <class... Args> void emplace(Args... args);
};
1
2
3
4
5
6
7
8
9
10
11
12
// Listing 4.3 The interface of your threadsafe_queue

#include <memory>
template<typename T>
class threadsafe_queue
{
public:
threadsafe_queue();
threadsafe_queue(const threadsafe_queue&);
threadsafe_queue& operator=(const threadsafe_queue&) = delete;

}