协程的简单实现(C/C++)

协程的简单实现(C/C++)

本文介绍Linux下用C/C++实现简单的协程的方法

协程有数种不同的实现方式,这里尝试最简单的一种方法。


参考:

https://en.wikipedia.org/wiki/Coroutine 协程介绍

https://linux.die.net/man/3/getcontext 函数文档

https://linux.die.net/man/3/makecontext 函数文档

https://code.woboq.org/userspace/glibc/sysdeps/unix/sysv/linux/x86/sys/ucontext.h.html linux源码

https://zhuanlan.zhihu.com/p/52061644

https://github.com/tonbit/coroutine

https://github.com/Winnerhust/uthread

https://github.com/cloudwu/coroutine

搜索引擎能搜到不少资料。其他在此不一一列举。


主要就是用这几个工具,在调用栈层面修改相关寄存器的内容,来实现代码的执行流程(上下文)控制(切换、跳转),从而实现简单的协程。函数为系统级库函数,多数用汇编实现。


看一下数据结构:

/* Userlevel context.  */
typedef struct ucontext_t
  {
    unsigned long int __ctx(uc_flags);
    struct ucontext_t *uc_link;
    stack_t uc_stack;                     //signal stack
    mcontext_t uc_mcontext;
    sigset_t uc_sigmask;
    struct _libc_fpstate __fpregs_mem;
    __extension__ unsigned long long int __ssp[4];
  } ucontext_t;


mcontext_t uc_mcontext
描述cpu的状态。包含相关寄存器的数据。

uc_link
指针,指向下一个上下文(当前上下文结束后执行)。

stack_t uc_stack
signal stack信号栈

sigset_t uc_sigmask
信号mask

再看两个函数

int getcontext(ucontext_t *ucp)
把当前上下文存入ucp,成功返回0。
当后续切换到这个ucp时仿佛从此处再次返回,并往下执行。

int setcontext(const ucontext_t *ucp)
将当前上下文替换为ucp,即“切换”协程,不返回。

看个小例子感受一下:

#include <iostream>
#include <ucontext.h>
#include <chrono>
#include <thread>

int main()
{
    int count = 0;

    ucontext_t context;

    getcontext(&context);

    std::cout<<"gg "<<count<<std::endl; // c++ 打印

    ++ count;

    std::this_thread::sleep_for(std::chrono::seconds(1));

    setcontext(&context); // 跳转到上面的getcontext处继续执行

    return 0;
}

输出为

gg 0

gg 1

gg 2

gg 3

gg 4

gg 5

….

无限循环

注意count的值不会归零,只是执行点瞬间跳转,不相关的值并不会发生回退。


再看另两个函数

void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);
getcontext获取了上下文后,可以对其进行人为修改。func为此ucp被激活时调用的函数,即一般所说的协程。
此前得给这个ucp的uc_stack干净的空间。uc_link也得指定func结束之后要切换的目的上下文,如果设为NULL,那就不再切换,流程随func直接返回。

int swapcontext(ucontext_t *oucp, ucontext_t *ucp);
把当前上下文存入oucp,然后切换到ucp。

那么现在已经可以实现一个简单的协程系统。

  1. 先有一个Coroutine类。
    包含每个协程自己的状态,上下文等。

  2. 再有一个调度者Hub类。
    Hub管理所有协程的数据、状态,并进行调度。

    Hub会保存自己的上下文,然后每次协程切换都会先回到Hub的上下文,Hub再进行调度。

  3. Hub::make()
    make创建或者说是注册一个协程,用户端把自己的函数传给make,等待执行即可。

  4. Hub::yield()
    yield切换(交出控制权),返回hub继续调度执行其他协程。
    这里说明一下。单纯的协程切换并没有什么用,需要配合事件系统(也可以说异步机制)来达到不浪费cpu的目的。

    举例:现在要从socket读数据,数据由远程机器的数据库产生,可能需要10秒,那么这10秒阻塞就浪费了自己的cpu。

    如果借助某个事件系统比如libev来起一个事件,先切换出去干别的事,数据到了以后libev会发通知,那时再切换回读socket这个点继续执行代码。这样就不会浪费cpu。
    可参考gevent等。这里不作深入,只实现基本的协程。

  5. 调度。这里没有任何算法,没有优化,就是从当前位置无脑往后轮询,保证每个协程都能轮到、不出现卡死即可。
    实用情况应参考或发明各种调度方法。


最终代码和测试结果:

cr.h

// xc 20200129 coroutine cr.h
#include <iostream>
#include <ucontext.h>
#include <vector>
#include <chrono>
#include <cassert>

// EMPTY-可创建新协程 NEW-新的协程 RUNNING-运行中 SUSPEND-挂起 DONE-完成
enum CoroutineStatus {EMPTY, NEW, RUNNING, SUSPEND, DONE};

const int STACK_SIZE = 1024 * 32; // 每个协程的栈大小
const int MAX_COROUTINE_COUNT = 5; // 协程总数限制

class Coroutine // 协程类。包含状态、上下文、函数、各种标志位。
{
public:
    int id;
    CoroutineStatus status;
    ucontext_t context;     // 上下文
    char stack[STACK_SIZE]; // 栈空间。直接用数组,不折腾动态申请。
    void (*func)(void); // 要执行的函数。为了简便,不处理参数了。
    int yielded; // 区分协程是1.函数自然执行完毕,还是2.切换出来。如果是执行完毕,状态改为EMPTY腾出位置。

    Coroutine()
    {
        status = EMPTY; // 默认EMPTY,该位置可创建新协程。
    }
};

class Hub // hub类。即调度者,每次协程切换都会回到hub的context进行调度。
{
private:
    ucontext_t hub_context; // hub的上下文。每次协程切换都会回这里进行调度。

public:
    int max_size; // 协程总数限制
    int current_index; // 当前位置
    int coroutine_count; // 当前协程数

    std::vector<Coroutine> coroutine_vector; // 协程vector。简化为vector,一次创建,不动态申请。

public:
    Hub()
    {
        coroutine_count = 0;
        current_index = -1;
        max_size = MAX_COROUTINE_COUNT;
        coroutine_vector = std::vector<Coroutine>(MAX_COROUTINE_COUNT); // 直接创建所有。不折腾动态申请内存。
    }

    int print_status() // 打印系统当前状态
    {
        std::cout<<"--    coroutine_count = "<<this->coroutine_count<<"/"<<this->max_size<<std::endl;
        std::cout<<"--    current coroutine id = "<<this->current_index<<std::endl;
    }

    int make(void (*func)(void)) // 创建协程
    {
        if(coroutine_count >= max_size) // 超出限定数量
            return 1;

        // 找一个EMPTY的位置
        int pos = -1;
        auto coroutine_vector_size = coroutine_vector.size();
        for(int i = current_index + 1; i < coroutine_vector_size; ++ i) // 往后找
        {
            if(EMPTY == coroutine_vector[i].status)
            {
                pos = i;
                break;
            }
        }

        if(pos == -1) // 没找到。再从头找。
        {
            for(int i = 0; i < current_index; ++ i)
            {
                if(EMPTY == coroutine_vector[i].status)
                {
                    pos = i;
                    break;
                }
            }
        }

        assert(pos != -1);

        coroutine_vector[pos].id = pos;
        coroutine_vector[pos].status = NEW; // 标志为新协程
        coroutine_vector[pos].func = func;

        ++ coroutine_count; // 计数+1

        return 0;
    }

    int schedule() // 调度
    {
        // 选出要执行的协程。NEW或SUSPEND状态。
        int pos = -1;
        auto coroutine_vector_size = coroutine_vector.size();
        for(int i = current_index + 1; i < coroutine_vector_size; ++ i) // 往后找
        {
            if(NEW == coroutine_vector[i].status || SUSPEND == coroutine_vector[i].status)
            {
                pos = i;
                break;
            }
        }

        if(pos == -1) // 从头找
        {
            for(int i = 0; i < current_index; ++ i)
            {
                if(NEW == coroutine_vector[i].status || SUSPEND == coroutine_vector[i].status)
                {
                    pos = i;
                    break;
                }
            }
        }

        if(pos == -1) // 没找到可执行的
        {
            if(coroutine_count == 1 && coroutine_vector[current_index].yielded == 0)
            {
                return 1; // 如果只存在一个协程,那么就是当前的协程,并且yielded == 0,代表函数正常结束,那么代表所有协程都执行完毕。
            }

            std::cout<<"no runnable coroutine"<<std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
            return 0;
        }

        std::cout<<"schedule pick "<<pos<<std::endl; // 选择了pos这个位置

        // 更新状态
        if(current_index != -1) // 初始状态的话,不用更新。否则更新当前协程的状态。
        {
            if(coroutine_vector[current_index].yielded == 0)
            {
                // 如果是函数自己执行完毕,设为EMPTY,计数-1。即销毁。
                coroutine_vector[current_index].status = EMPTY;
                -- coroutine_count;
            }
            else // 否则是切换出来的,设为挂起。
                coroutine_vector[current_index].status = SUSPEND;
        }

        current_index = pos; // 当前位置标记设为pos

        print_status();

        // 开始切换
        auto coroutine = &coroutine_vector[pos]; // 取新选定的协程

        if(coroutine->status == NEW) // 新的协程。需要配置一下。
        {
            getcontext(&(coroutine->context)); // 获取当前contenxt

            // 配置
            std::fill_n(coroutine->stack, STACK_SIZE, 0); // 清理stack
            coroutine->context.uc_stack.ss_sp = coroutine->stack;
            coroutine->context.uc_stack.ss_size = STACK_SIZE;
            coroutine->context.uc_stack.ss_flags = 0;
            coroutine->context.uc_link = &hub_context; // 此协程执行完跳回hub_context

            makecontext(&(coroutine->context), coroutine->func, 0); // 设置要执行的函数

        }

        coroutine->status = RUNNING;
        coroutine->yielded = 0;  // 默认标记为“非切换”

        // 切换。当前上下文存hub_context
        swapcontext(&hub_context, &(coroutine->context));

        // 因为uc_link设置为了&hub_context,所以下次切换回hub时会走到这里。
        // 这样就实现了每次协程切换都会回到hub_context进行下一次调度。

        return 0;
    }

    int yield() // 切换出去,交出控制权。
    {
        // 切换回hub
        coroutine_vector[current_index].yielded = 1; // 标记为切换
        swapcontext(&(coroutine_vector[current_index].context), &hub_context);

        // 在用户端的函数内调用yield()
        // 当前上下文会存到对应的协程内。下次切换回这个函数时,就会从这里继续执行。
    }

    int run() // 启动
    {
        std::cout<<"run"<<std::endl;
        print_status();
        for(;;) // 不停地调度,直到所有协程都执行完毕。
        {
            std::cout<<"schedule ..."<<std::endl;
            auto result = schedule();

            if(result == 1)
            {
                std::cout<<"all over"<<std::endl;
                break;
            }
        }

        return 0;
    }
};

Hub hub;

cr.cpp

// xc 20200129 coroutine cr.cpp
#include <iostream>
#include <chrono>
#include <thread>
#include "cr.h"

void func_1_1()
{
    std::cout<<"func_1_1"<<std::endl;
    hub.yield(); // 切换
    std::cout<<"func_1_1 over"<<std::endl;
}

void func_1()
{
    std::cout<<"func_1"<<std::endl;
    hub.yield(); // 切换

    for(int i = 0; i < 6; ++ i)
    {
        auto result = hub.make(func_1_1); // 创建
        if(result == 0)
            std::cout<<"hub.make ok"<<std::endl;
        else
            std::cout<<"hub.make failed"<<std::endl;

        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    std::cout<<"func_1 over"<<std::endl;
}

void func_2()
{
    std::cout<<"func_2"<<std::endl;
    hub.yield(); // 切换
    hub.yield(); // 切换
    std::cout<<"func_2 over"<<std::endl;
}

void func_3()
{
    std::cout<<"func_3"<<std::endl;
    hub.yield(); // 切换
    std::cout<<"func_3 over"<<std::endl;
}


int main()
{
    std::cout<<"main"<<std::endl;
    auto result = hub.make(func_1); // 创建
    result = hub.make(func_2); // 创建
    result = hub.make(func_3); // 创建

    hub.run(); // 启动

    return 0;
}

输出:

main
run                                  // hub 启动
--    coroutine_count = 3/5          // 新建了3个协程
--    current coroutine id = -1      // 初始状态。当前id为-1
schedule ...
schedule pick 0                      // 执行0号位
--    coroutine_count = 3/5
--    current coroutine id = 0
func_1                               // 0号位func_1
schedule ...
schedule pick 1                      // 切换。执行1号位
--    coroutine_count = 3/5
--    current coroutine id = 1
func_2                               // 1号位func_2
schedule ...
schedule pick 2                      // 切换。执行2号位
--    coroutine_count = 3/5
--    current coroutine id = 2
func_3                               // 2号位func_3
schedule ...
schedule pick 0                      // 切换。执行0号位
--    coroutine_count = 3/5
--    current coroutine id = 0
hub.make ok                          // 连续建6个协程。但是当前已经有3个,而且总数限定为5。所以只成功2个。
hub.make ok
hub.make failed
hub.make failed
hub.make failed
hub.make failed
func_1 over                          // 0号位func_1结束。
schedule ...
schedule pick 1                      // 切换。执行1号位func_2
--    coroutine_count = 4/5          // 数量变为4
--    current coroutine id = 1
schedule ...
schedule pick 2                      // 切换。执行2号位
--    coroutine_count = 4/5
--    current coroutine id = 2
func_3 over                          // 2号位func_3结束。
schedule ...
schedule pick 3                      // 切换。执行3号位
--    coroutine_count = 3/5          // 数量变为3
--    current coroutine id = 3
func_1_1                             // 3号位func_1_1
schedule ...
schedule pick 4                      // 切换。执行4号位
--    coroutine_count = 3/5
--    current coroutine id = 4
func_1_1                             // 4号位func_1_1
schedule ...
schedule pick 1                      // 切换。执行1号位
--    coroutine_count = 3/5
--    current coroutine id = 1
func_2 over                          // 1号位func_2结束。
schedule ...
schedule pick 3                      // 切换。执行3号位
--    coroutine_count = 2/5          // 数量变为2
--    current coroutine id = 3
func_1_1 over                        // 3号位func_1_1结束。
schedule ...
schedule pick 4                      // 切换。执行4号位
--    coroutine_count = 1/5          // 数量变为1
--    current coroutine id = 4
func_1_1 over                        // 4号位func_1_1结束。
schedule ...
all over                             // 所有协程执行完毕

上次更新 2021-01-28