引子

今次打算做条件变量(condition variable)的介绍,但不会做基础解释,因为基本定义类的概念直接 cppreference 就够了。

同样的道理,我讲述的是面向 C++17 及以后的规范的。

原因在于我一向以为,C++11 固然不错,但实在是太简陋了,一切稍微重要的实作型基础设施,例如读写锁,信号量等等全都没有,而可变参数模板、可变参数展开、折叠表达式等等也没有(或者说存在问题),这导致我浅度尝试了迁移代码后发现同样的形状难看(因为实在是需要太多预处理和各式各样的 hack 手法)后被迫挂起了操作,转投其他语言。相信我,对 TC 有感情的人(暂时)放弃也绝对是很难过的。

直到 C++17 的各种特性固化后,这些情况才得以最终改善(即使是信号量仍旧是没有)。

其实还不算完美,例如匿名函数一直以来真的炒鸡难看,和 Kotlin 比比看就是被秒的下场。不过你忍一下哈。

所以呢,我从 C++17 开始觉得可以重新操练一下。这么多年来 C++ 的演变史,人的审美观也是在主动地、被迫地不断演进。不管什么原因,我现在就是觉得可以从 17 开始。

没有 C++11 的奠基,14,17 都是不可能的,所以我也没有贬低 11 的意思。

此外,何不从 20 开始呢?毕竟 C++23 都已经热闹起来了。我想,还是不必太激进吧。再有就是 20 其实没什么必不可少的特性,不像 17,没有参数包展开以及折叠,一大票类库构型可能都会很难编写,后果是调用者有时候就要忍受不自然的语法。

条件变量

timeline

FROM: https://www.modernescpp.com/images/blog/Cpp17/Overview/timeline.png

条件变量(condition variable)的存在目的是“被唤醒”。

多线程(多例程、或者说多协程)开发中若干消费者等待有 food 喂食了被唤醒,是条件变量的典型的应用场景。

所以条件变量有两种典型的需求:唤醒某个消费者,给它喂一颗食,此时要求随机唤醒;唤醒所有消费者,(让它们自己去抢?)让它们自尽,app 将要 terminated 了。

这是由 std::condition_variablenotify_one()notify_all() 分别达成的。

等待某个条件满足

假设我们需要 100 个线程,并且我们想要知道这些线程都已经就绪了,这一点可以通过 condition_variable 来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
std::condition_variable cv100;
std::mutex cv100_m;
int count{0};

void test_cv_0() {
  std::vector<std::thread> waits;
  for (int i = 0; i < 100; i++) {
    auto t = std::thread([]() {
      {
        {
          std::unique_lock<std::mutex> lk(cv100_m);
          count++;
        }
        cv100.notify_one();

        // one thread running now
      }

      using namespace std::literals::chrono_literals;
      std::this_thread::sleep_for(10s); //simulate a long stuff
      std::cout << "end of thread" << '\n';
    });
    waits.push_back(std::move(t));
  }

  {
    std::unique_lock<std::mutex> lk(cv100_m);
    cv100.wait(lk, []() { return count >= 100; });
    // all threads ran now.
  }

  // all threads ready, do stuff ...

  // and, waiting for their stops.
  for (auto it = waits.begin(); it != waits.end(); it++) {
    auto &t = (*it);
    t.join();
  }
}

这个模型可以被你改造到你自己的用途。我认为这是我为你制作的一个最佳范本,它能够清晰明了地向你解释清楚 condition variable 有何用处,该怎么用。而大多数的 condition variable 的范例都显得非常的高深,或者难以解释。

显然,每当 notify_one 在被调用时,“生产者”已经投食了(通过 count++),此时等待语句 cv100.wait(lk, []() { return count >= 100; }); 就会活过来一下,发现条件不满足(count>=100),就又睡了。

等到 100 个线程都投过食了,等待语句发现条件满足,那么就向下走。

据此,我们就得到了一个证实,全部线程都已经开始运行了。

生产者消费者模型

而稍稍调整其用法,只有 m 个线程在反复不断地投食,n 个线程都使用等待语句在睡着:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
std::condition_variable cvpc;
std::mutex cvpc_m;
std::deque<int> cvpc_queue;
int seq;

void test_cv_1() {
  std::vector<std::thread> waits;
  for (int i = 0; i < 5; i++) {
    auto t = std::thread([]() {
      for (int j = 0; j < 10; j++) {
        {
          std::unique_lock<std::mutex> lk(cvpc_m);
          seq++;
          cvpc_queue.push_back(seq);
        }
        cvpc.notify_one();

        std::this_thread::yield();
      }
      // printf("#%d: ended.\n", i);
    });
    waits.push_back(std::move(t));
  }

  for (int i = 5; i < 15; i++) {
    auto t = std::thread([i]() {
      while (true) {
        {
          std::unique_lock<std::mutex> lk(cvpc_m);
          if (seq >= 50 && cvpc_queue.empty()) break;
          cv100.wait(lk, []() { return !cvpc_queue.empty(); });
        }
        auto vv = cvpc_queue.front(); // 不那么费力地
        cvpc_queue.pop_front();       // 简化这两句 
        printf("#%d: got %d\n", i, vv);

        std::this_thread::yield();
      }
      // printf("#%d: ended.\n", i);
    });
    waits.push_back(std::move(t));
  }

  for (auto it = waits.begin(); it != waits.end(); it++) {
    auto &t = (*it);
    t.join();
  }
}

这是简易的示范,它忽略了提取数据时的严谨性,为了避免可能的竞争条件,你可能需要改进这两行代码的写法:

1
2
        auto vv = cvpc_queue.front(); // 不那么费力地
        cvpc_queue.pop_front();       // 简化这两句 

不过用于演示目的目前是足够了。

另外,我们没有在这里加入全部线程就绪的条件变量,所以有的线程活动比较早,也因此它的运行结果可能不那么均匀,某些线程在早期会获得更多的唤醒机会并吃到投食。

但这些并不是我们演示的重点。

改进

即使像上面这样简单,我们还是会觉得条件变量显得麻烦,这可能是因为它需要搭配一些别的数据(一颗锁,一个乃至多个变量)才能一起使用吧。

所以我们可以这样做一点点封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
namespace hicc::pool {
  template<typename Pred = std::function<bool()>, typename Setter = std::function<void()>>
  class conditional_wait {
    std::condition_variable cv;
    std::mutex m;
    Pred p;
    Setter s;

    public:
    conditional_wait(Pred &&p_, Setter &&s_)
      : p(std::move(p_))
        , s(std::move(s_)) {}
    ~conditional_wait() {}
    conditional_wait(conditional_wait &&) = delete;
    conditional_wait &operator=(conditional_wait &&) = delete;

    public:
    void wait() {
      std::unique_lock<std::mutex> lk(m);
      cv.wait(lk, p);
    }
    void set() {
      {
        std::unique_lock<std::mutex> lk(m);
        s();
      }
      cv.notify_one();
    }
    void set_for_all() {
      {
        std::unique_lock<std::mutex> lk(m);
        s();
      }
      cv.notify_all();
    }
  };
}

这个包装类对条件变量的使用做了一些抽象和封装。但它还只是一个基础类,我们提供了两个进一步的封装,除了能演示 conditional_wait 的用法之外,也提供更简明的编码能力:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
namespace hicc::pool {
  class conditional_wait_for_bool : public conditional_wait<> {
    bool var;

    public:
    conditional_wait_for_bool()
      : conditional_wait([this]() { return _wait(); }, [this]() { _set(); }) {}
    ~conditional_wait_for_bool() { release(); }
    conditional_wait_for_bool(conditional_wait_for_bool &&) = delete;
    conditional_wait_for_bool &operator=(conditional_wait_for_bool &&) = delete;

    private:
    bool _wait() const { return var; }
    void _set() { var = true; }
    void release() {
      //
    }
  };

  class conditional_wait_for_int : public conditional_wait<> {
    int var;
    int max_value;

    public:
    conditional_wait_for_int(int max_value_ = 1)
      : conditional_wait([this]() { return _wait(); }, [this]() { _set(); })
        , max_value(max_value_) {}
    ~conditional_wait_for_int() { release(); }
    conditional_wait_for_int(conditional_wait_for_int &&) = delete;
    conditional_wait_for_int &operator=(conditional_wait_for_int &&) = delete;

    private:
    bool _wait() const { return var >= max_value; }
    void _set() { var++; }
    void release() {
      //
    }
  };
}

你还可以做你自己的简洁封装,它们会非常有用。请看我们下面的示例就知道了,你的代码能够因此而易读的多,无需太多的脑力消耗。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void test_cv() {
  {
    hicc::pool::conditional_wait_for_bool cv;
    auto t = std::thread([&cv]() {
      using namespace std::literals::chrono_literals;
      std::this_thread::yield();
      std::this_thread::sleep_for(1s);

      cv.set_for_all();
      std::cout << "end of thread" << '\n';
    });
    std::cout << "wait for cv" << '\n';
    cv.wait();
    std::cout << "end of cv test" << '\n';
    t.join();
  }
  {
    std::vector<std::thread> waits;
    hicc::pool::conditional_wait_for_int cv(3);
    for (int i = 0; i < 3; i++) {
      auto t = std::thread([&cv]() {
        using namespace std::literals::chrono_literals;
        std::this_thread::sleep_for(2s);

        cv.set();
        std::cout << "end of thread" << '\n';
      });
      waits.push_back(std::move(t));
    }
    std::cout << "wait for cv" << '\n';
    cv.wait(); // waiting for all threads ended.

    std::cout << "end of cv test" << '\n';
    for (auto it = waits.begin(); it != waits.end(); it++) {
      auto &t = (*it);
      t.join();
    }
  }
}

注意,第二个子测试只是为了演示的目的,作为生产者消费者模型的一种简易替代。你应该将 conditional_wait_for_int 直接作用于上一小节的示例程序中,从而获得更简明的代码,那样将会有更好的示范效果。但在这里我只是懒惰所以放飞了一下。

小结

所以,上面我们提供的改进方案,能够很好地简化地、清晰化地运用 condition variable,你可以不必分散地申明一组相关变量(通常是 cv, m, ready 等三个以上的变量),而是收纳到一个单一的类中。

对于很多典型的 condition variable,我们提供的预制 conditional_wait_for_boolconditional_wait_for_int 可能就足够用了。

你也能够非常轻易地利用 conditional_wait 做你的定制实现。

关于 hicc-cxx

hicc-cxx 是我的一个实验性项目,我会做不少重整、重建工作,并且借助于 GitHub Actions 进行一些跨平台的试验。如果一个实现、封装或者实用工具达到了足够稳定的程度,通常我会在未来一段时间内将其迁移到 cmdr-cxx 中。

所以我不鼓励你直接使用 hicc,虽然这么做也是行得通的。你应该取用 cmdr-cxx 中的相对应的 stable 版本。如果你未能在 cmdr-cxx 中发现期待的目标,issue 我或者自行抽取都可以。

结束

基本可锁定

除了 std::condition_variable 之外,还有一个条件变量类:std::condition_variable_any。它和前者的区别在于:前者被要求和 std::unique_lock<std::mutex> 联合使用,不可以换用其他手段,而后者则可以在任何满足基本可锁定 (BasicLockable) 要求的锁上工作。也就是说,你可以使用更广泛的其他锁定手段。

所以,什么是基本可锁定呢?

所谓的 BasicLockable 是一种约定。它要求你的类必须实现 lock() 和 unlock() 两个函数。

除了标准库提供的 std::mutex 系列类之外,你可以设计实现自定义类并通过提供 lock/unlock 函数的方式来满足约定。

这么做有何用处?你的类将能够被用来 std::condition_variable_any 中,或者其他要求 BasicLockable 的场所,例如 std::lock_guard

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class L {
public:
    L() {}
    virtual ~L() {}
    void lock() {}
    void unlock() {}
};

void test() {
    {
        L l;
        std::lock_guard<L> lk(l);
    }
}

对于做并发同步类库的朋友来说,它将会是一个有力的能力。

注意到我们的定制类 L 并没有任何派生类,没有任何合约声明,直接就能满足 std::lock_guard 的使用需要。这一点,Java 是做不到的,与其等价的能力是 Golang 的 interface,同样是提供了一种解耦的、满足合约的类库实现方法。在 C++ 的模板元编程中,很多时候这是一种被称为 Policy 的技法。

duck type 解耦并不是 Policy 专有提供的能力,它是由模板技术提供的基础能力,不过,policy 范式往往将这一特点呈现的最好。

下一篇文章将会对此做进一步的延伸,尽情期待。

:end:

这里是真的 END