《C++ Concurrentcy in Action 2nd》 第二章阅读笔记。


基础线程管理

启动线程

使用 C++ 线程库启动线程,可以归结为构造 std::thread 对象,例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// case 1
void do_some_work();
std::thread my_thread(do_some_work);


// case 2
class background_task
{
public:
    void operator()() const
    {
        do_something();
        do_something_else();
    }
};
background_task f;
std::thread my_thread(f);

注意在上述 case 2 中,如果你传入一个临时变量时,编译器可能会将其误解为函数声明

1
2
3
4
5
std::thread my_thread(background_task()); // 可能会误解为函数声明

// 消除歧义的方法
std::thread my_thread( (background_task()) ); 
std::thread my_thread{background_task()};

也可使用 lambda 表达式改写 case 2:

1
2
3
4
std::thread my_thread([]{
    do_something();
    do_something_else();
});

等待线程完成

1
my_thread.join();

因为 join() 函数还会清除与线程相关的存储区,因此一个 std::thread 对象只能调用一次 join(),一旦调用了 join(),调用 joinable() 会返回 false

如果在线程启动和调用 join() 之间出现异常,我们该如何保证调用 join() 的执行呢?

  1. 使用 try/catch

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    
    // Listing 2.2. Waiting for a thread to finish
    struct func
    {
        int& i;
        func(int& i_) : i(i_){}
        void operator() ()
        {
            for (unsigned j = 0; j < 1000000; ++j)
            {
                do_something(i)
            }
        }
    };
    
    void f()
    {
        int some_local_state = 0;
        func my_func(some_local_state);
        std::thread t(my_func);
        try
        {
            do_something_in_current_thread();
        } catch(...) {
            t.join();
            throw;
        }
        t.join();
    }
    
  2. 使用 RAII 机制

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    
    // Listing 2.3. Using RAII to wait for a thread to complete
    class thread_guard
    {
        std::thread& t;
    public:
        explicit thread_guard(std::thread& t_) : t(t_) {}
        ~thread_guard()
        {
            if (t.joinable())
            {
                t.join();
            }
        }
        thread_guard(thread_guard const&) = delete;
        thread_guard& operator=(thread_guard const&) = delete;
    };
    
    struct func; // See Listing 2.2
    
    void f()
    {
        int some_local_state = 0;
        func my_func(some_local_state); 
        std::thread t(my_func);
        thread_guard g(t);
        do_something_in_current_thread();
    }
    

分离线程

通过调用 detach() 可让线程在后台运行。可以通过调用 joinable() 判断线程是否可以分离。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// Listing 2.4. Detaching a thread to handle other documents
void edit_document(std::string const& filename)
{
    open_document_and_display_gui(filename);
    while (!done_editing())
    {
        user_command cmd = get_user_input();
        if (cmd.type == open_new_document)
        {
            std::string const new_name = get_filename_from_user();
            std::thread t(edit_document, new_name);
            t.detach();
        } else {
            process_user_input(cmd);
        }
    }
}

传递参数

向可调用对象或函数传递参数很简单,只需要将这些参数作为 std::thread 构造函数的附加参数即可。但需要注意的是,在默认情况下,这些参数会被拷贝至新线程的独立内存空间中,以供新线程访问,并如同临时变量一样作为右值传递给可调用对象或函数。即使函数中的参数是引用的形式,拷贝操作也会执行。例如:

1
2
void f(int i, std::string const& s);
std::thread t(f, 3, "heoolo");

代码创建了一个调用 f(3, "hello") 的线程。注意,函数 f 需要一个 std::string 对象作为第二个参数,但这里使用的是字符串的字面值,也就是 char const * 类型。之后,在线程的上下文中完成字面值向 std::string 对象的转化。

提防隐式转换

特别需要注意的是,当指向动态变量的指针作为参数传递给线程的情况,代码如下:

1
2
3
4
5
6
7
8
void f(int i, std::string const& s);
void oops(int some_param)
{
    char buffer[1024]; 
    sprintf(buffer, "%i", some_param);
    std::thread t(f, 3, buffer); 
    t.detach();
}

这种情况下,buffer 是一个指针变量,指向局部变量,然后此局部变量通过 buffer 传递到新线程中。此时,函数 oops 很有可能会在 buffer 转换成 std::string 对象之前结束,从而导致一些未定义的行为。因为此时无法保证隐式转换的操作和 std::thread 构造函数的拷贝操作按顺序进行,有可能 std::thread 的构造函数拷贝的是转换前的变量 (buffer 指针),而非字符串。解决方案就是在传递到 std::thread 构造函数之前就将字符数组转化为 std::string 对象:

1
2
3
4
5
6
7
8
9
void f(int i, std::string const& s);
void not_oops(int some_param)
{
    char buffer[1024];
    sprintf(buffer,"%i",some_param);
    // Using std::string avoids dangling pointer
    std::thread t(f, 3, std::string(buffer)); 
    t.detach();
}

传递引用

当你尝试使用线程更新一个引用传递的数据结构时也会出现问题:

1
2
3
4
5
6
7
8
9
void update_data_for_widget(widget_id w, widget_data& data);
void oops_again(widget_id w)
{
    widget_data data;
    std::thread t(update_data_for_widget, w, data);
    display_status();
    t.join();
    process_widget_data(data);
}

在上述代码中,虽然 update_data_for_widget 的第二个参数期待传入一个引用,但是 std::thread 的构造函数并不知晓;其无视 update_data_for_widget 期待的参数类型,并盲目地拷贝提供的变量。内部代码会将拷贝的参数以右值的方式进行传递,这是为了照顾到那些只能进行移动的类型,而后会尝试以右值为实参调用 update_data_for_widget。但因为函数期望的是一个非常量引用作为参数,而非右值,所以会在编译时出错。对于熟悉 std::bind 的开发者来说,问题的解决办法是显而易见的:可以使用 std::ref 将参数转换成引用的形式,因此可将线程的调用改为以下形式:

1
std::thread t(update_data_for_widget, w, std::ref(data));

从而,update_data_for_widget 就会接收到一个 data 变量的引用,而非 data 变量的拷贝副本,这样代码就能顺利的通过编译。

传递成员函数指针

你也可以向线程传递一个成员函数指针,并提供一个合适的对象指针作为函数第一个参数:

1
2
3
4
5
6
7
class X
{
public:
    void do_lengthy_work();
};
X my_x;
std::thread t(&X::do_lengthy_work, &my_x);

在这段代码中,新线程将会调用 my_x.do_lengthy_work(),其中 my_x 的地址作为对象指针提供给函数。你也可以为成员函数提供参数:std::thread 构造函数的第三个参数就是成员函数的第一个参数。

传递仅支持 move 的参数

当原对象是一个临时变量时,自动进行 move 操作,但当原对象是一个命名变量,那么传递的时候就需要使用 std::move() 进行显示 move

1
2
3
4
void process_big_object(std::unique_ptr<big_object>);
std::unique_ptr<big_object> p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object, std::move(p));

std::thread 对象其实和 std::unique 类似,也是仅支持 move,不支持 copy 的类型。这说明了在同一时刻,至多仅有一个 std::thread 实例与一个执行中的线程关联,而且允许在不同的实例之间转移线程的所有权。

转移线程所有权

1
2
3
4
5
6
7
8
void some_function();
void some_other_function();
std::thread t1(some_function); // some_function - t1
std::thread t2 = std::move(t1); // some_function - t2
t1 = std::thread(some_other_function); // some_other_function - t1
std::thread t3;
t3 = std::move(t2); // some_function - t3
t1 = std::move(t3); // This assignment will terminate the program

std::thread 对象支持 move 操作,意味着其可以作为函数的返回值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Listing 2.5. Returning a std::thread from a function
std::thread f()
{
    void some_function();
    return std::thread(some_function);
}

std::thread g()
{
    void some_other_function(int);
    std::thread t(some_other_function, 42);
    return t;
}

当然,std::thread 类型也可以作为函数的参数。

1
2
3
4
5
6
7
8
9
void f(std::thread t);

void g()
{
    void some_function();
    f(std::thread(some_function));
    std::thread t(some_function);
    f(std::move(t));
}

可以利用 move 实现功能类似 thread_guard 的类,与 Listing 2.3 中的 thread_guard 不同的是,scoped_thread 直接转移线程的所有权。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Listing 2.6. scoped_thread and example usage
class scoped_thread
{
    std::thread t;
public:
    explict scoped_thread(std""thread t_) : t(std::move(t_))
    {
        if (!t.joinable())
            throw std::logic_error("No thread");
    }
    ~scoped_thread()
    {
        t.join();
    }
    scoped_thread(scoped_thread const&) = delete;
    scoped_thread& operator= (scoped_thread const&) = delete;
};

struct func; // See Listing 2.2
void f()
{
    int some_locate_state;
    scoped_thread t{std::thread(func(some_local_state))};
    do_something_in_current_thread();
}

如果保存 std::thread 对象的容器是 move-aware 的,则其也支持 move 语义。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Listing 2.8. Spawns some threads and waits for them to finsh
void do_work(unsigned id);

void f()
{
    std::vector<std::thread> threads;
    for (unsigned i = 0; i < 20; ++i)
    {
        threads.emplace_back(do_work, i);   // Spawns threads
    }
    for (auto& entry: threads)
        entry.join();   // Calls join() on each thread in turn
}

Tips: 此处使用的 emplace_backpush_back 更高效,它仅需要传入对象构造函数所需的参数。 接口定义 && 两者对比

选择所需线程的数量

函数 std:🧵:hardware_concurrency() 返回当前可真正并行运行的线程数(不过需要确定当前运行环境是否支持该命令)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Listing 2.9. A naive parallel version of std::accumulate
template<typename Iterator, typename T>
struct accumulate_block
{
    void operator() (Iterator first, Iterator last, T& result)
    {
        result = std::accumulate(first, last, result);
    }
};

template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
    unsigned long const length = std::distance(first, last);
    if (!length)
        return init;
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
    unsigned long const hardware_threads = std:🧵:hardware_concurrency();
    unsigned long const num_threads = 
            std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;
    std::vector<T> results(num_threads);
    std::vector<std::thread> threads(num_threads - 1);
    Iterator block_start = first;
    for (unsigned long i = 0; i < (num_threads - 1); ++i)
    {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);
        threads[i] = std::thread(
            accumulate_block<Iterator, T>(),
            block_start, block_end, std::ref(results[i]) );
        block_start = block_end;
    }
    // current thread process the final block
    accumulate_block<Iterator, T>() (block_start, last, results[num_threads - 1]);

    for (auto& entry: threads)
        entry.join();
    return std::accumulate(results.begin(), results.end(), init);
}

线程标识

线程标识的类型是 std:🧵:id,可通过种方式获取:

  • 调用 std::thread 对象的成员函数 get_id()
  • 调用 std::this_thread::get_id()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
std:🧵:id master_thread;

void some_core_part_of_algorithm()
{
    if (std::this_thread::get_id == master_thread)
    {
        do_master_thread_work();
    }
    do_common_work();
}