libco 分析(下):协程的管理

Posted by YuanBao on October 20, 2017

前文 libco 分析(上):协程的实现 中我们介绍了 libco 使用汇编代码实现 协程上下文管理和切换的原理。今天我们继续介绍 libco 管理协程的逻辑,包括如何对 readwrite 等接口进行非侵入式改造以及 libco 的共享栈原理。

在继续下文之前想说明一点,相比于同是微信开源的 phxrpc,libco 整个代码的编码质量不算很高,代码风格比较杂乱,格式以及命名也没有严格遵循要求,并且缺乏功能上的注释。在代码中先后出现像函数 pollco_pollco_poll_innerco_eventloop 这种看起来很难区分功能的命名。但是我们在学习的过程中可以去其糟粕,取其精华,学习好的设计思想以及编程思路就可以了。

如何使用 libco

我们首先以 libco 提供的例子 example_echosvr.cpp 来介绍应用程序如何使用 libco 来编写服务端程序。 在 example_echosvr.cpp 的 main 函数中,主要执行如下几步:

  1. 创建 socket,监听在本机的 1024 端口,并设置为非阻塞;
  2. 主线程使用函数 readwrite_coroutine 创建多个读写协程,调用 co_resume 启动协程运行直到其挂起。这里我们忽略掉无关的多进程 fork 的过程;
  3. 主线程继续创建 socket 接收协程 accpet_co,同样调用 co_resume 启动协程直到其挂起;
  4. 主线程调用函数 co_eventloop 实现事件的监听和协程的循环切换;

函数 readwrite_coroutine 在外层循环中将新创建的读写协程都加入到队列 g_readwrite 中,此时这些读写协程都没有具体与某个 socket 连接对应,可以将队列 g_readwrite 看成一个 coroutine pool。当加入到队列中之后,调用函数 co_yield_ct 函数让出 CPU,此时控制权回到主线程。

主线程中的函数 co_eventloop 监听网络事件,将来自于客户端新进的连接交由协程 accept_co 处理,关于 co_eventloop 如何唤醒 accept_co 的细节我们将在后续介绍。accept_co 调用函数 accept_routine 接收新连接,该函数的流程如下:

  1. 检查队列 g_readwrite 是否有空闲的读写 coroutine,如果没有,调用函数 poll 将该协程加入到 Epoll 管理的定时器队列中,也就是 sleep(1000) 的作用;
  2. 调用 co_accept 来接收新连接,如果接收连接失败,那么调用 co_poll 将服务端的 listen_fd 加入到 Epoll 中来触发下一次连接事件;
  3. 对于成功的连接,从 g_readwrite 中取出一个读写协程来负责处理读写;

再次回到函数 readwrite_coroutine 中,该函数会调用 co_poll 将新建立的连接的 fd 加入到 Epoll 监听中,并将控制流程返回到 main 协程;当有读或者写事件发生时,Epoll 会唤醒对应的 coroutine ,继续执行 read 函数以及 write 函数。

上面的过程大致说明了控制流程是如何在不同的协程中切换,接下来我们介绍具体的实现细节,即如何通过 Epoll 来管理协程,以及如何对系统函数进行改造以满足 libco 的调用。

通过 Epoll 管理和唤醒协程

Epoll 监听 FD

上一章节中介绍了协程可以通过函数 co_poll 来将 fd 交由 Epoll 管理,待 Epoll 的相应的事件触发时,再切换回来执行 read 或者 write 操作,从而实现由 Epoll 管理协程的功能。co_poll 函数原型如下:

int co_poll(stCoEpoll_t *ctx, struct pollfd fds[], 
            nfds_t nfds, int timeout_ms)

stCoEpoll_t 是为 libco 定制的 Epoll 相关数据结构(这里吐槽一下,参数使用 ctx 也是醉了),fdspollfd 结构的文件句柄,nfdsfds 数组的长度,最后一个参数表示定时器时间,也就是在 timeout 毫秒之后触发处理这些文件句柄。这里可以看到,co_poll 能够同时将多个文件句柄同时加入到 Epoll 管理中。我们先看 stCoEpoll_t 结构:

struct stCoEpoll_t
{
    int iEpollFd;  // Epoll 主 FD
    static const int _EPOLL_SIZE = 1024 * 10;  // Epoll 可以监听的句柄总数

    struct stTimeout_t *pTimeout;  // 时间轮定时器
    struct stTimeoutItemLink_t *pstTimeoutList;  // 已经超时的时间
    struct stTimeoutItemLink_t *pstActiveList;   // 活跃的事件
    co_epoll_res *result;  // Epoll 返回的事件结果
};

stTimeout_ 开头的数据结构与 libco 的定时器管理有关,我们在后面介绍。co_epoll_res 是对 Epoll 事件数据结构的封装,也就是每次触发 Epoll 事件时的返回结果,在 Unix 和 MaxOS 下,libco 将使用 Kqueue 替代 Epoll,因此这里也保留了 kevent 数据结构。

struct co_epoll_res
{
    int size;
    struct epoll_event *events;  // for linux epoll
    struct kevent *eventlist;    // for Unix or MacOs kqueue
};

co_poll 实际是对函数 co_poll_inner 的封装。我们将 co_epoll_inner 函数的结构分为上下两半段。在上半段中,调用 co_poll 的协程 $\mathcal{C}$ 将其需要监听的句柄数组 fds 都加入到 Epoll 管理中,并通过函数 co_yield_env 让出 CPU;当 main 协程的事件循环 co_eventloop 中触发了 $\mathcal{C}$ 对应的监听事件时,会恢复 $\mathcal{C}$ 的执行。此时,$\mathcal{C}$ 将开始执行下半段,即将上半段添加的句柄 fds 从 epoll 中移除,清理残留的数据结构,下面的流程图简要说明了控制流的转移过程:

有了上面的基本概念,我们来看具体的实现细节。co_poll 首先在内部将传入的文件句柄数组 fds 转化为数据结构 stPoll_t,这一步主要是为了方便后续处理。该结构记录了 iEpollFdndfsfds 数组,以及该协程需要执行的函数和参数。有两点需要说明的是:

  1. 对于每一个 fd,为其申请一个 stPollItem_t 来管理对应 Epoll 事件以及记录回调参数。libco 在此做了一个小的优化,对于长度小于 2 的 fds 数组,直接在栈上定义相应的 stPollItem_t 数组,否则从堆中申请内存。这也是一种比较常见的优化,毕竟从堆中申请内存比较耗时;
  2. 函数指针 OnPollProcessEvent 封装了协程的切换过程。当传入指定的 stPollItem_t 结构时,即可唤醒对应于该结构的 coroutine,将控制权交由其执行;

co_poll 的第二步,也是最关键的一步,就是将 fd 数组全部加入到 Epoll 中进行监听。协程 $\mathcal{C}$ 会将每一个 epoll_event 的 data.ptr 域设置为对应的 stPollItem_t 结构。这样当事件触发时,可以直接从对应的 ptr 中取出 stPollItem_t 结构,然后唤醒指定协程。

如果本次操作提供了 Timeout 参数,co_poll 还会将协程 $\mathcal{C}$ 本次操作对应的 stPoll_t 加入到定时器队列中。这表明在 Timeout 定时触发之后,也会唤醒协程 $\mathcal{C}$ 的执行。当整个上半段都完成后,co_poll 立即调用 co_yield_env 让出 CPU,执行流程跳转回到 main 协程中。

从上面的流程图中也可以看出,当执行流程再次跳回时,表明协程 $\mathcal{C}$ 添加的读写等监听事件已经触发,即可以执行相应的读写操作了。此时 $\mathcal{C}$ 首先将其在上半段中添加的监听事件从 Epoll 中删除,清理残留的数据结构,然后调用读写逻辑。

定时器实现

协程 $\mathcal{C}$ 在将一组 fds 加入 Epoll 的同时,还能为其设置一个超时时间。在超时时间到期时,也会再次唤醒 $\mathcal{C}$ 来执行。libco 使用 Timing-Wheel 来实现定时器。关于 Timing-Wheel 算法,可以参考 Linux 下内核定时器实现,其优势是 O(1) 的插入和删除复杂度,缺点是只有有限的长度,在某些场合下不能满足需求。

回过去看 stCoEpoll_t 结构,其中 *pTimeout 代表时间轮,通过函数 AllocateTimeout 初始化为一个固定大小(60 * 1000)的数组。根据 Timing-Wheel 的特性可知,libco 只支持最大 60s 的定时事件。而实际上,在添加定时器时,libco 要求定时时间不超过 40s。成员 pstTimeoutList 记录在 co_eventloop 中发生超时的事件,而 pstActiveList 记录当前活跃的事件,包括超时事件。这两个结构都将在 co_eventloop 中进行处理。

下面我们简要分析一下加入定时器的实现:

int AddTimeout( stTimeout_t *apTimeout, stTimeoutItem_t *apItem, 
    unsigned long long allNow )
{
    if( apTimeout->ullStart == 0 )  // 初始化时间轮的基准时间
    {
        apTimeout->ullStart = allNow;
        apTimeout->llStartIdx = 0;  // 当前时间轮指针指向数组0
    }
    // 1. 当前时间不可能小于时间轮的基准时间
    // 2. 加入的定时器的超时时间不能小于当前时间
    if( allNow < apTimeout->ullStart || apItem->ullExpireTime < allNow )
    {
        return __LINE__;
    }

    int diff = apItem->ullExpireTime - apTimeout->ullStart;
    if( diff >= apTimeout->iItemSize )  // 添加的事件不能超过时间轮的大小
    {
        return __LINE__;
    }
    // 插入到时间轮盘的指定位置
    AddTail( apTimeout->pItems + 
        (apTimeout->llStartIdx + diff ) % apTimeout->iItemSize, apItem );

    return 0;
}

定时器的超时检查在函数 co_eventloop 中执行。

EPOLL 事件循环

main 协程通过调用函数 co_eventloop 来监听 Epoll 事件,并在相应的事件触发时切换到指定的协程执行。有关 co_eventloop 与 应用协程的交互过程在上一节的流程图中已经比较清楚了,下面我们主要介绍一下 co_eventloop 函数的实现:

上文中也提到,通过 epoll_wait 返回的事件都保存在 stCoEpoll_t 结构的 co_epoll_res 中。因此 co_eventloop 首先为 co_epoll_res 申请空间,之后通过一个无限循环来监听所有 coroutine 添加的所有事件:

for(;;)
{
    int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
    ...
}

对于每一个触发的事件,co_eventloop 首先通过指针域 data.ptr 取出保存的 stPollItem_t 结构,并将其添加到 pstActiveList 列表中;之后从定时器轮盘中取出所有已经超时的事件,也将其全部添加到 pstActiveList 中,pstActiveList 中的所有事件都作为活跃事件处理。

对于每一个活跃事件,co_eventloop 将通过调用对应的 pfnProcess 也就是上图中的OnPollProcessEvent 函数来切换到该事件对应的 coroutine,将流程跳转到该 coroutine 处执行。

最后 co_eventloop 在调用时也提供一个额外的参数来供调用者传入一个函数指针 pfn。该函数将会在每次循环完成之后执行;当该函数返回 -1 时,将会终止整个事件循环。用户可以利用该函数来控制 main 协程的终止或者完成一些统计需求。

非侵入改造系统函数

libco 在文件 co_hook_sys_call.cpp 中实现了对于系统函数的 hook。关于这一块的内容,推荐先看一下以前写的文章 动态链接黑魔法: Hook 系统函数,介绍了如何使用运行时动态链接 Hook 系统函数的过程,看完应该有一个大致的了解。剩下的内容这里也不再多说,主要是要仔细写起来也会很复杂(其实就是懒得写,不想写,很任性 ^-^ )。

关于 libco 其实还有很多东西可以学习,但是主要的内容就打算写这两篇文章了。后面有时间写写其他好玩儿的内容。