《C++ Concurrentcy in Action 2nd》 第四章阅读笔记。


在多线程编程中,不仅需要知道处理数据的一致性问题,还需要熟悉线程之间的同步机制。

1. 等待某个事件/条件

当一个线程需要等待其他线程完成某些任务时,可以使用 C++ 提供的条件变量 (condition variable)。

1.1. 使用条件变量

C++ 在 condition_variable 库中实现了两种条件变量:

  • std::condition_variable: 仅限于和 std::mutex 一起使用,性能比 std::condition_variable_any
  • std::condition_variable_any:可以和与 std::mutex 类似的对象一起使用
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Listing 4.1 Waiting for data to process with std::condition_variable

std::mutex mut;
std::queue<data_chunk> data_queue;  // phase 1
std::condition_variable data_cond;

void data_preparation_thread()
{
  while (more_data_to_prepare())
  {
    data_chunk const data = prepare_data();
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(data);  // phase 2
    data_cond.notify_one(); // phase 3
  }
}

void data_processing_thread()
{
  while (true)
  {
    std::unique_lock<std::mutex> lk(mut); // phase 4
    data_cond.wait(lk, []{return !data_queue.empty();});  // phase 5
    data_chunk data = data_queue.front();
    data_queue.pop();
    lk.unlock();  // phase 6
    process(data);
    if (is_last_chunk(data))  break;
  }
}

我们首先分析 data_preparation_thread(),如果 data 准备完成,则会对 mut 上锁,然后 data 压入队列 (phase 2),随后调用条件变量实例 data_cond 的成员函数 notify_one() 通知等待该条件变量的线程,这里我们使用了 std::lock_guard,保证在通知等待线程后便释放互斥锁,可以避免在等待线程被唤醒后还需等待互斥锁的性能损失。

data_processing_thread() 中,在 phase 4 使用 std::unique_lock 而不使用 std::lock_guard 是因为我们可能需要在 phase 5 执行 unlocklock 操作,可以从 wait() 的一个简单实现(如下)看出这点,当然还有一个好处是,我们可以在对数据进行处理前主动释放锁 (phase 6)。

1
2
3
4
5
6
7
template<typename Predicate>
void minimal_wait(std::unique_lock<std::mutex>& lk, Predicate pred) {
  while (!pred()) {
    lk.unlock();
    lk.lock();
  }
}

注意:上面 wait() 的简单实现没有考虑与 notify_one()notify_all() 的协作,仅为示意用。

1.2. 带有条件变量的线程安全队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Listing 4.2 std::unique interface

template <class T, class Container = std::deque<T>>
class queue {
public:
  explicit queue(const Container&);
  explicit queue(Container&& = Container());
  template <class Alloc> explicit queue(const Alloc&);
  template <class Alloc> queue(const Container&, const Alloc&);
  template <class Alloc> queue(Container&& const Alloc&);
  template <class Alloc> queue(queue&&, const Alloc&);
  void swap(queue& q);
  bool empty() const;
  size_type size() const;
  T& front();
  const T& front() const;
  T& back();
  const T& back() const;
  void push(const T& x);
  void pop();
  template <class... Args> void emplace(Args... args);
};
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Listing 4.3 The interface of your threadsafe_queue

#include <memory>
template<typename T>
class threadsafe_queue
{
public:
  threadsafe_queue();
  threadsafe_queue(const threadsafe_queue&);
  threadsafe_queue& operator=(const threadsafe_queue&) = delete;
  
}