ふわふわ時間

0%

CCIA-线程管理

《C++ Concurrentcy in Action 2nd》 第二章阅读笔记。


基础线程管理

启动线程

使用 C++ 线程库启动线程,可以归结为构造 std::thread 对象,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// case 1
void do_some_work();
std::thread my_thread(do_some_work);


// case 2
class background_task
{
public:
void operator()() const
{
do_something();
do_something_else();
}
};
background_task f;
std::thread my_thread(f);

注意在上述 case 2 中,如果你传入一个临时变量时,编译器可能会将其误解为函数声明

1
2
3
4
5
std::thread my_thread(background_task()); // 可能会误解为函数声明

// 消除歧义的方法
std::thread my_thread( (background_task()) );
std::thread my_thread{background_task()};

也可使用 lambda 表达式改写 case 2:

1
2
3
4
std::thread my_thread([]{
do_something();
do_something_else();
});

等待线程完成

1
my_thread.join();

因为 join() 函数还会清除与线程相关的存储区,因此一个 std::thread 对象只能调用一次 join(),一旦调用了 join(),调用 joinable() 会返回 false

如果在线程启动和调用 join() 之间出现异常,我们该如何保证调用 join() 的执行呢?

  1. 使用 try/catch

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    // Listing 2.2. Waiting for a thread to finish
    struct func
    {
    int& i;
    func(int& i_) : i(i_){}
    void operator() ()
    {
    for (unsigned j = 0; j < 1000000; ++j)
    {
    do_something(i)
    }
    }
    };

    void f()
    {
    int some_local_state = 0;
    func my_func(some_local_state);
    std::thread t(my_func);
    try
    {
    do_something_in_current_thread();
    } catch(...) {
    t.join();
    throw;
    }
    t.join();
    }

  2. 使用 RAII 机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // Listing 2.3. Using RAII to wait for a thread to complete
    class thread_guard
    {
    std::thread& t;
    public:
    explicit thread_guard(std::thread& t_) : t(t_) {}
    ~thread_guard()
    {
    if (t.joinable())
    {
    t.join();
    }
    }
    thread_guard(thread_guard const&) = delete;
    thread_guard& operator=(thread_guard const&) = delete;
    };

    struct func; // See Listing 2.2

    void f()
    {
    int some_local_state = 0;
    func my_func(some_local_state);
    std::thread t(my_func);
    thread_guard g(t);
    do_something_in_current_thread();
    }

分离线程

通过调用 detach() 可让线程在后台运行。可以通过调用 joinable() 判断线程是否可以分离。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Listing 2.4. Detaching a thread to handle other documents
void edit_document(std::string const& filename)
{
open_document_and_display_gui(filename);
while (!done_editing())
{
user_command cmd = get_user_input();
if (cmd.type == open_new_document)
{
std::string const new_name = get_filename_from_user();
std::thread t(edit_document, new_name);
t.detach();
} else {
process_user_input(cmd);
}
}
}

传递参数

向可调用对象或函数传递参数很简单,只需要将这些参数作为 std::thread 构造函数的附加参数即可。但需要注意的是,在默认情况下,这些参数会被拷贝至新线程的独立内存空间中,以供新线程访问,并如同临时变量一样作为右值传递给可调用对象或函数。即使函数中的参数是引用的形式,拷贝操作也会执行。例如:

1
2
void f(int i, std::string const& s);
std::thread t(f, 3, "heoolo");

代码创建了一个调用 f(3, "hello") 的线程。注意,函数 f 需要一个 std::string 对象作为第二个参数,但这里使用的是字符串的字面值,也就是 char const * 类型。之后,在线程的上下文中完成字面值向 std::string 对象的转化。

提防隐式转换

特别需要注意的是,当指向动态变量的指针作为参数传递给线程的情况,代码如下:

1
2
3
4
5
6
7
8
void f(int i, std::string const& s);
void oops(int some_param)
{
char buffer[1024];
sprintf(buffer, "%i", some_param);
std::thread t(f, 3, buffer);
t.detach();
}

这种情况下,buffer 是一个指针变量,指向局部变量,然后此局部变量通过 buffer 传递到新线程中。此时,函数 oops 很有可能会在 buffer 转换成 std::string 对象之前结束,从而导致一些未定义的行为。因为此时无法保证隐式转换的操作和 std::thread 构造函数的拷贝操作按顺序进行,有可能 std::thread 的构造函数拷贝的是转换前的变量 (buffer 指针),而非字符串。解决方案就是在传递到 std::thread 构造函数之前就将字符数组转化为 std::string 对象:

1
2
3
4
5
6
7
8
9
void f(int i, std::string const& s);
void not_oops(int some_param)
{
char buffer[1024];
sprintf(buffer,"%i",some_param);
// Using std::string avoids dangling pointer
std::thread t(f, 3, std::string(buffer));
t.detach();
}

传递引用

当你尝试使用线程更新一个引用传递的数据结构时也会出现问题:

1
2
3
4
5
6
7
8
9
void update_data_for_widget(widget_id w, widget_data& data);
void oops_again(widget_id w)
{
widget_data data;
std::thread t(update_data_for_widget, w, data);
display_status();
t.join();
process_widget_data(data);
}

在上述代码中,虽然 update_data_for_widget 的第二个参数期待传入一个引用,但是 std::thread 的构造函数并不知晓;其无视 update_data_for_widget 期待的参数类型,并盲目地拷贝提供的变量。内部代码会将拷贝的参数以右值的方式进行传递,这是为了照顾到那些只能进行移动的类型,而后会尝试以右值为实参调用 update_data_for_widget。但因为函数期望的是一个非常量引用作为参数,而非右值,所以会在编译时出错。对于熟悉 std::bind 的开发者来说,问题的解决办法是显而易见的:可以使用 std::ref 将参数转换成引用的形式,因此可将线程的调用改为以下形式:

1
std::thread t(update_data_for_widget, w, std::ref(data));

从而,update_data_for_widget 就会接收到一个 data 变量的引用,而非 data 变量的拷贝副本,这样代码就能顺利的通过编译。

传递成员函数指针

你也可以向线程传递一个成员函数指针,并提供一个合适的对象指针作为函数第一个参数:

1
2
3
4
5
6
7
class X
{
public:
void do_lengthy_work();
};
X my_x;
std::thread t(&X::do_lengthy_work, &my_x);

在这段代码中,新线程将会调用 my_x.do_lengthy_work(),其中 my_x 的地址作为对象指针提供给函数。你也可以为成员函数提供参数:std::thread 构造函数的第三个参数就是成员函数的第一个参数。

传递仅支持 move 的参数

当原对象是一个临时变量时,自动进行 move 操作,但当原对象是一个命名变量,那么传递的时候就需要使用 std::move() 进行显示 move

1
2
3
4
void process_big_object(std::unique_ptr<big_object>);
std::unique_ptr<big_object> p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object, std::move(p));

std::thread 对象其实和 std::unique 类似,也是仅支持 move,不支持 copy 的类型。这说明了在同一时刻,至多仅有一个 std::thread 实例与一个执行中的线程关联,而且允许在不同的实例之间转移线程的所有权。

转移线程所有权

1
2
3
4
5
6
7
8
void some_function();
void some_other_function();
std::thread t1(some_function); // some_function - t1
std::thread t2 = std::move(t1); // some_function - t2
t1 = std::thread(some_other_function); // some_other_function - t1
std::thread t3;
t3 = std::move(t2); // some_function - t3
t1 = std::move(t3); // This assignment will terminate the program

std::thread 对象支持 move 操作,意味着其可以作为函数的返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
// Listing 2.5. Returning a std::thread from a function
std::thread f()
{
void some_function();
return std::thread(some_function);
}

std::thread g()
{
void some_other_function(int);
std::thread t(some_other_function, 42);
return t;
}

当然,std::thread 类型也可以作为函数的参数。

1
2
3
4
5
6
7
8
9
void f(std::thread t);

void g()
{
void some_function();
f(std::thread(some_function));
std::thread t(some_function);
f(std::move(t));
}

可以利用 move 实现功能类似 thread_guard 的类,与 Listing 2.3 中的 thread_guard 不同的是,scoped_thread 直接转移线程的所有权。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Listing 2.6. scoped_thread and example usage
class scoped_thread
{
std::thread t;
public:
explict scoped_thread(std""thread t_) : t(std::move(t_))
{
if (!t.joinable())
throw std::logic_error("No thread");
}
~scoped_thread()
{
t.join();
}
scoped_thread(scoped_thread const&) = delete;
scoped_thread& operator= (scoped_thread const&) = delete;
};

struct func; // See Listing 2.2
void f()
{
int some_locate_state;
scoped_thread t{std::thread(func(some_local_state))};
do_something_in_current_thread();
}

如果保存 std::thread 对象的容器是 move-aware 的,则其也支持 move 语义。

1
2
3
4
5
6
7
8
9
10
11
12
13
// Listing 2.8. Spawns some threads and waits for them to finsh
void do_work(unsigned id);

void f()
{
std::vector<std::thread> threads;
for (unsigned i = 0; i < 20; ++i)
{
threads.emplace_back(do_work, i); // Spawns threads
}
for (auto& entry: threads)
entry.join(); // Calls join() on each thread in turn
}

Tips: 此处使用的 emplace_backpush_back 更高效,它仅需要传入对象构造函数所需的参数。 接口定义 && 两者对比

选择所需线程的数量

函数 std::thread::hardware_concurrency() 返回当前可真正并行运行的线程数(不过需要确定当前运行环境是否支持该命令)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Listing 2.9. A naive parallel version of std::accumulate
template<typename Iterator, typename T>
struct accumulate_block
{
void operator() (Iterator first, Iterator last, T& result)
{
result = std::accumulate(first, last, result);
}
};

template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
{
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(
accumulate_block<Iterator, T>(),
block_start, block_end, std::ref(results[i]) );
block_start = block_end;
}
// current thread process the final block
accumulate_block<Iterator, T>() (block_start, last, results[num_threads - 1]);

for (auto& entry: threads)
entry.join();
return std::accumulate(results.begin(), results.end(), init);
}

线程标识

线程标识的类型是 std::thread::id,可通过种方式获取:

  • 调用 std::thread 对象的成员函数 get_id()
  • 调用 std::this_thread::get_id()
1
2
3
4
5
6
7
8
9
10
std::thread::id master_thread;

void some_core_part_of_algorithm()
{
if (std::this_thread::get_id == master_thread)
{
do_master_thread_work();
}
do_common_work();
}