2021 0724:
2021 0824:
2021 0901:
这是遗忘了很久的一篇,一直没有发出来。
今天我的 RSS 表里面出现了一篇写 C++11 线程池的,使得我觉得要清理一下,就找出来,重写了一遍,发了。
说是重写,其实八成都是去年的想法,也不打算梳理自己这方面的思考了,就这么着吧——回头审视自己以前的(我私人的电脑里还有很多年前我的旧网站的截图)页面,文字什么的,感觉也很有意思。
2022 0808:
拖了很久(算起来竟然有两三年了),最终还是决定把线程池这篇发出来了。
一直不想发的原因是,这个科目其实没什么个人创见,大抵就是综合综合再综合。
但是放在 _draft 里面着实碍眼,又觉得丢弃了很不环保。那就发出来吧,拉低水准有何不可。
提前的话
几近三年疫情以来,没有健康码,没有核酸检测过,没有打疫苗过,没被隔离过,没有时空重叠过,没有弹窗过,没有被封小区过的人,该是多么完美干净的一个人啊。
一夜难眠之后,
听我说,从弹窗人士回归绿色,人是一定会成长的。
听我说,是男人也无所谓,此时此刻绿了就绿了。
缘起
基本上说,这次介绍的线程池(Thread Pool)是面向 C++17 的,但是下降到 C++11 也没有什么不可逾越的新设施。
这个实现是受到一个外部实现的激发(应该是源于某几个相关的 Stackoverflow posts),而后做出来用了段时间感觉不错,并且进行了各种修补后形成的最终版本。然后现在是在 hicc-cxx 中发酵,这是一个目标为实验性的项目,而稳定后的版本则会移入到 cmdr-cxx 中。现在可以直接在 cmdr-cxx 中取用它。
cmdr-cxx 是一个核心上提供命令行参数解析的基本库,同时也提供一个层级型的配置参数管理器,从而非常适合被用作一个 CLI 项目的基本构架。除了对其进行例行维护之外,目前我是在将一些项目开发所需的基础设施填充到 cmdr-cxx 中,例如包装过的 path/filesystem,process,mmap 等等,当然还有 thread_pool 等。这些设施的共性是跨平台(至少是 darwin/linux/windows),所以这是个有用处的 repo。
基本知识
-
launch:包括
std::launch::async
和std::launch::deferred
两个枚举值 - std::thread:执行一个线程
- async:异步运行给定的函数 f
- future:用于访问异步函数 f 的返回值
- packaged_task:用于包装一个 Callable 对象
- promise:我不怎么喜爱用
- threaded message queue
- thread pool
从 C++11 起,标准库提供了一组线程方面的跨平台原语,包括 std::thread,async,launch,future,packaged_task,promise 等等。完整的列表可以在 这里 查询。
下面对其中的主要原语作一个概要的介绍。
std::thread
thread 的概念很容易理解。你可以启动一个线程,然后 join 它以便等待其执行结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>
void task_thread()
{
std::packaged_task<int(int,int)> task(f);
std::future<int> result = task.get_future();
std::thread task_td(std::move(task), 2, 10);
task_td.join();
std::cout << "task_thread:\t" << result.get() << '\n';
}
int main()
{
task_thread();
}
packaged_task 和 future
参见上一节的代码示例。
packaged_task 采用 std::function 的模板塑形技术,让你可以将函数签名规范化。例如 std::packaged_task<int(int,int)> task(f)
声明了一个异步函数对象 task
,它要求你给出的 f
必须满足函数签名 int(int,int)
。
以后,你可以借助 std::thread 的完美转发能力将异步函数 f 所需的入参传入。例如 std::thread task_td(std::move(task), 2, 10);
为了访问异步函数 f 的返回值(一个 int
),我们需要 future 的包装:std::future<int> result = task.get_future();
。当 task_td.join()
返回控制权时,f 执行结束了,所以返回值也就可用了,所以现在使用 result.get()
可以提取到返回值。
async
async 是 std::thread 的升级版。它允许提供 launch 枚举入参,所以能够控制异步启动的方式(新线程,或是当前线程中延后执行)。
一般来说,你只需要声明一个静态成员函数,将它交给 async 就可以了,如同这样:
1
auto a1 = std::async(&X::foo, &x, 42, "Hello");
一个完整的示例如下,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <string>
#include <mutex>
std::mutex m;
struct X {
void foo(int i, const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << ' ' << i << '\n';
}
void bar(const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << '\n';
}
int operator()(int i) {
std::lock_guard<std::mutex> lk(m);
std::cout << i << '\n';
return i + 10;
}
};
template <typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
auto len = end - beg;
if (len < 1000)
return std::accumulate(beg, end, 0);
RandomIt mid = beg + len/2;
auto handle = std::async(std::launch::async,
parallel_sum<RandomIt>, mid, end);
(void)handle;
int sum = parallel_sum(beg, mid);
return sum + handle.get();
}
int main()
{
std::vector<int> v(10000, 1);
std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
X x;
// Calls (&x)->foo(42, "Hello") with default policy:
// may print "Hello 42" concurrently or defer execution
auto a1 = std::async(&X::foo, &x, 42, "Hello");
// Calls x.bar("world!") with deferred policy
// prints "world!" when a2.get() or a2.wait() is called
auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
// Calls X()(43); with async policy
// prints "43" concurrently
auto a3 = std::async(std::launch::async, X(), 43);
a2.wait(); // prints "world!"
std::cout << a3.get() << '\n'; // prints "53"
} // if a1 is not done at this point, destructor of a1 prints "Hello 42" here
async 实际上是将 std::thread 的使用方法改造了下。
小小结
有了 async,packaged_task,future 的基本概念,下面可以介绍我们最终的线程池代码了。
主要实现
下文中为示意性的代码展示,如欲查看在运行的真实代码,请前往 cmdr-cxx 中检视 cmdr-pool.hh 文件。
一个线程池 thread_pool
包含两个主要接口:将异步函数放入池中,以及调度其进入运行态。
我们将要实现的线程池的用意在于在 CPU 占用率上进行控制和平衡,既要尽可能充分利用 CPU 算力,又不要无谓地消耗额外资源在线程的上下文切换上。所以我们建立 CPU 核心数相同的 workers,然后将任务不断地送给这些 workers 以便调度执行,从而达到预期的目的。
queue_task
queue_task
的关键代码是这样:
1
2
3
4
5
6
7
8
9
10
11
template<class F, class R = std::result_of_t<F &()>>
std::future<R> queue_task(F &&task) {
auto p = std::packaged_task<R()>(std::forward<F>(task));
// std::packaged_task<R()> p(std::move(task));
auto r = p.get_future();
// _tasks.push_back(std::move(p));
_tasks.emplace_back(std::move(p));
pool_debug("queue_task.");
std::this_thread::yield();
return r;
}
借助于 std::function 的配套工具 std::result_of_t 我们能够将异步函数的返回值类型抽出来,这在元编程中是一个关键性工具。
然后就乏善可陈了,无非是建立 packaged_task 对象及其 future 对象,然后放入成员变量 _tasks
中。
std::this_thread::yield()
的目的在于发出一个信号,令当前线程(正在添加任务到线程池中的线程)暂时休眠一下,释放出 CPU 控制。这样可以让其他线程有机会被调度运行。在我们添加任务到线程池的过程中,这么打断一下,潜在的用意是让刚刚被放入的任务能够有机会被调度执行(只要线程池还比较空闲)。
但是也并不是一定非要执行不可。在这里并没有硬性的约定,这个调用是懒散式的:就是说,喂,要是得空的话,就 run 一下呗。
start_thread
start_thread
在 thread_pool
构造函数中被启动。
它的任务是负责核心的调度算法。
我们的线程池有一个运行任务阈值 N,你可以通过 cmdr::pool::thread_pool(int n = 1)
来传入该阈值,其默认值为 CPU 的核心数量。如果你在多 CPU 环境中使用,那么你可能会需要显式指定 N 以便获得更多并行任务管线。
start_thread
会一次性启动 N 个线程,每个线程都试图从 _tasks 中取得一个任务,并将该任务调度运行。这就达到了线程池约束并行任务数的目的。
所以这个线程池是不能动态修改阈值 N 的。
只要我们解决了同步、竞争、锁的最小化等几个相关问题,这个线程池就可用了。
当前的骨干代码是这样的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void start_thread(std::size_t n = 1) {
while (n-- > 0) {
_threads.push_back(
std::async(std::launch::async, [&] {
cw_setter cws(_future_ended);
while (auto task = _tasks.pop_front()) {
pool_debug("got_task.");
++_active;
try {
(*task)();
} catch (...) {
--_active;
throw;
}
--_active;
}
}));
}
}
cw_setter
是一个条件变量的包装类,利用 RAII 来设定 _future_ended
,该标志跟踪线程池中的每个工作线程的结束状态。全部工作线程都退出的时候,就表示着 thread_pool
已经 shutdown 完毕了。
_active
是原子的,无需担心其竞态问题。
_threads
是工作线程的容器。
_tasks
是待调度任务的容器。它是一个由包装类 threaded_message_queue
负责管理的数组,默认时使用底层容器 std::deque
,但我们尚未提供修改底层容器类的终端模板参数入口。
1
2
std::vector<std::future<void>> _threads{}; // fixed, running pool
mutable threaded_message_queue<std::packaged_task<void()>> _tasks{}; // the futures
包装类 threaded_message_queue
解决了并行环境中队列的竞态问题,如果感兴趣可以前去源码中查看。
完整的源码在 GitHub 中可以找到。
auto task = _tasks.pop_front()
以原子的方式从队尾取出一个 task,然后执行该任务,然后再度尝试出列 _tasks。这个过程会在调用 pop_front 时阻塞,从而释放 CPU 控制权。
队列化任务流
由于 threaded_message_queue
具有队列的编程接口,所以 thread_pool
是不断从队列中取用,同时队列也不断缩减的。
这是我们的 thread_pool
的关键用法。
你将会不断地压入新任务,这些任务最终会被调度运行,然后被丢弃。
了解上述事实非常重要。
小小结
是否可以采用别的调度方案?
当然可以。
这是你的任务了。
结束
完整的源码在 GitHub 中可以找到。我们已经提到过,这些代码都会在 hicc-cxx 中发酵,稳定之后即迁移到 cmdr-cxx 中。
所以尽管文章一直没有发,但是代码是早已发布了。
如果你愿意在自己的项目中以 cmdr-cxx 提供的命令行参数处理(类似于 getopt)以及层级化配置选项管理器为基础框架的话,那么你也可以使用到那些已经发酵的工具类,例如 mmap,thread_pool 等等。
留下评论