好长时间没有更新博客了。我最近读了 Implementing simple cooperative threads in C 这篇文章。它说的 cooperative threads,实际上就是每一个“线程”或者说控制流,可以主动的让出 CPU 的使用权,来达成某种意义上的“合作”。这样的行为似乎很接近纤程/用户态线程,所以我在这里姑且翻译成了“纤程”。读完之后,我用 C++ 又重新实现了一下。很久没有写 C++ 了,写得磕磕绊绊的,源代码在这里

接口

这篇文章描述的实现思路很简单,它尝试用一个数据结构去描述一个任务执行的上下文,并使用一个队列来维护没有完成的任务。任务以及调度器的接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class TaskHolder {
private:
// 实际要执行的任务代码
void (*_task)(Scheduler* scheduler);

public:
TaskHolder(void (*task)(Scheduler* scheduler));

// 实际触发任务执行的代码
virtual void run(Scheduler* scheduler) { _task(scheduler); }
};

class Scheduler {
protected:
// 获取正在执行的任务
virtual TaskHolder* get_current_task();

// 设置正在执行的任务
virtual void set_current_task(TaskHolder* task_holder);

// 将当前任务移除队列
virtual void exit_current_task();

public:
// 添加任务
virtual void add_task(void (*task)(Scheduler* scheduler));

// 任务主动让出 CPU, 由任务代码调用.
virtual void yield() = 0;

// 调度器开始执行任务
virtual void run() = 0;
}

只看上面的接口可能有点抽象,下面给出一个实际的使用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
scheduler::SequentialScheduler scheduler{};
// 添加两个任务
scheduler.add_task([](scheduler::Scheduler *scheduler) {
for (int i = 0; i < 2; i++) {
std::cout << "Task 1: " << i << std::endl;
// 让出 CPU 使用权
scheduler->yield();
}
});
scheduler.add_task([](scheduler::Scheduler *scheduler) {
for (int i = 0; i < 4; i++) {
std::cout << "Task 2: " << i << std::endl;
// 让出 CPU 使用权
scheduler->yield();
}
});
// 开始执行任务, 当所有任务执行完毕后, run方法返回
scheduler.run();
std::cout << "Finish" << std::endl;

上述代码的输出是:

1
2
3
4
5
6
7
Task 1: 0
Task 2: 0
Task 1: 1
Task 2: 1
Task 2: 2
Task 2: 3
Finish

实现原理

和原文的实现一样,我的实现包含了两个核心部分:

  1. 使用setjmplongjmp来实现控制流的转移,保留上下文,即所有寄存器的值。
  2. 自行维护每个任务的栈内存,保留栈空间。

setjmplongjmp

setjmplongjmp 提供了类似汇编中 jmp 指令的功能. setjmp可以将当前指令的内存地址存储一个数据结构中(jmp_buf),而 longjmp 可以返回 jmp_buf 指定的地址继续执行。下面我们看一个例子。

1
2
3
4
5
6
7
8
9
10
jmp_buf target;

auto value = setjmp(target);
if (value) {
std::cout << "Someone jumps here! With a value " << value << std::endl;
} else {
std::cout << "Set up a place for jumping" << std::endl;
}

longjmp(target, 100);

上面这段代码会进入一个死循环,在输出一次 Set up a place for jumping 后会不断地输出 Someone jumps here! With a value 100。之所以有这样行为的原因主要是:

  1. longjmp 会跳回 setjmp 发生的位置重新执行。
  2. setjmp 实际调用会返回 0,如果由 longjmp 调用则会返回 longjmp 的第二个参数值。

手动维护栈内存

大家的知道,在执行代码的过程中,为了保证每个线程才能互不干扰的执行自己的逻辑,它们需要有独立的栈空间,操作系统会保证这一点。在我们现在的实现中,每个任务也需要有独立的栈内存。没有了操作系统的帮助,我们需要自己手动分配内存,并将其指定为栈内存。核心的实现代码如下:

1
2
3
4
5
6
// 分配栈内存
task->_stack_bottom = malloc(stack_size);
// 计算栈顶地址
task->_stack_top = task->_stack_bottom + stack_size;
// 使用汇编将 rsp 寄存器的值设置为我们分配的栈顶地址
asm volatile("mov %[rs], %%rsp \n" : [ rs ] "+r"(task->_stack_top)::)

总结

实现代码见 Github,这里就不再赘述了。

可能的扩展

Github 上给出了 SequentialScheduler 的实现,是一个单线程的版本。我感觉可以比较简单的扩展成可并发的版本。有兴趣的朋友可以试一下。

为什么用纤程这个词

我最开始在纤程和协程中犹豫了一下。

根据我个人的理解,协程之间应该有显式的控制流切换。而在我们的实现中,只有 task 和 Scheduler 之间存在控制流切换,因此我觉得协程不太恰当。

我们的实现,某种程度上似乎很像 event loop。最大的区别在于,我们并不需要一个事件来决定哪一个任务是可调度的。我的实现中,每一个任务都是可以调度的。

如果我们扩展成了可并发的版本,也许使用纤程就比较恰当了。