《C++ Concurrentcy in Action 2nd》 第二章阅读笔记。
1. 基础线程管理
1.1. 启动线程
使用 C++ 线程库启动线程,可以归结为构造 std::thread
对象,例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// case 1
void do_some_work();
std::thread my_thread(do_some_work);
// case 2
class background_task
{
public:
void operator()() const
{
do_something();
do_something_else();
}
};
background_task f;
std::thread my_thread(f);
|
注意在上述 case 2 中,如果你传入一个临时变量时,编译器可能会将其误解为函数声明:
1
2
3
4
5
|
std::thread my_thread(background_task()); // 可能会误解为函数声明
// 消除歧义的方法
std::thread my_thread( (background_task()) );
std::thread my_thread{background_task()};
|
也可使用 lambda 表达式改写 case 2:
1
2
3
4
|
std::thread my_thread([]{
do_something();
do_something_else();
});
|
1.2. 等待线程完成
因为 join()
函数还会清除与线程相关的存储区,因此一个 std::thread
对象只能调用一次 join()
,一旦调用了 join()
,调用 joinable()
会返回 false
。
如果在线程启动和调用 join()
之间出现异常,我们该如何保证调用 join()
的执行呢?
-
使用 try/catch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// Listing 2.2. Waiting for a thread to finish
struct func
{
int& i;
func(int& i_) : i(i_){}
void operator() ()
{
for (unsigned j = 0; j < 1000000; ++j)
{
do_something(i)
}
}
};
void f()
{
int some_local_state = 0;
func my_func(some_local_state);
std::thread t(my_func);
try
{
do_something_in_current_thread();
} catch(...) {
t.join();
throw;
}
t.join();
}
|
-
使用 RAII 机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// Listing 2.3. Using RAII to wait for a thread to complete
class thread_guard
{
std::thread& t;
public:
explicit thread_guard(std::thread& t_) : t(t_) {}
~thread_guard()
{
if (t.joinable())
{
t.join();
}
}
thread_guard(thread_guard const&) = delete;
thread_guard& operator=(thread_guard const&) = delete;
};
struct func; // See Listing 2.2
void f()
{
int some_local_state = 0;
func my_func(some_local_state);
std::thread t(my_func);
thread_guard g(t);
do_something_in_current_thread();
}
|
1.3. 分离线程
通过调用 detach()
可让线程在后台运行。可以通过调用 joinable()
判断线程是否可以分离。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// Listing 2.4. Detaching a thread to handle other documents
void edit_document(std::string const& filename)
{
open_document_and_display_gui(filename);
while (!done_editing())
{
user_command cmd = get_user_input();
if (cmd.type == open_new_document)
{
std::string const new_name = get_filename_from_user();
std::thread t(edit_document, new_name);
t.detach();
} else {
process_user_input(cmd);
}
}
}
|
2. 传递参数
向可调用对象或函数传递参数很简单,只需要将这些参数作为 std::thread
构造函数的附加参数即可。但需要注意的是,在默认情况下,这些参数会被拷贝至新线程的独立内存空间中,以供新线程访问,并如同临时变量一样作为右值传递给可调用对象或函数。即使函数中的参数是引用的形式,拷贝操作也会执行。例如:
1
2
|
void f(int i, std::string const& s);
std::thread t(f, 3, "heoolo");
|
代码创建了一个调用 f(3, "hello")
的线程。注意,函数 f
需要一个 std::string
对象作为第二个参数,但这里使用的是字符串的字面值,也就是 char const *
类型。之后,在线程的上下文中完成字面值向 std::string
对象的转化。
2.1. 提防隐式转换
特别需要注意的是,当指向动态变量的指针作为参数传递给线程的情况,代码如下:
1
2
3
4
5
6
7
8
|
void f(int i, std::string const& s);
void oops(int some_param)
{
char buffer[1024];
sprintf(buffer, "%i", some_param);
std::thread t(f, 3, buffer);
t.detach();
}
|
这种情况下,buffer
是一个指针变量,指向局部变量,然后此局部变量通过 buffer
传递到新线程中。此时,函数 oops
很有可能会在 buffer
转换成 std::string
对象之前结束,从而导致一些未定义的行为。因为此时无法保证隐式转换的操作和 std::thread
构造函数的拷贝操作按顺序进行,有可能 std::thread
的构造函数拷贝的是转换前的变量 (buffer
指针),而非字符串。解决方案就是在传递到 std::thread
构造函数之前就将字符数组转化为 std::string
对象:
1
2
3
4
5
6
7
8
9
|
void f(int i, std::string const& s);
void not_oops(int some_param)
{
char buffer[1024];
sprintf(buffer,"%i",some_param);
// Using std::string avoids dangling pointer
std::thread t(f, 3, std::string(buffer));
t.detach();
}
|
2.2. 传递引用
当你尝试使用线程更新一个引用传递的数据结构时也会出现问题:
1
2
3
4
5
6
7
8
9
|
void update_data_for_widget(widget_id w, widget_data& data);
void oops_again(widget_id w)
{
widget_data data;
std::thread t(update_data_for_widget, w, data);
display_status();
t.join();
process_widget_data(data);
}
|
在上述代码中,虽然 update_data_for_widget
的第二个参数期待传入一个引用,但是 std::thread
的构造函数并不知晓;其无视 update_data_for_widget
期待的参数类型,并盲目地拷贝提供的变量。内部代码会将拷贝的参数以右值的方式进行传递,这是为了照顾到那些只能进行移动的类型,而后会尝试以右值为实参调用 update_data_for_widget
。但因为函数期望的是一个非常量引用作为参数,而非右值,所以会在编译时出错。对于熟悉 std::bind
的开发者来说,问题的解决办法是显而易见的:可以使用 std::ref
将参数转换成引用的形式,因此可将线程的调用改为以下形式:
1
|
std::thread t(update_data_for_widget, w, std::ref(data));
|
从而,update_data_for_widget
就会接收到一个 data
变量的引用,而非 data
变量的拷贝副本,这样代码就能顺利的通过编译。
2.3. 传递成员函数指针
你也可以向线程传递一个成员函数指针,并提供一个合适的对象指针作为函数第一个参数:
1
2
3
4
5
6
7
|
class X
{
public:
void do_lengthy_work();
};
X my_x;
std::thread t(&X::do_lengthy_work, &my_x);
|
在这段代码中,新线程将会调用 my_x.do_lengthy_work()
,其中 my_x
的地址作为对象指针提供给函数。你也可以为成员函数提供参数:std::thread
构造函数的第三个参数就是成员函数的第一个参数。
2.4. 传递仅支持 move
的参数
当原对象是一个临时变量时,自动进行 move
操作,但当原对象是一个命名变量,那么传递的时候就需要使用 std::move()
进行显示 move
1
2
3
4
|
void process_big_object(std::unique_ptr<big_object>);
std::unique_ptr<big_object> p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object, std::move(p));
|
std::thread
对象其实和 std::unique
类似,也是仅支持 move
,不支持 copy
的类型。这说明了在同一时刻,至多仅有一个 std::thread
实例与一个执行中的线程关联,而且允许在不同的实例之间转移线程的所有权。
3. 转移线程所有权
1
2
3
4
5
6
7
8
|
void some_function();
void some_other_function();
std::thread t1(some_function); // some_function - t1
std::thread t2 = std::move(t1); // some_function - t2
t1 = std::thread(some_other_function); // some_other_function - t1
std::thread t3;
t3 = std::move(t2); // some_function - t3
t1 = std::move(t3); // This assignment will terminate the program
|
std::thread
对象支持 move
操作,意味着其可以作为函数的返回值。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// Listing 2.5. Returning a std::thread from a function
std::thread f()
{
void some_function();
return std::thread(some_function);
}
std::thread g()
{
void some_other_function(int);
std::thread t(some_other_function, 42);
return t;
}
|
当然,std::thread
类型也可以作为函数的参数。
1
2
3
4
5
6
7
8
9
|
void f(std::thread t);
void g()
{
void some_function();
f(std::thread(some_function));
std::thread t(some_function);
f(std::move(t));
}
|
可以利用 move
实现功能类似 thread_guard
的类,与 Listing 2.3 中的 thread_guard
不同的是,scoped_thread
直接转移线程的所有权。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// Listing 2.6. scoped_thread and example usage
class scoped_thread
{
std::thread t;
public:
explict scoped_thread(std""thread t_) : t(std::move(t_))
{
if (!t.joinable())
throw std::logic_error("No thread");
}
~scoped_thread()
{
t.join();
}
scoped_thread(scoped_thread const&) = delete;
scoped_thread& operator= (scoped_thread const&) = delete;
};
struct func; // See Listing 2.2
void f()
{
int some_locate_state;
scoped_thread t{std::thread(func(some_local_state))};
do_something_in_current_thread();
}
|
如果保存 std::thread
对象的容器是 move-aware 的,则其也支持 move
语义。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// Listing 2.8. Spawns some threads and waits for them to finsh
void do_work(unsigned id);
void f()
{
std::vector<std::thread> threads;
for (unsigned i = 0; i < 20; ++i)
{
threads.emplace_back(do_work, i); // Spawns threads
}
for (auto& entry: threads)
entry.join(); // Calls join() on each thread in turn
}
|
Tips: 此处使用的 emplace_back
比 push_back
更高效,它仅需要传入对象构造函数所需的参数。
接口定义 && 两者对比
4. 选择所需线程的数量
函数 std:🧵:hardware_concurrency()
返回当前可真正并行运行的线程数(不过需要确定当前运行环境是否支持该命令)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
// Listing 2.9. A naive parallel version of std::accumulate
template<typename Iterator, typename T>
struct accumulate_block
{
void operator() (Iterator first, Iterator last, T& result)
{
result = std::accumulate(first, last, result);
}
};
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std:🧵:hardware_concurrency();
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
{
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(
accumulate_block<Iterator, T>(),
block_start, block_end, std::ref(results[i]) );
block_start = block_end;
}
// current thread process the final block
accumulate_block<Iterator, T>() (block_start, last, results[num_threads - 1]);
for (auto& entry: threads)
entry.join();
return std::accumulate(results.begin(), results.end(), init);
}
|
5. 线程标识
线程标识的类型是 std:🧵:id
,可通过种方式获取:
- 调用
std::thread
对象的成员函数 get_id()
- 调用
std::this_thread::get_id()
1
2
3
4
5
6
7
8
9
10
|
std:🧵:id master_thread;
void some_core_part_of_algorithm()
{
if (std::this_thread::get_id == master_thread)
{
do_master_thread_work();
}
do_common_work();
}
|