协程的简单实现(C/C++)
本文介绍Linux下用C/C++实现简单的协程的方法
协程有数种不同的实现方式,这里尝试最简单的一种方法。
参考:
https://en.wikipedia.org/wiki/Coroutine 协程介绍
https://linux.die.net/man/3/getcontext 函数文档
https://linux.die.net/man/3/makecontext 函数文档
https://code.woboq.org/userspace/glibc/sysdeps/unix/sysv/linux/x86/sys/ucontext.h.html linux源码
https://zhuanlan.zhihu.com/p/52061644
https://github.com/tonbit/coroutine
https://github.com/Winnerhust/uthread
https://github.com/cloudwu/coroutine
搜索引擎能搜到不少资料。其他在此不一一列举。
- 重要的概念:调用栈上下文
- 两个重要数据:mcontext_t,ucontext_t
- 四个系统级函数:getcontext,setcontext,makecontext,swapcontext
主要就是用这几个工具,在调用栈层面修改相关寄存器的内容,来实现代码的执行流程(上下文)控制(切换、跳转),从而实现简单的协程。函数为系统级库函数,多数用汇编实现。
看一下数据结构:
/* Userlevel context. */
typedef struct ucontext_t
{
unsigned long int __ctx(uc_flags);
struct ucontext_t *uc_link;
stack_t uc_stack; //signal stack
mcontext_t uc_mcontext;
sigset_t uc_sigmask;
struct _libc_fpstate __fpregs_mem;
__extension__ unsigned long long int __ssp[4];
} ucontext_t;
mcontext_t uc_mcontext
描述cpu的状态。包含相关寄存器的数据。
uc_link
指针,指向下一个上下文(当前上下文结束后执行)。
stack_t uc_stack
signal stack信号栈
sigset_t uc_sigmask
信号mask
再看两个函数
int getcontext(ucontext_t *ucp)
把当前上下文存入ucp,成功返回0。
当后续切换到这个ucp时仿佛从此处再次返回,并往下执行。
int setcontext(const ucontext_t *ucp)
将当前上下文替换为ucp,即“切换”协程,不返回。
看个小例子感受一下:
#include <iostream>
#include <ucontext.h>
#include <chrono>
#include <thread>
int main()
{
int count = 0;
ucontext_t context;
getcontext(&context);
std::cout<<"gg "<<count<<std::endl; // c++ 打印
++ count;
std::this_thread::sleep_for(std::chrono::seconds(1));
setcontext(&context); // 跳转到上面的getcontext处继续执行
return 0;
}
输出为
gg 0
gg 1
gg 2
gg 3
gg 4
gg 5
….
无限循环
注意count的值不会归零,只是执行点瞬间跳转,不相关的值并不会发生回退。
再看另两个函数
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);
getcontext获取了上下文后,可以对其进行人为修改。func为此ucp被激活时调用的函数,即一般所说的协程。
此前得给这个ucp的uc_stack干净的空间。uc_link也得指定func结束之后要切换的目的上下文,如果设为NULL,那就不再切换,流程随func直接返回。
int swapcontext(ucontext_t *oucp, ucontext_t *ucp);
把当前上下文存入oucp,然后切换到ucp。
那么现在已经可以实现一个简单的协程系统。
先有一个Coroutine类。
包含每个协程自己的状态,上下文等。再有一个调度者Hub类。
Hub管理所有协程的数据、状态,并进行调度。Hub会保存自己的上下文,然后每次协程切换都会先回到Hub的上下文,Hub再进行调度。
Hub::make()
make创建或者说是注册一个协程,用户端把自己的函数传给make,等待执行即可。Hub::yield()
yield切换(交出控制权),返回hub继续调度执行其他协程。
这里说明一下。单纯的协程切换并没有什么用,需要配合事件系统(也可以说异步机制)来达到不浪费cpu的目的。举例:现在要从socket读数据,数据由远程机器的数据库产生,可能需要10秒,那么这10秒阻塞就浪费了自己的cpu。
如果借助某个事件系统比如libev来起一个事件,先切换出去干别的事,数据到了以后libev会发通知,那时再切换回读socket这个点继续执行代码。这样就不会浪费cpu。
可参考gevent等。这里不作深入,只实现基本的协程。调度。这里没有任何算法,没有优化,就是从当前位置无脑往后轮询,保证每个协程都能轮到、不出现卡死即可。
实用情况应参考或发明各种调度方法。
最终代码和测试结果:
cr.h
// xc 20200129 coroutine cr.h
#include <iostream>
#include <ucontext.h>
#include <vector>
#include <chrono>
#include <cassert>
// EMPTY-可创建新协程 NEW-新的协程 RUNNING-运行中 SUSPEND-挂起 DONE-完成
enum CoroutineStatus {EMPTY, NEW, RUNNING, SUSPEND, DONE};
const int STACK_SIZE = 1024 * 32; // 每个协程的栈大小
const int MAX_COROUTINE_COUNT = 5; // 协程总数限制
class Coroutine // 协程类。包含状态、上下文、函数、各种标志位。
{
public:
int id;
CoroutineStatus status;
ucontext_t context; // 上下文
char stack[STACK_SIZE]; // 栈空间。直接用数组,不折腾动态申请。
void (*func)(void); // 要执行的函数。为了简便,不处理参数了。
int yielded; // 区分协程是1.函数自然执行完毕,还是2.切换出来。如果是执行完毕,状态改为EMPTY腾出位置。
Coroutine()
{
status = EMPTY; // 默认EMPTY,该位置可创建新协程。
}
};
class Hub // hub类。即调度者,每次协程切换都会回到hub的context进行调度。
{
private:
ucontext_t hub_context; // hub的上下文。每次协程切换都会回这里进行调度。
public:
int max_size; // 协程总数限制
int current_index; // 当前位置
int coroutine_count; // 当前协程数
std::vector<Coroutine> coroutine_vector; // 协程vector。简化为vector,一次创建,不动态申请。
public:
Hub()
{
coroutine_count = 0;
current_index = -1;
max_size = MAX_COROUTINE_COUNT;
coroutine_vector = std::vector<Coroutine>(MAX_COROUTINE_COUNT); // 直接创建所有。不折腾动态申请内存。
}
int print_status() // 打印系统当前状态
{
std::cout<<"-- coroutine_count = "<<this->coroutine_count<<"/"<<this->max_size<<std::endl;
std::cout<<"-- current coroutine id = "<<this->current_index<<std::endl;
}
int make(void (*func)(void)) // 创建协程
{
if(coroutine_count >= max_size) // 超出限定数量
return 1;
// 找一个EMPTY的位置
int pos = -1;
auto coroutine_vector_size = coroutine_vector.size();
for(int i = current_index + 1; i < coroutine_vector_size; ++ i) // 往后找
{
if(EMPTY == coroutine_vector[i].status)
{
pos = i;
break;
}
}
if(pos == -1) // 没找到。再从头找。
{
for(int i = 0; i < current_index; ++ i)
{
if(EMPTY == coroutine_vector[i].status)
{
pos = i;
break;
}
}
}
assert(pos != -1);
coroutine_vector[pos].id = pos;
coroutine_vector[pos].status = NEW; // 标志为新协程
coroutine_vector[pos].func = func;
++ coroutine_count; // 计数+1
return 0;
}
int schedule() // 调度
{
// 选出要执行的协程。NEW或SUSPEND状态。
int pos = -1;
auto coroutine_vector_size = coroutine_vector.size();
for(int i = current_index + 1; i < coroutine_vector_size; ++ i) // 往后找
{
if(NEW == coroutine_vector[i].status || SUSPEND == coroutine_vector[i].status)
{
pos = i;
break;
}
}
if(pos == -1) // 从头找
{
for(int i = 0; i < current_index; ++ i)
{
if(NEW == coroutine_vector[i].status || SUSPEND == coroutine_vector[i].status)
{
pos = i;
break;
}
}
}
if(pos == -1) // 没找到可执行的
{
if(coroutine_count == 1 && coroutine_vector[current_index].yielded == 0)
{
return 1; // 如果只存在一个协程,那么就是当前的协程,并且yielded == 0,代表函数正常结束,那么代表所有协程都执行完毕。
}
std::cout<<"no runnable coroutine"<<std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
std::cout<<"schedule pick "<<pos<<std::endl; // 选择了pos这个位置
// 更新状态
if(current_index != -1) // 初始状态的话,不用更新。否则更新当前协程的状态。
{
if(coroutine_vector[current_index].yielded == 0)
{
// 如果是函数自己执行完毕,设为EMPTY,计数-1。即销毁。
coroutine_vector[current_index].status = EMPTY;
-- coroutine_count;
}
else // 否则是切换出来的,设为挂起。
coroutine_vector[current_index].status = SUSPEND;
}
current_index = pos; // 当前位置标记设为pos
print_status();
// 开始切换
auto coroutine = &coroutine_vector[pos]; // 取新选定的协程
if(coroutine->status == NEW) // 新的协程。需要配置一下。
{
getcontext(&(coroutine->context)); // 获取当前contenxt
// 配置
std::fill_n(coroutine->stack, STACK_SIZE, 0); // 清理stack
coroutine->context.uc_stack.ss_sp = coroutine->stack;
coroutine->context.uc_stack.ss_size = STACK_SIZE;
coroutine->context.uc_stack.ss_flags = 0;
coroutine->context.uc_link = &hub_context; // 此协程执行完跳回hub_context
makecontext(&(coroutine->context), coroutine->func, 0); // 设置要执行的函数
}
coroutine->status = RUNNING;
coroutine->yielded = 0; // 默认标记为“非切换”
// 切换。当前上下文存hub_context
swapcontext(&hub_context, &(coroutine->context));
// 因为uc_link设置为了&hub_context,所以下次切换回hub时会走到这里。
// 这样就实现了每次协程切换都会回到hub_context进行下一次调度。
return 0;
}
int yield() // 切换出去,交出控制权。
{
// 切换回hub
coroutine_vector[current_index].yielded = 1; // 标记为切换
swapcontext(&(coroutine_vector[current_index].context), &hub_context);
// 在用户端的函数内调用yield()
// 当前上下文会存到对应的协程内。下次切换回这个函数时,就会从这里继续执行。
}
int run() // 启动
{
std::cout<<"run"<<std::endl;
print_status();
for(;;) // 不停地调度,直到所有协程都执行完毕。
{
std::cout<<"schedule ..."<<std::endl;
auto result = schedule();
if(result == 1)
{
std::cout<<"all over"<<std::endl;
break;
}
}
return 0;
}
};
Hub hub;
cr.cpp
// xc 20200129 coroutine cr.cpp
#include <iostream>
#include <chrono>
#include <thread>
#include "cr.h"
void func_1_1()
{
std::cout<<"func_1_1"<<std::endl;
hub.yield(); // 切换
std::cout<<"func_1_1 over"<<std::endl;
}
void func_1()
{
std::cout<<"func_1"<<std::endl;
hub.yield(); // 切换
for(int i = 0; i < 6; ++ i)
{
auto result = hub.make(func_1_1); // 创建
if(result == 0)
std::cout<<"hub.make ok"<<std::endl;
else
std::cout<<"hub.make failed"<<std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout<<"func_1 over"<<std::endl;
}
void func_2()
{
std::cout<<"func_2"<<std::endl;
hub.yield(); // 切换
hub.yield(); // 切换
std::cout<<"func_2 over"<<std::endl;
}
void func_3()
{
std::cout<<"func_3"<<std::endl;
hub.yield(); // 切换
std::cout<<"func_3 over"<<std::endl;
}
int main()
{
std::cout<<"main"<<std::endl;
auto result = hub.make(func_1); // 创建
result = hub.make(func_2); // 创建
result = hub.make(func_3); // 创建
hub.run(); // 启动
return 0;
}
输出:
main
run // hub 启动
-- coroutine_count = 3/5 // 新建了3个协程
-- current coroutine id = -1 // 初始状态。当前id为-1
schedule ...
schedule pick 0 // 执行0号位
-- coroutine_count = 3/5
-- current coroutine id = 0
func_1 // 0号位func_1
schedule ...
schedule pick 1 // 切换。执行1号位
-- coroutine_count = 3/5
-- current coroutine id = 1
func_2 // 1号位func_2
schedule ...
schedule pick 2 // 切换。执行2号位
-- coroutine_count = 3/5
-- current coroutine id = 2
func_3 // 2号位func_3
schedule ...
schedule pick 0 // 切换。执行0号位
-- coroutine_count = 3/5
-- current coroutine id = 0
hub.make ok // 连续建6个协程。但是当前已经有3个,而且总数限定为5。所以只成功2个。
hub.make ok
hub.make failed
hub.make failed
hub.make failed
hub.make failed
func_1 over // 0号位func_1结束。
schedule ...
schedule pick 1 // 切换。执行1号位func_2
-- coroutine_count = 4/5 // 数量变为4
-- current coroutine id = 1
schedule ...
schedule pick 2 // 切换。执行2号位
-- coroutine_count = 4/5
-- current coroutine id = 2
func_3 over // 2号位func_3结束。
schedule ...
schedule pick 3 // 切换。执行3号位
-- coroutine_count = 3/5 // 数量变为3
-- current coroutine id = 3
func_1_1 // 3号位func_1_1
schedule ...
schedule pick 4 // 切换。执行4号位
-- coroutine_count = 3/5
-- current coroutine id = 4
func_1_1 // 4号位func_1_1
schedule ...
schedule pick 1 // 切换。执行1号位
-- coroutine_count = 3/5
-- current coroutine id = 1
func_2 over // 1号位func_2结束。
schedule ...
schedule pick 3 // 切换。执行3号位
-- coroutine_count = 2/5 // 数量变为2
-- current coroutine id = 3
func_1_1 over // 3号位func_1_1结束。
schedule ...
schedule pick 4 // 切换。执行4号位
-- coroutine_count = 1/5 // 数量变为1
-- current coroutine id = 4
func_1_1 over // 4号位func_1_1结束。
schedule ...
all over // 所有协程执行完毕