ふわふわ時間

0%

CCIA-线程间共享数据

《C++ Concurrentcy in Action 2nd》 第三章阅读笔记。


使用互斥锁保护共享数据

在 C++ 中使用互斥锁

构造一个 std::mutex 的实例即可创建一个互斥锁,使用成员函数 lock()unlock() 可分别执行加锁和释放锁的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Listing 3.1. Protecting a list with a mutex

#include <list>
#include <mutex>
#include <algorithm>

std::list<int> some_list; // global variable
std::mutex some_mutex;

void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex); // RAII idiom for a mutex
some_list.push_back(new_value);
}

bool list_contains(int value_to_find)
{
std::lock_guard<std::mutex> guard(some_mutex);
return std::find(some_list.begin(), some_list.end(), value_to_find) != some_list.end();
}

在 C++17 中,由于添加了类模板中的模板参数自动推导,故可将 std::lock_guard<std::mutex> guard(some_mutex) 简化为 std::lock_guard guard(some_mutex)

利用 OOP 的思想保护共享数据

在使用互斥锁保护共享数据时,特别要注意关于临界资源的指针和引用,其可能会在临界区外修改临界资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Listing 3.2 Accidentally passing out a reference to protected data

class some_data
{
int a;
std::string b;
public:
void do_something();
};

class data_wrapper
{
private:
some_data data;
std::mutex m;
public:
template<typename Function>
void process_data(Function func)
{
std::lock_guard<std::mutex> l(m);
func(data); // Pass "protected" data to user-supplied function
}
};

some_data* unprotected;

void malicious_function(some_data& protected_data)
{
unprotected = &protected_data;
}

data_wrapper x;

void foo()
{
x.process_data(malicious_function); // Pass in a malicious function
unprotected->do_something(); // Unprotected access to protected data
}

对于共享数据,无论是作为函数的返回值,还是函数的实参,都不要传递其指针或引用至临界区外。

发现接口固有的竞争情况

例如在双向链表中,如果想删除某个节点,仅仅对该节点上锁是不可行的。假设一个 stack 容器定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Listing 3.3 The interface to the std::stack container adapter

template<typename T, typename Container=std::deque<T>>
class stack
{
public:
explicit stack(const Container&);
explicit stack(Container&& = Container());
template <class Alloc> explicit stack(const Alloc&);
template <class Alloc> stack(const Container&, const Alloc&);
template <class Alloc> stack(Container&&, const Alloc&);
template <class Alloc> stack(stack&&, const Alloc&);
bool empty() const;
size_t size() const;
T& top();
T const& top() const;
void push(T const&);
void push(T&&);
void pop();
void swap(stack&&);
template <class... Args> void emplace(Args&&... args);
};

在多线程环境下,上述定义中 empty()size() 接口得到的值不保证可用性。例如在下述代码中,在 phase 1 和 phase 2 之间可能有另外一个线程清空该 stack 。

1
2
3
4
5
6
7
stack<int> s;
if (!s.empty()) // phase 1
{
int const value = s.top(); // phase 2
s.pop(); // phase 3
do_something(value);
}

除此之外,还有一种很常见的情况发生:

Thread A Thread B
if(!s.empty())
. if(!s.empty())
int const value = s.top()
. int const value = s.top()
s.pop()
do_something(value) s.pop()
. do_something(value)

在上述表格的执行顺序下,我们会 pop 出两个元素,但 do_something() 只处理了其中一个元素,而且我们还很难发现这个 bug 。但是如果将 top()pop() 两个接口合并的话,也会有新的问题:拷贝操作可能抛出异常(比如内存分配失败等),从而导致 stack 发生了修改,但我们没得到栈顶的元素。我们可以从几个方面试着解决这个问题:

  1. Pass in a reference

    我们可以选择给 pop() 函数传递一个变量的引用以避免内存分配时可能抛出的异常。该方法有两个前提条件:

    • 需要提前构造栈元素的实例
    • 需要栈元素支持赋值操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    template<typename T, typename Container=std::deque<T>>
    class stack
    {
    public:
    ...
    void pop(T& value);
    ...
    };

    std::vector<int> result;
    some_stack.pop(result);

  2. Require a no-throw copy constructor or move constructor

    有些类型的拷贝构造函数或者移动构造函数并不会抛出异常,所以对它们而言并不存在该异常安全问题。

  3. Return a pointer to the popped value

    返回指针虽然可以避免拷贝操作,但是需要注意内存泄露问题,可使用智能指针进行内存管理。

  4. Provide both option 1 and either option 2 or 3

    下面是线程安全的 threadsafe_stack 类型定义。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    // Listing 3.4 An outline class definition for a thread-safe satck

    #include <exception>
    #include <memory> // For std::shared_ptr<T>

    struct empty_stack: std::exception
    {
    const char* what() const noexcept;
    };

    template<typename T>
    class threadsafe_stack
    {
    public:
    threadsafe_stack();
    threadsafe_stack(const threadsafe_stack&);
    threadsafe_stack& operator=(const threadsafe_stack&) = delete;
    void push(T new_value);
    std::shared_ptr<T> pop(); // option 3
    void pop(T& value); // option 1
    bool empty() const;
    };


    // Listing 3.5 A fleshed-out class definition for a thread-safe satck

    #include <exception>
    #include <memory>
    #include <mutex>
    #include <stack>
    struct empty_stack: std::exception
    {
    const char* what() const noexcept;
    };

    template<typename T>
    class threadsafe_stack
    {
    private:
    std::stack<T> data;
    /* Mutable is used to specify that the member does not affect the externally
    visible state of the class. */
    mutable std::mutex m;

    public:
    threadsafe_stack(){}
    threadsafe_stack(const threadsafe_stack& other)
    {
    std::lock_guard<std::mutex> lock(other.m);
    data = other.data; // Copy performed in constructor body
    }
    threadsafe_stack& operator = (const threadsafe_stack&) = delete;
    void push(T new_value)
    {
    std::lock_guard<std::mutex> lock(m);
    data.push(std::move(new_value));
    }
    std::shared_ptr<T> pop()
    {
    std::lock_guard<std::mutex> lock(m);
    // Check for empty before trying to pop value
    if (data.empty()) throw empty_stack();
    // Allocate return value before modifying stack
    std::shard_ptr<T> const res(std::make_shared<T>(data.top()));
    data.pop();
    return res;
    }
    void pop(T& value)
    {
    std::lock_guard<std::mutex> lock(m);
    if (data.empty()) throw empty_stack();
    value = data.pop();
    data.pop();
    }
    bool empty() const
    {
    std::lock_guard<std::mutex> lock(m); // require m mutable here
    return data.empty();
    }
    };

    Tips: Difference between C++03 throw() specifier C++11 noexcept.

死锁

当一个操作中含有两个或多个互斥锁时,死锁就有可能会发生。如果可以同时加锁的话,此时可避免死锁的情况,针对于此,STL 的 std::lock 函数可同时对多个互斥锁进行加锁操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Listing 3.6 Using std::lock() and std::lock_guard in a swap operation

class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);

class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd) : some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if (&lhs == &rhs)
return;
std::lock(lhs.m, rhs.m); // phase 1
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); // phase 2
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock); // phase 3
swap(lhs.some_detail, rhs.some_detail);
}
};

phase 2 和 phase 3 中的 std::adopt_lock 表示 lock_guard 构造函数仅会取得互斥锁 \(m\) 的所有权,不会对其进行加锁操作。C++ 17 提供了 和 lock_guard 功能类似但是支持可变模板参数的 scoped_lock,C++ 17 中可将 swap 函数改写成如下形式:

1
2
3
4
5
6
7
8
void swap(X& lhs, X& rhs)
{
if (&lhs == &rhs)
return;
// std::scoped_lock<std::mutex, std::mutex> guard(lhs.m, rhs.m);
std::scoped_lock guard(lhs.m, rhs,m);
swap(lhs.some_detail, rhs.some_detail);
}

对于可以同时加锁的情况,避免死锁比较容易做到,但是在其他情况下,如果有效避免死锁的发生是相当困难的。

避免死锁的几点建议

死锁不仅仅是伴随着互斥锁而出现,例如,在两个线程互相等待对方结束的情况下,也会发生死锁。

  1. Don't acquire a lock if you already hold one. 使用场景受限,而且不能避免循环等待。

  2. Avoid calling user-supplied code while holding a lock.

  3. 如果必须要获得多个锁时,建议按固定顺序进行加锁操作,例如在双向链表中,加锁时只能以从头至尾的顺序。

  4. Use a lock hierarchy

    给互斥锁附加一个 hierarchy_value 字段,当线程持有互斥锁时,只能对 hierarchy_value 比当前锁更小的 mutex 加锁。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    // Listing 3.7 Using a lock hierarchy to prevent deadlock
    hierarchical_mutex high_level_mutex(10000);
    hierarchical_mutex low_level_mutex(5000);
    hierarchical_mutex other_mutex(6000);

    int do_low_level_stuff();
    int low_level_func()
    {
    std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
    return do_low_level_stuff();
    }

    void high_level_stuff(int some_param);
    void high_level_func()
    {
    std::lock_guard<hierachical_mutex> lk(high_level_mutex);
    high_level_stuff(low_level_func());
    }

    void thread_a()
    {
    high_level_func();
    }

    void do_other_stuff();
    void other_func()
    {
    high_level_func();
    do_other_stuff();
    }

    void thread_b()
    {
    std::lock_guard<hierachical_mutex> lk(other_mutex)
    other_stuff();
    }

    在上述示例中,thread_a() 遵守规则,运行良好;而 thread_b() 违反规则,故运行失败。下面是 hierarchical_mutex 的一个简单实现。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    // Listing 3.8 A simple hierachical mutex

    class hierarchical_mutex
    {
    std::mutex internal_mutex;
    unsigned long const hierarchy_value;
    unsigned long previous_hierarchy_value;
    static thread_local unsigned long this_thread_hierarchy_value;

    void check_for_hierarchy_violation()
    {
    if (this_thread_hierarchy_value <= hierarchy_value)
    {
    throw std::logic_error("mutex hierarchy violated.");
    }
    }

    void update_hierarchy_value()
    {
    pervious_hierarchy_value = this_thread_hierarchy_value;
    this_thread_hierarchy_value = hierarchy_value;
    }

    public:
    explicit hierarchical_mutex(unsigned long value):
    hierarchy_value(value),
    previous_hierarchy_value(0) {}
    void lock()
    {
    check_for_hierarchy_violation();
    internal_mutex.lock();
    update_hierarchy_value();
    }
    void unlock()
    {
    if (this_thread_hierarchy_value != hierarchy_value)
    throw std::logic_error("mutex hierarchy violated.");
    this_thread_hierarchy_value = previous_hierarchy_value;
    internal_mutex.unlock();
    }
    bool try_lock()
    {
    check_for_hierarchy_violation();
    if (!internal_mutex.try_lock())
    return false;
    update_hierarchy_value();
    return true;
    }
    };
    thread_local unsigned long
    hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

    Tips: thread_local

std::unique_lock

使用 std::unique_lock 可以将 Listing 3.6 改写为如下形式。其中 std::defer_lock 表示在构造 std::unqiue_lock 实例时,会让传入的 std::mutex 保持 unlocked。

std::unique_lock 的优缺点:

  • 优点:std::unique_lock 并不会总是持有 std::mutex 的所有权,使用更加灵活
  • 缺点:为了实现上面的优点,需要花费额外的空间和时间以存储更新相关状态数据,所以 std::unqiue_lockstd::lock_guard 慢且更占存储空间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Listing 3.9 Using std::lock() and std::unique_lock() in a swap operation

class some_big_object;
void swap(some_big_object& lhs, some_big_object& rhs);

class X
{
private:
some_big_object some_detail;
std::mutex m;

public:
X(some_big_object const& sd):some_detail(sd) {}
friend void swap(X& lhs, X& rhs)
{
if (&lhs == &rhs)
return;
// std::defer_lock leaves mutexes unlocked.
std::unique_lock<std::mutex> lock_a(lhs.m, std::defer_lock);
std::unique_lock<std::mutex> lock_b(rhs.m, std::defer_lock);
// Mutexes are locked here
// std::unique_lock objects could be passed to std::lock()
std::lock(lock_a, lock_b);
swap(lhs.some_detail, rhs.some_detail);
}
};

在上例中,我们发现 std::unique_lock 对象可以传入 std::lock() 函数,这是因为 std::unique_lock 实现了 lock()try_lock()unlock() 成员函数。除此之外,还可以通过调用 owns_lock() 函数查看 std::unique_lock 当前是否拥有 std::mutex

std::unique_lock 类型不支持 copy,但支持 move,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
std::unique_lock<std::mutex> get_lock()
{
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk;
}

void process_data()
{
std::unique_lock<std::mutex> lk(get_lock());
do_something();
}

合理设置互斥锁的粒度

通常说到粒度,大家都会想到数据大小相关的粒度的概念;除此之外,占有锁的时间的粒度大小也需要多加注意。比如,有时不必全程持有互斥锁,只在对共享数据进行操作时才上锁,操作完毕后及时释放。

1
2
3
4
5
6
7
8
9
10
11
void get_add_process_data ()
{
std::unique_lock<std::mutex> my_lock(the_mutex);
some_class data_to_process = get_next_data_chunk();
// Don't need mutex locked across call to process()
my_lock.unlock();
result_type result = process(data_to_process);
// Relock mutex to write result
my_lock.lock();
write_result(data_to_process, result);
}

其次,我们需要尽可能地减少占有锁的时间。比如,如果对数据的副本进行操作也可以得到想要的结果,而且拷贝操作耗时很小时,我们可以选择操作副本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Listing 3.10 Locking one mutex at a time in a comparison operator

class Y
{
private:
int some_detail;
mutable std::mutex m;
int get_detail() const
{
std::lock_guard<std::mutex> lock_a(m);
return some_detail;
}

public:
Y(int sd):some_detail(sd) {}
friend bool operator== (Y const& lhs, Y const& rhs)
{
if (&lhs == &rhs)
return true;
int const lhs_value = lhs.get_detail(); // phase 1
int const rhs_value = rhs.get_detail(); // phase 2
return lhs_value == rhs_value;
}
};

上例中需要注意的是,在 phase 1 和 phase 2 之间,可能 lhs 的值发生了改变,所以返回的结果的有效性有一定损失。

保护共享数据的其他场景/方法

互斥锁的确很通用,但在特定的场合下选择更适合的保护机制也是很有必要的。

仅在初始化阶段需要保护的共享数据

假设你需要对某个对象使用 Lazy initialization 机制,在单线程下很容易实现,仅需要在使用之前检查是否已初始化即可:

1
2
3
4
5
6
7
8
9
std::shared_ptr<some_resource> resource_ptr;
void foo()
{
if (!resource_ptr)
{
resource_ptr.reset(new some_resource);
}
resource_ptr->do_something();
}

在多线程的场景下,初始化的操作需要进行保护,一个简单的实现如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Listing 3.11 Thread-safe lazy initialization using a mutex

std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
// All threads are serialized here
std::unique_lock<std::mutex> lk(resource_mutex);
if (!resource_ptr)
{
// Only the initialization needs protection
resource_ptr.reset(new some_resource);
}
lk.unlock();
resource_ptr->do_something();
}

在上述实现中,增加了不必要的加锁、释放锁的操作,而我们所需要保护的仅仅是初始化操作而已。为了解决这个问题,可能有人会想到 double-checked locking 模式,写出如下所示的代码,但是这不仅不会解决我们得问题,还会额外引发其他问题:线程在 phase 1 可随意访问共享对象,其与 phase 3 并不处于同步状态,很有可能在 phase 1 阶段发现指针非空便直接到了 phase 4 阶段,而此时对象 some_resource 可能还未构造完毕,从而造成未定义的行为。

1
2
3
4
5
6
7
8
9
10
11
12
13
// infamous double-checked locking pattern
void undefined_behaviour_with_double_checked_locking ()
{
if (!resource_ptr) // phase 1
{
std::lock_guard<std::mutex> lk(resource_mutex);
if (!resource_ptr) // phase 2
{
resource_ptr.reset(new some_resource); // phase 3
}
}
resource_ptr->do_something(); // phase 4
}

使用 C++ 提供的 std::once_flagstd::call_once 便可写出满足要求的代码,每个 std::once_flag 实例对应于不同的初始化过程。

1
2
3
4
5
6
7
8
9
10
11
12
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resourceZ_flag;
void init_resource()
{
resource_ptr.reset(new some_resource);
}
void foo()
{
// Initalization is called exactly once.
std::call_once(resource_flag, init_resource);
resource_ptr->do_something();
}

在类的成员函数使用 std::call_once() 时需要传入 this 指针,就像 std::thread 的构造函数或 std::bind() 一样,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Listing 3.12 Thread-safe lazy initialization of a class member using std::call_once

class X
{
private:
connection_info connection_details;
connection_handle connection;
std::once_flag connection_init_flag;
void open_connection()
{
connection = connection_manager.open(connection_details);
}

public:
X(connection_info const& connection_details_) :
connection_details(connection_details_) {}
void send_data(data_packet const& data)
{
std::call_once(connection_init_flag, &X::open_connection, this);
connection.send_data(data);
}
data_packet receive_data()
{
std::call_once(connection_init_flag, &X::open_connection, this);
return connection.receive_data();
}
};

在 C++11 之前,静态变量也有潜在的竞态情况,但在 C++11 之中这一点被修复了,静态局部变量的初始化仅会在一个线程中执行,所以在 C++11 之中,声明变量为 static 也可以起到和 std::call_once() 类似的效果。

1
2
3
4
5
6
7
class my_class;
my_class& get_my_class_instance()
{
// Initialization guaranteed to be thread-safe.
static my_class instance;
return instance;
}

保护极少更新的共享数据

对于这种情况,我们可以立马想到使用读写锁,C++17 提供两种读写锁:std::shared_mutexstd::shared_timed_mutex;C++14 仅提供 std::shared_time_mutex;而对于 C++11,如果想使用读写锁,就需要依赖 Boost library 提供的实现。

对于 std::shared_mutex(std::mutex_time_mutex同样),在使用 RAII 机制时:

  • 写锁:std::lock_guard<std::shared_mutex>std::unique_lock<std::shared_mutex>
  • 读锁:std::shared_lock<std::shared_mutex>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Listing 3.13 Protecting a data structure with std::shared_mutex

#include <map>
#include <string>
#include <mutex>
#include <shared_mutex>

class dns_entry;

class dns_cache
{
std::map<std::string, dns_entry> entries;
mutable std::shared_mutex entry_mutex;

public:
dns_entry find_entry(std::string const& domain) const
{
std::shared_lock<std::shared_mutex> lk(entry_mutex);
std::map<std::string, dns_entry>::const_iterator const it = entries.find(domain);
return (it == entries.end()) ? dns_entry() : it->second;
}

void update_or_add_entry(std::string const& domain, dns_entry const& dns_details)
{
std::lock_guard<std::shared_mutex> lk(entry_mutex);
entries[domain] = dns_details;
}
};

递归锁

虽然一般不推荐使用递归锁,但是想使用的话,C++ 提供了递归锁的实现:std::recursive_mutex