《C++ Concurrentcy in Action 2nd》 第三章阅读笔记。


1. 使用互斥锁保护共享数据

1.1. 在 C++ 中使用互斥锁

构造一个 std::mutex 的实例即可创建一个互斥锁,使用成员函数 lock()unlock() 可分别执行加锁和释放锁的操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// Listing 3.1. Protecting a list with a mutex

#include <list>
#include <mutex>
#include <algorithm>

std::list<int> some_list;   // global variable
std::mutex some_mutex;

void add_to_list(int new_value)
{
  std::lock_guard<std::mutex> guard(some_mutex); // RAII idiom for a mutex
  some_list.push_back(new_value);
}

bool list_contains(int value_to_find)
{
  std::lock_guard<std::mutex> guard(some_mutex);
  return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
}

在 C++17 中,由于添加了类模板中的模板参数自动推导,故可将 std::lock_guard<std::mutex> guard(some_mutex) 简化为 std::lock_guard guard(some_mutex)

1.2. 利用 OOP 的思想保护共享数据

在使用互斥锁保护共享数据时,特别要注意关于临界资源的指针和引用,其可能会在临界区外修改临界资源。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Listing 3.2 Accidentally passing out a reference to protected data

class some_data
{
  int a;
  std::string b;
public:
  void do_something();
};

class data_wrapper
{
private:
  some_data data;
  std::mutex m;
public:
  template<typename Function> 
  void process_data(Function func)
  {
    std::lock_guard<std::mutex> l(m);
    func(data); // Pass "protected" data to user-supplied function
  }  
};

some_data* unprotected;

void malicious_function(some_data& protected_data)
{
  unprotected = &protected_data;
}

data_wrapper x;

void foo()
{
  x.process_data(malicious_function); // Pass in a malicious function
  unprotected->do_something(); // Unprotected access to protected data
}

对于共享数据,无论是作为函数的返回值,还是函数的实参,都不要传递其指针或引用至临界区外。

1.3. 发现接口固有的竞争情况

例如在双向链表中,如果想删除某个节点,仅仅对该节点上锁是不可行的。假设一个 stack 容器定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Listing 3.3 The interface to the std::stack container adapter

template<typename T, typename Container=std::deque<T>>
class stack
{
public:
  explicit stack(const Container&);
  explicit stack(Container&& = Container());
  template <class Alloc> explicit stack(const Alloc&);
  template <class Alloc> stack(const Container&, const Alloc&);
  template <class Alloc> stack(Container&&, const Alloc&);
  template <class Alloc> stack(stack&&, const Alloc&);
  bool empty() const;
  size_t size() const;
  T& top();
  T const& top() const;
  void push(T const&);
  void push(T&&);
  void pop();
  void swap(stack&&);
  template <class... Args> void emplace(Args&&... args);
};

在多线程环境下,上述定义中 empty()size() 接口得到的值不保证可用性。例如在下述代码中,在 phase 1 和 phase 2 之间可能有另外一个线程清空该 stack 。

1
2
3
4
5
6
7
stack<int> s;
if (!s.empty()) // phase 1
{
  int const value = s.top(); // phase 2
  s.pop();  // phase 3
  do_something(value);
}

除此之外,还有一种很常见的情况发生:

Thread A Thread B
if(!s.empty())
. if(!s.empty())
int const value = s.top()
. int const value = s.top()
s.pop()
do_something(value) s.pop()
. do_something(value)

在上述表格的执行顺序下,我们会 pop 出两个元素,但 do_something() 只处理了其中一个元素,而且我们还很难发现这个 bug 。但是如果将 top()pop() 两个接口合并的话,也会有新的问题:拷贝操作可能抛出异常(比如内存分配失败等),从而导致 stack 发生了修改,但我们没得到栈顶的元素。我们可以从几个方面试着解决这个问题:

  1. Pass in a reference

    我们可以选择给 pop() 函数传递一个变量的引用以避免内存分配时可能抛出的异常。该方法有两个前提条件:

    • 需要提前构造栈元素的实例
    • 需要栈元素支持赋值操作
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    template<typename T, typename Container=std::deque<T>>
    class stack
    {
    public:
      ...
      void pop(T& value);
      ...
    };
    
    std::vector<int> result;
    some_stack.pop(result);
    
  2. Require a no-throw copy constructor or move constructor

    有些类型的拷贝构造函数或者移动构造函数并不会抛出异常,所以对它们而言并不存在该异常安全问题。

  3. Return a pointer to the popped value

    返回指针虽然可以避免拷贝操作,但是需要注意内存泄露问题,可使用智能指针进行内存管理。

  4. Provide both option 1 and either option 2 or 3

    下面是线程安全的 threadsafe_stack 类型定义。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    
    // Listing 3.4 An outline class definition for a thread-safe satck
    
    #include <exception>
    #include <memory> // For std::shared_ptr<T>
    
    struct empty_stack: std::exception
    {
      const char* what() const noexcept;
    };
    
    template<typename T>
    class threadsafe_stack
    {
    public:
      threadsafe_stack();
      threadsafe_stack(const threadsafe_stack&);
      threadsafe_stack& operator=(const threadsafe_stack&) = delete;
      void push(T new_value);
      std::shared_ptr<T> pop(); // option 3
      void pop(T& value); // option 1
      bool empty() const;  
    };
    
    
    // Listing 3.5 A fleshed-out class definition for a thread-safe satck
    
    #include <exception>
    #include <memory>
    #include <mutex>
    #include <stack>
    struct empty_stack: std::exception
    {
      const char* what() const noexcept;
    };
    
    template<typename T>
    class threadsafe_stack
    {
    private:
      std::stack<T> data;
      /* Mutable is used to specify that the member does not affect the externally 
        visible state of the class. */
      mutable std::mutex m; 
    
    public:
      threadsafe_stack(){}
      threadsafe_stack(const threadsafe_stack& other)
      {
        std::lock_guard<std::mutex> lock(other.m);
        data = other.data; // Copy performed in constructor body
      }
      threadsafe_stack& operator = (const threadsafe_stack&) = delete;
      void push(T new_value)
      {
        std::lock_guard<std::mutex> lock(m);
        data.push(std::move(new_value));
      }
      std::shared_ptr<T> pop()
      {
        std::lock_guard<std::mutex> lock(m);
        // Check for empty before trying to pop value
        if (data.empty()) throw empty_stack();
        // Allocate return value before modifying stack
        std::shard_ptr<T> const res(std::make_shared<T>(data.top()));
        data.pop();
        return res;
      }
      void pop(T& value)
      {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty()) throw empty_stack();
        value = data.pop();
        data.pop();
      }
      bool empty() const
      {
        std::lock_guard<std::mutex> lock(m); // require m mutable here
        return data.empty();
      }  
    };
    

    Tips: Difference between C++03 throw() specifier C++11 noexcept.

1.4. 死锁

当一个操作中含有两个或多个互斥锁时,死锁就有可能会发生。如果可以同时加锁的话,此时可避免死锁的情况,针对于此,STL 的 std::lock 函数可同时对多个互斥锁进行加锁操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Listing 3.6 Using std::lock() and std::lock_guard in a swap operation

class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);

class X
{
private:
  some_big_object some_detail;
  std::mutex m;
public:
  X(some_big_object const& sd) : some_detail(sd){}
  friend void swap(X& lhs, X& rhs)
  {
    if (&lhs == &rhs)
      return;
    std::lock(lhs.m, rhs.m);  // phase 1
    std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); // phase 2
    std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock); // phase 3
    swap(lhs.some_detail, rhs.some_detail);
  }
};

phase 2 和 phase 3 中的 std::adopt_lock 表示 lock_guard 构造函数仅会取得互斥锁 $m$ 的所有权,不会对其进行加锁操作。C++ 17 提供了 和 lock_guard 功能类似但是支持可变模板参数的 scoped_lock,C++ 17 中可将 swap 函数改写成如下形式:

1
2
3
4
5
6
7
8
void swap(X& lhs, X& rhs)
{
  if (&lhs == &rhs)
    return;
  // std::scoped_lock<std::mutex, std::mutex> guard(lhs.m, rhs.m);
  std::scoped_lock guard(lhs.m, rhs,m);
  swap(lhs.some_detail, rhs.some_detail);
}

对于可以同时加锁的情况,避免死锁比较容易做到,但是在其他情况下,如果有效避免死锁的发生是相当困难的。

1.5. 避免死锁的几点建议

死锁不仅仅是伴随着互斥锁而出现,例如,在两个线程互相等待对方结束的情况下,也会发生死锁。

  1. Don’t acquire a lock if you already hold one. 使用场景受限,而且不能避免循环等待。

  2. Avoid calling user-supplied code while holding a lock.

  3. 如果必须要获得多个锁时,建议按固定顺序进行加锁操作,例如在双向链表中,加锁时只能以从头至尾的顺序。

  4. Use a lock hierarchy

    给互斥锁附加一个 hierarchy_value 字段,当线程持有互斥锁时,只能对 hierarchy_value 比当前锁更小的 mutex 加锁。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    
    // Listing 3.7 Using a lock hierarchy to prevent deadlock
    hierarchical_mutex high_level_mutex(10000);
    hierarchical_mutex low_level_mutex(5000);
    hierarchical_mutex other_mutex(6000);
    
    int do_low_level_stuff();
    int low_level_func()
    {
      std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
      return do_low_level_stuff();
    }
    
    void high_level_stuff(int some_param);
    void high_level_func()
    {
      std::lock_guard<hierachical_mutex> lk(high_level_mutex);
      high_level_stuff(low_level_func());
    }
    
    void thread_a()
    {
      high_level_func();
    }
    
    void do_other_stuff();
    void other_func()
    {
      high_level_func();
      do_other_stuff();
    }
    
    void thread_b()
    {
      std::lock_guard<hierachical_mutex> lk(other_mutex)
      other_stuff();
    }
    

    在上述示例中,thread_a() 遵守规则,运行良好;而 thread_b() 违反规则,故运行失败。下面是 hierarchical_mutex 的一个简单实现。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    
    // Listing 3.8 A simple hierachical mutex
    
    class hierarchical_mutex
    {
      std::mutex internal_mutex;
      unsigned long const hierarchy_value;
      unsigned long previous_hierarchy_value;
      static thread_local unsigned long this_thread_hierarchy_value;
    
      void check_for_hierarchy_violation()
      {
        if (this_thread_hierarchy_value <= hierarchy_value)
        {
          throw std::logic_error("mutex hierarchy violated.");
        }
      }
    
      void update_hierarchy_value()
      {
        pervious_hierarchy_value = this_thread_hierarchy_value;
        this_thread_hierarchy_value = hierarchy_value;
      }
    
    public:
      explicit hierarchical_mutex(unsigned long value):
        hierarchy_value(value),
        previous_hierarchy_value(0) {}
      void lock()
      {
        check_for_hierarchy_violation();
        internal_mutex.lock();
        update_hierarchy_value();
      }
      void unlock()
      {
        if (this_thread_hierarchy_value != hierarchy_value)
          throw std::logic_error("mutex hierarchy violated.");
        this_thread_hierarchy_value = previous_hierarchy_value;
        internal_mutex.unlock();
      }
      bool try_lock()
      {
        check_for_hierarchy_violation();
        if (!internal_mutex.try_lock())
          return false;
        update_hierarchy_value();
        return true;
      }
    };
    thread_local unsigned long 
      hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
    

    Tips: thread_local

1.6. std::unique_lock

使用 std::unique_lock 可以将 Listing 3.6 改写为如下形式。其中 std::defer_lock 表示在构造 std::unqiue_lock 实例时,会让传入的 std::mutex 保持 unlocked。

std::unique_lock 的优缺点:

  • 优点:std::unique_lock 并不会总是持有 std::mutex 的所有权,使用更加灵活
  • 缺点:为了实现上面的优点,需要花费额外的空间和时间以存储更新相关状态数据,所以 std::unqiue_lockstd::lock_guard 慢且更占存储空间。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Listing 3.9 Using std::lock() and std::unique_lock() in a swap operation

class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);

class X
{
private:
  some_big_object some_detail;
  std::mutex m;

public:
  X(some_big_object const& sd):some_detail(sd) {}
  friend void swap(X& lhs, X& rhs)
  {
    if (&lhs == &rhs)
      return;
    // std::defer_lock leaves mutexes unlocked.
    std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
    std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
    // Mutexes are locked here
    // std::unique_lock objects could be passed to std::lock()
    std::lock(lock_a, lock_b);  
    swap(lhs.some_detail, rhs.some_detail);
  }
};

在上例中,我们发现 std::unique_lock 对象可以传入 std::lock() 函数,这是因为 std::unique_lock 实现了 lock()try_lock()unlock() 成员函数。除此之外,还可以通过调用 owns_lock() 函数查看 std::unique_lock 当前是否拥有 std::mutex

std::unique_lock 类型不支持 copy,但支持 move,例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
std::unique_lock<std::mutex> get_lock()
{
  extern std::mutex some_mutex;
  std::unique_lock<std::mutex> lk(some_mutex);
  prepare_data();
  return lk;
}

void process_data()
{
  std::unique_lock<std::mutex> lk(get_lock());
  do_something();
}

1.7. 合理设置互斥锁的粒度

通常说到粒度,大家都会想到数据大小相关的粒度的概念;除此之外,占有锁的时间的粒度大小也需要多加注意。比如,有时不必全程持有互斥锁,只在对共享数据进行操作时才上锁,操作完毕后及时释放。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void get_add_process_data ()
{
  std::unique_lock<std::mutex> my_lock(the_mutex);
  some_class data_to_process = get_next_data_chunk();
  // Don't need mutex locked across call to process()
  my_lock.unlock();
  result_type result = process(data_to_process);
  // Relock mutex to write result
  my_lock.lock();
  write_result(data_to_process, result);
}

其次,我们需要尽可能地减少占有锁的时间。比如,如果对数据的副本进行操作也可以得到想要的结果,而且拷贝操作耗时很小时,我们可以选择操作副本。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Listing 3.10 Locking one mutex at a time in a comparison operator

class Y
{
private:
  int some_detail;
  mutable std::mutex m;
  int get_detail() const
  {
    std::lock_guard<std::mutex> lock_a(m);
    return some_detail;
  }

public:
  Y(int sd):some_detail(sd) {}
  friend bool operator== (Y const& lhs, Y const& rhs)
  {
    if (&lhs == &rhs)
      return true;
    int const lhs_value = lhs.get_detail(); // phase 1
    int const rhs_value = rhs.get_detail(); // phase 2
    return lhs_value == rhs_value;
  }
};

上例中需要注意的是,在 phase 1 和 phase 2 之间,可能 lhs 的值发生了改变,所以返回的结果的有效性有一定损失。

2. 保护共享数据的其他场景/方法

互斥锁的确很通用,但在特定的场合下选择更适合的保护机制也是很有必要的。

2.1. 仅在初始化阶段需要保护的共享数据

假设你需要对某个对象使用 Lazy initialization 机制,在单线程下很容易实现,仅需要在使用之前检查是否已初始化即可:

1
2
3
4
5
6
7
8
9
std::shared_ptr<some_resource> resource_ptr;
void foo()
{
  if (!resource_ptr)
  {
    resource_ptr.reset(new some_resource); 
  }
  resource_ptr->do_something();
}

在多线程的场景下,初始化的操作需要进行保护,一个简单的实现如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Listing 3.11 Thread-safe lazy initialization using a mutex

std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
  // All threads are serialized here
  std::unique_lock<std::mutex> lk(resource_mutex); 
  if (!resource_ptr)
  {
    // Only the initialization needs protection
    resource_ptr.reset(new some_resource);
  }
  lk.unlock();
  resource_ptr->do_something();
}

在上述实现中,增加了不必要的加锁、释放锁的操作,而我们所需要保护的仅仅是初始化操作而已。为了解决这个问题,可能有人会想到 double-checked locking 模式,写出如下所示的代码,但是这不仅不会解决我们得问题,还会额外引发其他问题:线程在 phase 1 可随意访问共享对象,其与 phase 3 并不处于同步状态,很有可能在 phase 1 阶段发现指针非空便直接到了 phase 4 阶段,而此时对象 some_resource 可能还未构造完毕,从而造成未定义的行为。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// infamous double-checked locking pattern
void undefined_behaviour_with_double_checked_locking ()
{
  if (!resource_ptr)  // phase 1
  {
    std::lock_guard<std::mutex> lk(resource_mutex);
    if (!resource_ptr)  // phase 2
    {
      resource_ptr.reset(new some_resource);  // phase 3
    }
  }
  resource_ptr->do_something(); // phase 4
}

使用 C++ 提供的 std::once_flagstd::call_once 便可写出满足要求的代码,每个 std::once_flag 实例对应于不同的初始化过程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resourceZ_flag;
void init_resource()
{
  resource_ptr.reset(new some_resource);
}
void foo()
{
  // Initalization is called exactly once.
  std::call_once(resource_flag, init_resource);
  resource_ptr->do_something();
}

在类的成员函数使用 std::call_once() 时需要传入 this 指针,就像 std::thread 的构造函数或 std::bind() 一样,如下所示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Listing 3.12 Thread-safe lazy initialization of a class member using std::call_once

class X
{
private:
  connection_info connection_details;
  connection_handle connection;
  std::once_flag  connection_init_flag;
  void open_connection()
  {
    connection = connection_manager.open(connection_details);
  }

public:
  X(connection_info const& connection_details_) : 
        connection_details(connection_details_) {}
  void send_data(data_packet const& data)
  {
    std::call_once(connection_init_flag, &X::open_connection, this);  
    connection.send_data(data);
  }
  data_packet receive_data()
  {
    std::call_once(connection_init_flag, &X::open_connection, this);
    return connection.receive_data();
  }
};

在 C++11 之前,静态变量也有潜在的竞态情况,但在 C++11 之中这一点被修复了,静态局部变量的初始化仅会在一个线程中执行,所以在 C++11 之中,声明变量为 static 也可以起到和 std::call_once() 类似的效果。

1
2
3
4
5
6
7
class my_class;
my_class& get_my_class_instance()
{
  // Initialization guaranteed to be thread-safe.
  static my_class instance;
  return instance;
}

2.2. 保护极少更新的共享数据

对于这种情况,我们可以立马想到使用读写锁,C++17 提供两种读写锁:std::shared_mutexstd::shared_timed_mutex;C++14 仅提供 std::shared_time_mutex;而对于 C++11,如果想使用读写锁,就需要依赖 Boost library 提供的实现。

对于 std::shared_mutex(std::mutex_time_mutex同样),在使用 RAII 机制时:

  • 写锁:std::lock_guard<std::shared_mutex>std::unique_lock<std::shared_mutex>
  • 读锁:std::shared_lock<std::shared_mutex>
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Listing 3.13 Protecting a data structure with std::shared_mutex

#include <map>
#include <string>
#include <mutex>
#include <shared_mutex>

class dns_entry;

class dns_cache
{
  std::map<std::string, dns_entry> entries;
  mutable std::shared_mutex entry_mutex;

public:
  dns_entry find_entry(std::string const& domain) const
  {
    std::shared_lock<std::shared_mutex> lk(entry_mutex);
    std::map<std::string, dns_entry>::const_iterator const it = entries.find(domain);
    return (it == entries.end()) ? dns_entry() : it->second;
  }

  void update_or_add_entry(std::string const& domain, dns_entry const& dns_details)
  {
    std::lock_guard<std::shared_mutex> lk(entry_mutex);
    entries[domain] = dns_details;
  }
};

2.3. 递归锁

虽然一般不推荐使用递归锁,但是想使用的话,C++ 提供了递归锁的实现:std::recursive_mutex