上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

muduo库的模拟实现——工具部分

guduadmin261月前

文章目录

  • 一、Buffer模块
    • 1.为什么需要Buffer缓冲区
    • 2.Buffer模块的设计
    • 3.Buffer模块的实现
    • 4.Buffer缓冲区的其它设计方案
    • 二、Socket模块
      • 1.Socket模块的设计
      • 2.Socket代码实现
      • 三、Acceptor模块
        • 1.Acceptor模块的设计与实现
        • 2.Acceptor模块完整代码实现
        • 四、定时器模块
          • 1.时间轮的思想
          • 2.TimerTask类
          • 3.TimerWheel类
          • 五、线程池模块
            • 1.LoopThread类
            • 2.LoopThreadPool类

              一、Buffer模块

              1.为什么需要Buffer缓冲区

              我们实现的TCP网络服务器必须要有发送缓冲区和接收缓冲区,这一点是毋庸置疑的。考虑以下两种情况:

              • 如果服务器没有发送缓冲区,那么当服务器想向客户端发送假设100KB的数据时,调用操作系统的write接口进行发送。但是如果操作系统(准确来说是操作系统的缓冲区)只接收了80KB,我们服务器还有20KB的数据没有发送出去,这个时候只能阻塞在write接口处等待最后20KB数据发送出去。如果有了发送缓冲区,我们就能将这20KB的数据再放回发送缓冲区,等什么时候操作系统缓冲区能接收新数据了再进行第二次发送。
              • 如果服务器没有接收缓冲区,由于TCP是面向字节流的协议,也就是说它的数据报文是没有明显边界的,那么就可能会出现一次接收的数据不完整或者数据粘包的问题,这时候我们拿到不完整的数据既不能解析,也不能丢弃,只有接收缓冲区,能让我们缓存这些不完整的数据报文,等到下一次对方再发送新数据的时候,我们读取到完整的报文时再交给上层进行业务处理。

                这两个简单常见的例子就足以说明,TCP服务器必须要有Buffer缓冲区。

                2.Buffer模块的设计

                muduo库的作者陈硕老师在《Linux多线程服务端编程:使用muduo C++网络库》书中提到,muduo Buffer的设计考虑了常见的网络编程需求,他试图在易用性和性能之间找一个平衡点,目前这个平衡点更偏向于易用性。所以muduo库的Buffer设计要点如下:

                • 对外表现是一块连续的内存空间,这样更方便代码的编写。
                • Buffer缓冲区的空间大小是可变的,支持动态扩容,以适应不同大小的消息。
                • 虽然服务器需要接收缓冲区和发送缓冲区,但我们不希望分开两个缓冲区来写,而是希望一块内存空间既能读也能写,做到读写配合。

                  从连续的内存空间、支持动态扩容这两个要点来看,Buffer缓冲区底层的数据结构应该选择vector最合适。并且还需要两个index分别代表读位置和写位置,这样就能满足一块内存空间的读写配合。有个细节的地方需要注意,读位置和写位置的index不应该设置为指针类型或者迭代器类型,最好使用下标来表示,因为vector如果需要扩容将数据拷贝到新空间,会出现迭代器失效的情况,需要我们去处理迭代器失效,从易用性角度考虑,使用下标是最好的。

                  确定了Buffer的数据结构以后,我们可以很容易地想象出Buffer的结构图如下图所示:

                  muduo库的模拟实现——工具部分,在这里插入图片描述,第1张

                  通过起始位置、读位置(reader_index)、写位置(writer_index)、末尾位置,我们可以将Buffer分成三部分:

                  1. 读位置之前的空间,即起始位置和读位置之间的空间,我们称为头部空闲空间;
                  2. 读位置和写位置之间的空间,我们称为有效数据。这部分存放的就是可读数据;
                  3. 写位置之后的空间,即写位置和末尾位置之间的空间,我们称为尾部空闲空间。

                  划分完成之后,Buffer缓冲区的读写数据方式会变得非常清晰。读位置和写位置之间的空间,就是有效数据区域,也就是可读数据。当我们要从Buffer缓冲区中读取数据时,从读位置(reader_index)开始向后读取指定的字节,读取完以后再将读位置(reader_index)向后移动读取的字节数。写入也是同理,当我们要向Buffer缓冲区中写入数据时,从写位置(writer_index)开始向后写入指定的字节,写入完以后再将写位置(writer_index)向后移动写入的字节数。当读位置(reader_index)和写位置(writer_index)相遇的时候,说明缓冲区已经没有可读数据了。

                  如果写位置(writer_index)一直写入到末尾位置,尾部空闲空间不够写入怎么办?

                  Buffer模块是支持动态扩容的,当我们向Buffer缓冲区写入指定字节的数据时,它先会去检查尾部空闲空间能不能容纳这些字节的数据,如果不能,再将头部空闲空间加入进来,判断尾部空闲空间加上头部空闲空间是否能够容纳,如果可以的话,就将可读数据挪动到起始位置,更新读位置(reader_index)和写位置(writer_index)后再写入新数据,例如下列示意图:

                  muduo库的模拟实现——工具部分,在这里插入图片描述,第2张

                  如果尾部空闲空间加上头部空闲空间都不够写入,那就只能从写位置(writer_index)开始向后扩容了。需要注意的是,扩容之后的Buffer并不会缩小空间大小,比如扩容到1000字节的大小,下一次写入比1000字节小的数据也不会重新开辟分配空间。也就是说,muduo库的Buffer缓冲区空间大小是自适应的,比如一开始空间大小是1KB,程序经常收发10KB的数据,那么用了几次之后它的空间大小会自动增长到10KB,然后就保持不变。这样做的好处是:一方面避免浪费内存,即一开始不会开很大的空间,而是不够了再去扩容。另一方面是避免反复分配内存,扩容之后申请了多大的内存空间就保持不变了,只会扩大不会缩小,因为分配内存也是需要时间开销的。

                  3.Buffer模块的实现

                  有了上述的设计思路以后,我们实现一个Buffer模块并不是什么难事,这里直接把代码贴出来了。我写这些文章的目的是记录每个模块的设计思路以及细节,学习muduo库设计的高性能高并发版本服务器,所以代码实现并不是最重要的,思路更重要,这些简单代码就不过多介绍浪费篇幅了。

                  #define BUFFER_DEFAULT_SIZE 1024
                  class Buffer
                  {
                  public:
                      Buffer()
                          : _reader_index(0), _writer_index(0), _buffer(BUFFER_DEFAULT_SIZE)
                      {
                      }
                      // 获取当前写入起始地址
                      char *getWriteStartPosition()
                      {
                          // buffer空间的起始地址加上写偏移量
                          return begin() + _writer_index;
                      }
                      // 获取当前读取起始地址
                      char *getReadStartPosition()
                      {
                          return begin() + _reader_index;
                      }
                      // 获取缓冲区末尾空间大小--写偏移之后的空闲空间
                      uint64_t getTailSpaceSize()
                      {
                          // 整体空间大小减去写偏移
                          return _buffer.size() - _writer_index;
                      }
                      // 获取缓冲区起始空间大小--读偏移之前的空闲空间
                      uint64_t getHeadSpaceSize()
                      {
                          return _reader_index;
                      }
                      // 获取可读数据大小
                      uint64_t getReadableSize()
                      {
                          return _writer_index - _reader_index;
                      }
                      // 将读偏移向后移动
                      void moveReadOffset(uint64_t len)
                      {
                          if(len == 0)
                          {
                              return;
                          }
                          assert(len <= getReadableSize());
                          _reader_index += len;
                      }
                      // 将写偏移向后移动
                      void moveWriteOffset(uint64_t len)
                      {
                          assert(len <= getTailSpaceSize());
                          _writer_index += len;
                      }
                      // 确保可写空间足够(整体空闲空间够了就移动数据,否则就扩容)
                      void ensureWriteSpace(uint64_t len)
                      {
                          // 如果末尾空闲空间大小足够,直接返回
                          if (len <= getTailSpaceSize())
                          {
                              return;
                          }
                          // 如果末尾空闲空间不够,则判断加上起始空闲空间大小是否足够,够的话就将数据到起始位置
                          else if (len <= getHeadSpaceSize() + getTailSpaceSize())
                          {
                              uint64_t readAbleSize = getReadableSize();
                              std::copy(getReadStartPosition(), getReadStartPosition() + readAbleSize, begin());
                              _reader_index = 0;
                              _writer_index = readAbleSize;
                          }
                          else
                          {
                              // 扩容
                              _buffer.resize(_writer_index + len);
                          }
                      }
                      // 写入数据
                      void write(const void *data, uint64_t len)
                      {
                          // 1.保证有足够空间 2.拷贝数据进行
                          if(len == 0)
                          {
                              return;
                          }
                          ensureWriteSpace(len);
                          const char *chardata = (const char *)data;
                          std::copy(chardata, chardata + len, getWriteStartPosition());
                      }
                      void writeAndPush(const void *data, uint64_t len)
                      {
                          write(data, len);
                          moveWriteOffset(len);
                      }
                      void writeFromString(const std::string &data)
                      {
                          write(data.c_str(), data.size());
                      }
                      void writeStringAndPush(const std::string &data)
                      {
                          writeFromString(data);
                          moveWriteOffset(data.size());
                      }
                      void writeFromBuffer(Buffer &data)
                      {
                          write(data.getReadStartPosition(), data.getReadableSize());
                      }
                      void writeBufferAndPush(Buffer &data)
                      {
                          writeFromBuffer(data);
                          moveWriteOffset(data.getReadableSize());
                      }
                      // 读取数据
                      void read(void *buf, uint64_t len)
                      {
                          // 要求要获取的数据大小必须小于可读数据大小
                          assert(len <= getReadableSize());
                          std::copy(getReadStartPosition(), getReadStartPosition() + len, (char *)buf);
                      }
                      void readAndPop(void *buf, uint64_t len)
                      {
                          read(buf, len);
                          moveReadOffset(len);
                      }
                      std::string readAsString(uint64_t len)
                      {
                          assert(len <= getReadableSize());
                          std::string str;
                          str.resize(len);
                          read(&str[0], len);
                          return str;
                      }
                      std::string readAsStringAndPop(uint64_t len)
                      {
                          assert(len <= getReadableSize());
                          std::string str = readAsString(len);
                          moveReadOffset(len);
                          return str;
                      }
                      // 寻找换行字符
                      char *findCRLF()
                      {
                          char *findRes = (char *)memchr(getReadStartPosition(), '\n', getReadableSize());
                          return findRes;
                      }
                      std::string getLine()
                      {
                          char *pos = findCRLF();
                          if (pos == nullptr)
                          {
                              return "";
                          }
                          return readAsString(pos - getReadStartPosition() + 1);
                      }
                      std::string getLineAndPop()
                      {
                          std::string str = getLine();
                          moveReadOffset(str.size());
                          return str;
                      }
                      // 清空缓冲区
                      void clear()
                      {
                          _reader_index = 0;
                          _writer_index = 0;
                      }
                  private:
                      char *begin()
                      {
                          return &(*_buffer.begin());
                      }
                  private:
                      std::vector _buffer; // 使用vector进行内存空间管理
                      uint64_t _reader_index;    // 读偏移
                      uint64_t _writer_index;    // 写偏移
                  };
                  

                  4.Buffer缓冲区的其它设计方案

                  其实在刚开始学习muduo库的Buffer缓冲区时,我很好奇为什么会选择vector这个数据结构作为缓冲区的底层数据结构呢?要知道vector的优势在于连续性,但是插入和删除的时间复杂度都是比较高的,因为vector的插入和删除会涉及到很多数据挪动,时间复杂度往往都是O(N^2)级别的。而muduo库不是高性能的网络版本服务器吗?怎么还会用这种低效的方式呢?我一开始以为会是什么复杂的高深的数据结构来实现,将时间复杂度降到最低,毕竟服务器不就是追求性能和效率吗?

                  其实这个问题的答案作者在书中已经给我们解答了,还记得文章开始我们介绍过,作者陈硕老师在《Linux多线程服务端编程:使用muduo C++网络库》书中提到:“muduo Buffer的设计考虑了常见的网络编程需求,试图在易用性和性能之间找一个平衡点,目前这个平衡点更偏向于易用性”。所以选择vector作为Buffer的存储结构是因为偏向易用性,为了让代码实现以及muduo库组件的使用都更简单方便。

                  作者在书中也提供了其它几种设计方案,感兴趣的读者可以去翻阅一下这本书。其中最高效的方案应该是zero copy方案,该方案是实现分段连续的zero copy buffer。这个方案虽然使得服务器性能更好更高效,但代价就是代码实现变得复杂,晦涩难懂。并且由于Buffer缓冲区不是连续的,parse消息会变得比较麻烦。

                  那么为什么muduo库的Buffer选择偏向易用性而不是偏向高性能呢?

                  其实我们都可以发现,muduo库的Buffer模块有很多可以优化从而提高效率的地方。那么我们可能会怀疑,muduo库的性能会不会太低了?作者给出的解释是:“可以优化,但不一定值得优化”。

                  目前最常用的千兆以太网的吞吐量基本都是几百兆每秒,而现在服务器上最常用的DDR2/DDR3内存的带宽至少是4GB/s,比千兆以太网高40倍以上。也就是说,对于Buffer缓冲区中几KB或几十KB大小的数据,在内存中复制几次根本不是问题,因为受千兆以太网延迟和带宽的限制,跟这个程序通信的其他机器上的程序不会察觉到性能差异。

                  再比如说,如果我们的服务器上层应用需要和数据库打交道,那么性能瓶颈往往出现在数据库的数据交互上,提高服务器本身的这点性能,尤其是Buffer缓冲区模块的这一点点性能提升,对整个程序本身的优化是微不足道的,因为从数据库中读取一次数据往往就抵消了Buffer模块所做的所有低等级优化,这时还不如把精力放在DB调优上。

                  所以muduo库的Buffer模块可以优化,但优化的意义往往不大,因为这些性能提升往往是微不足道的。如果确实在内存带宽方面遇到问题,或者确实有性能的需要,可以考虑将Buffer模块改写为zero copy方案,甚至可以考虑将程序放到Linux内核中去,而不是在用户态尝试各种优化。因为只有把程序做到操作系统内核才能真正实现zero copy,否则内核态和用户态之间始终有一次内存拷贝。

                  二、Socket模块

                  1.Socket模块的设计

                  Socket模块是封装了socket套接字操作,也就是将Linux的那一套TCP socket套接字操作封装成一个Socket类,这个类比较简单,需要的功能有以下几个:

                  • 创建套接字,即封装socket创建套接字函数操作。
                  • 绑定地址信息,即封装bind函数操作。
                  • 设置开始监听,即封装listen函数操作。
                  • 设置客户端发起连接请求,即封装connect函数操作。
                  • 获取新连接,即封装accept函数操作。
                  • 接收数据,即封装recv函数操作。
                  • 发送数据,即封装send函数操作。
                  • 最后提供创建服务器连接接口和创建客户端连接接口。

                    2.Socket代码实现

                    #define MAX_LISTEN 1024
                    class Socket
                    {
                    public:
                        Socket()
                            : _sockFd(-1)
                        {
                        }
                        Socket(int fd)
                            : _sockFd(fd)
                        {
                        }
                        ~Socket()
                        {
                            closeSocket();
                        }
                        // 创建套接字
                        bool createSocket()
                        {
                            _sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
                            if (_sockFd < 0)
                            {
                                LOG("create socket error");
                                return false;
                            }
                            return true;
                        }
                        // 绑定地址信息
                        bool bindSocket(const std::string &ip, uint16_t port)
                        {
                            sockaddr_in addr;
                            addr.sin_family = AF_INET;
                            addr.sin_port = htons(port);
                            addr.sin_addr.s_addr = ip.size() == 0 ? INADDR_ANY : inet_addr(ip.c_str());
                            socklen_t len = sizeof(sockaddr_in);
                            int bindRes = bind(_sockFd, (sockaddr *)&addr, len);
                            if (bindRes < 0)
                            {
                                LOG("bind error");
                                return false;
                            }
                            return true;
                        }
                        // 开始监听
                        bool listenSocket(int backlog = MAX_LISTEN)
                        {
                            int listenRes = listen(_sockFd, backlog);
                            if (listenRes < 0)
                            {
                                LOG("listen error");
                                return false;
                            }
                            return true;
                        }
                        // 向服务器发起连接
                        // 这个接口是给客户端向服务端发起连接请求的
                        bool connectSocket(const std::string &ip, uint16_t port)
                        {
                            sockaddr_in addr;
                            addr.sin_family = AF_INET;
                            addr.sin_port = htons(port);
                            addr.sin_addr.s_addr = inet_addr(ip.c_str());
                            socklen_t len = sizeof(sockaddr_in);
                            int connectRes = connect(_sockFd, (sockaddr *)&addr, len);
                            if (connectRes < 0)
                            {
                                LOG("connect error");
                                return false;
                            }
                            return true;
                        }
                        // 获取新连接
                        int acceptConnect()
                        {
                            int newFd = accept(_sockFd, nullptr, nullptr);
                            if (newFd < 0)
                            {
                                LOG("accept error");
                                return -1;
                            }
                            return newFd;
                        }
                        // 接收数据
                        ssize_t recvData(void *buf, size_t len, int flag = 0)
                        {
                            ssize_t recvRes = recv(_sockFd, buf, len, flag);
                            if (recvRes <= 0)
                            {
                                if (errno == EAGAIN || errno == EINTR)
                                {
                                    return 0;
                                }
                                LOG("recv error");
                                return -1;
                            }
                            return recvRes;
                        }
                        ssize_t nonBlockRecv(void *buf, size_t len)
                        {
                            return recvData(buf, len, MSG_DONTWAIT);
                        }
                        // 发送数据
                        ssize_t sendData(const void *buf, size_t len, int flag = 0)
                        {
                            ssize_t sendRes = send(_sockFd, buf, len, flag);
                            if (sendRes < 0)
                            {
                                if (errno == EAGAIN || errno == EINTR)
                                {
                                    return 0;
                                }
                                LOG("send error");
                                return -1;
                            }
                            return sendRes;
                        }
                        ssize_t nonBlockSend(void *buf, size_t len)
                        {
                            if (len == 0)
                            {
                                return 0;
                            }
                            return sendData(buf, len, MSG_DONTWAIT);
                        }
                        // 关闭套接字
                        void closeSocket()
                        {
                            if (_sockFd != -1)
                            {
                                close(_sockFd);
                                _sockFd = -1;
                            }
                        }
                        // 创建一个服务端连接
                        // 这个函数接口是服务端调用的,用来创建一个服务端连接
                        bool createServer(uint16_t port, const std::string &ip = "", bool blockFlag = false)
                        {
                            if (createSocket() == false)
                            {
                                return false;
                            }
                            if (bindSocket(ip, port) == false)
                            {
                                return false;
                            }
                            if (listenSocket() == false)
                            {
                                return false;
                            }
                            if (blockFlag)
                            {
                                nonBlock();
                            }
                            reuseAddress();
                            return true;
                        }
                        // 创建一个客户端连接
                        bool createClient(uint16_t port, const std::string &ip)
                        {
                            if (createSocket() == false)
                            {
                                return false;
                            }
                            if (connectSocket(ip, port) == false)
                            {
                                return false;
                            }
                            return true;
                        }
                        // 设置套接字选项---开启地址端口重用
                        void reuseAddress()
                        {
                            int val = 1;
                            setsockopt(_sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int));
                            val = 1;
                            setsockopt(_sockFd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int));
                        }
                        // 设置套接字阻塞属性---设置为非阻塞
                        void nonBlock()
                        {
                            int flag = fcntl(_sockFd, F_GETFL, 0);
                            fcntl(_sockFd, F_SETFL, flag | O_NONBLOCK);
                        }
                        int getSocketFd()
                        {
                            return _sockFd;
                        }
                    private:
                        // socket创建的套接字
                        int _sockFd;
                    };
                    

                    三、Acceptor模块

                    1.Acceptor模块的设计与实现

                    Acceptor模块就是主Reactor获取新连接的模块,它与主Reactor关联,由最顶层的TcpServer模块将它与EventLoop模块关联起来,这个模块比较简单,我们可以通过设计与实现来了解该模块的原理以及功能。首先看一下Acceptor的类内成员变量:

                    • _socket:该变量用于创建服务器,我们上面已经封装了Socket类,所以可以通过_socket这个变量快速地调用Socket类接口,通过socket套接字操作搭建服务器。
                    • _eventLoop:该变量就是主Reactor,只负责监控新连接的到来。
                    • _channel:该变量是主Reactor的事件管理类变量。
                    • _acceptCallBack:该变量是新连接到来以后执行的回调函数,这个回调函数由外部设置,Acceptor类负责调用。
                      private:
                          Socket _socket;// 用于创建监听套接字
                          EventLoop *_eventLoop;// 用于对监听套接字进行事件监控
                          Channel _channel;// 用于对监听套接字进行事件管理
                          AcceptCallBack _acceptCallBack;
                      

                      接下来是Acceptor类的构造函数,构造函数初始化主Reactor以及创建服务器,同时设置主Reactor可读事件触发的回调函数。

                          // 不能将启动读事件监控放到构造函数中,必须在设置回调函数之后,再去启动
                          // 否则有可能造成启动监控后,立即有事件到来了,但是处理的时候回调函数还没设置,所以新连接得不到处理,并且资源泄露
                          Acceptor(EventLoop *eventLoop, uint16_t port)
                          :_eventLoop(eventLoop), _socket(createServer(port)), _channel(eventLoop, _socket.getSocketFd())
                          {
                              _channel.setReadAbleCallBack(std::bind(&Acceptor::handleRead, this));
                          }
                      

                      构造函数里设置的主Reactor可读事件触发回调函数是handleRead函数,这个函数也是Acceptor类内成员函数,该函数首先是调用accept函数操作,获取新连接。因为TCP服务器的套接字操作步骤是,socket创建套接字、bind绑定地址信息、listen设置监听状态,最后accept获取连接。当有连接到来的时候,进程会从accept函数调用处返回,返回以后,handleRead函数再去调用外部设置的新连接到来触发的回调函数,即调用成员变量_acceptCallBack。

                          // 监听套接字的读事件回调函数---获取新连接,调用_acceptCallBack函数进行新连接处理
                          void handleRead()
                          {
                              int newFd = _socket.acceptConnect();
                              if(newFd < 0)
                              {
                                  return;
                              }
                              if(_acceptCallBack)
                              {
                                  _acceptCallBack(newFd);
                              }
                          }
                      

                      事实上这个_acceptCallBack是由TcpServer模块设置的,这一整条链路的逻辑是这样的:Acceptor构造函数初始化_socket时调用createServer函数创建服务器,createServer函数只会执行socket创建套接字、bind绑定地址信息、listen设置监听状态的操作,并不会调用accept获取连接,因为accept函数如果没有连接到来的话是会让线程或进程阻塞的。Acceptor构造函数设置的主Reactor的可读事件回调函数是handleRead,这个handleRead函数只会在可读事件触发时被调用,主Reactor的可读事件只会在新连接到来时触发。所以当有新连接到来时,主Reactor的可读事件触发,调用handleRead函数执行。

                      我们再来看看handleRead函数内部执行了什么,首先是调用accept函数获取连接,也就是说这种写法不用进程或线程阻塞在accept函数调用处等待连接的到来,而是当连接到来的时候,以可读事件触发的形式调用handleRead函数去执行accept获取连接,这一次获取连接是不需要等待就能获取成功的,所以接下来可以执行外部设置的连接到来回调函数_acceptCallBack。

                      这样又回到上面说的,这个_acceptCallBack是由TcpServer模块设置的,设置的函数就是newConnection,这个函数后续会介绍,它的功能就是为新连接创建Connection对象去管理。至此,主Reactor如何获取新连接,以及Acceptor的原理和功能我们都清楚了。这个模块涉及了一些回调函数,并且还与其它多个模块关联,如果不仔细梳理,很难彻底弄清楚Acceptor模块到底是做什么的,也很难弄清楚该模块与其它模块的关系。

                      2.Acceptor模块完整代码实现

                      介绍完了关键几个代码之后,剩下的函数接口都比较简单,这里就不过多讲解了,贴上代码即可。

                      using AcceptCallBack = std::function;
                      class Acceptor
                      {
                      public:
                          // 不能将启动读事件监控放到构造函数中,必须在设置回调函数之后,再去启动
                          // 否则有可能造成启动监控后,立即有事件到来了,但是处理的时候回调函数还没设置,所以新连接得不到处理,并且资源泄露
                          Acceptor(EventLoop *eventLoop, uint16_t port)
                          :_eventLoop(eventLoop), _socket(createServer(port)), _channel(eventLoop, _socket.getSocketFd())
                          {
                              _channel.setReadAbleCallBack(std::bind(&Acceptor::handleRead, this));
                          }
                          void setAcceptCallBack(const AcceptCallBack &callBack)
                          {
                              _acceptCallBack = callBack;
                          }
                          void startListen()
                          {
                              _channel.startReadAbleEvent();
                          }
                      private:
                          // 监听套接字的读事件回调函数---获取新连接,调用_acceptCallBack函数进行新连接处理
                          void handleRead()
                          {
                              int newFd = _socket.acceptConnect();
                              if(newFd < 0)
                              {
                                  return;
                              }
                              if(_acceptCallBack)
                              {
                                  _acceptCallBack(newFd);
                              }
                          }
                          int createServer(uint16_t port)
                          {
                              bool ret = _socket.createServer(port);
                              assert(ret == true);
                              return _socket.getSocketFd();
                          }
                      private:
                          Socket _socket;// 用于创建监听套接字
                          EventLoop *_eventLoop;// 用于对监听套接字进行事件监控
                          Channel _channel;// 用于对监听套接字进行事件管理
                          AcceptCallBack _acceptCallBack;
                      };
                      

                      四、定时器模块

                      1.时间轮的思想

                      为了防止一个连接长时间不与服务器进行信息交互,但又霸占着连接资源,所以我们的服务器必须要有定时器模块。因为服务器的资源是有限的,最简单的来说,每个连接就是一个socket,其实本质上是文件描述符,而Linux操作系统中文件描述符是有限资源,如果被大量这种不通信又不断开的连接霸占,那么后面到来的新连接就没办法获取文件描述符了。所以定时器模块可以让网络库的使用者设定一个超时时间,当连接距离上一次通信的时间到当前时刻的时间间隔大于超时时间,服务器会主动断开连接。

                      实现定时器模块首先需要有计时的功能,muduo库使用的计时器功能是Linux操作系统提供的timerfd,这是以文件描述符方式管理超时提醒的机制。timerfd_fd函数能创建一个系统的计时器,我们设置计时时间,操作系统会给我们返回一个文件描述符,当设置的时间到了,操作系统会向文件里写入数据,数据表示距离上一次读取超时了多少次。有了这个机制,我们可以很好地将计时功能与epoll多路转接联系起来使用。我们可以将文件描述符放在epoll模型中监控可读事件,当可读事件触发时,说明操作系统向该文件写入数据了,也就是说明时间到了,这时候再由epoll_wait函数返回可读事件,经由上层用户去处理超时连接。这里就不介绍timerfd是具体使用了,只需要知道是用来做什么的就可以,不了解如何使用的可以去学习一下接口。

                      但是如果只使用timerfd显然无法满足我们的需求,试想一下,仅仅使用timerfd来计时的话,最好的方式就是全局定义一个计时器,然后每个连接记录一个距离上次通信的时间,检查超时销毁连接的操作就变成了需要遍历所有的连接,如果超时了再将其销毁。这样的做法针对服务器中大量连接的情况是不实际的。所以需要思考的就是如何高效地组织这些定时器,能够快速地找出当前时刻已经超时的连接。muduo的做法是使用set数据结构,也就是底层使用的红黑树,把每个连接按到期时间先后排序,操作的时间复杂度是O(logN)。但除此之外,作者在书中还介绍了时间轮的方案,虽然作者没有实现在muduo库源码上,但通过作者描述的思路,我更感兴趣这一种实现方案,下面我将详细介绍时间轮方案的实现。

                      其实我翻阅了一些书籍以及搜索了网上的一些文章,定时器的实现方案还是有许多的,比如按到期时间先后排序的队列、以到期时间建立最小堆、红黑树、时间轮。这些都是比较常见的实现方案,对比了一下其实会发现,时间轮方案比其它都更优越。

                      我在项目中实现的时间轮是多级时间轮,不过我的多级时间轮只有两级,因为多了没必要,秒级时间轮和分钟级时间轮就已经能大致满足需求了,毕竟一个连接多长时间未通信销毁其实没有固定值,一般也是根据场景来确定。几秒钟几十秒就关闭连接太短了,几个小时才关闭连接似乎又太长了,所以分钟级别是比较合适的。

                      秒级时间轮和分钟级时间轮其实是一样的数据结构,它们都是有一个vector数组,这个数组有60个元素,分别代表60秒和60分钟。vector数组每一个元素下面还跟着一个数组,这个数组就是超时任务对象。具体的运作逻辑是,秒针每秒向后移动一格,然后将当前指向的所有超时任务销毁,这就是时间轮的基本思想。由于秒针是每秒向后移动一格,然后秒针走到哪里,就执行哪里的超时任务,所以一维的vector数组应该被我们设计成循环队列,这一点也不难理解,因为限制了长度是60,循环队列才能让秒针一直在转,就像时钟一样,不停地转圈圈。

                      举个例子更好地说明一下,假设初始状态,秒针和分针都指向0下标处,此时有一个5秒之后的超时任务,所以在秒针当前位置向后加5的位置,插入这个超时任务。由于秒针是每秒向前走一格的,走到5位置处刚好是5秒,也就意味着改任务的时间到了,可以执行超时处理了。

                      muduo库的模拟实现——工具部分,在这里插入图片描述,第3张

                      上图虽然只有秒级时间轮,但加入分钟级时间轮也很好理解,这其实和现实生活中的时钟是一样的道理,秒级时间轮的指针走了60格以后,分钟级时间轮的指针才能向前走1格,就好比现实中秒针走一圈,分针才向前走一格。这就是时间轮的思想,理解起来应该不困难,接下来介绍时间轮定时器模块的具体设计与实现。

                      2.TimerTask类

                      首先我们需要有一个超时任务类,因为时间轮管理的是一个个的超时任务对象,它只负责如何高效地管理这些对象,具体如何执行超时以后的操作,还是通过超时任务对象来确定的。所以TimerTask类就是设置超时任务类。它可以设置超时的时间是多少,以及超时以后该如何处理。超时以后如何处理是上层决定的事情,定时器模块需要的只是一个回调函数,超时以后调用该回调函数即可。

                      除此之外,我们不希望时间轮的指针走到超时位置时,还要一个一个TimerTask对象去执行超时对应的回调函数,我们希望这些TimerTask对象自己调用这些回调。所以我们采用RAII的思想,让TimerTask类在构造函数里完成这些超时时间、超时回调函数的设置,在析构函数里调用超时回调函数。这样就只需要在时间轮里释放TimerTask对象即可,它会在析构函数里自动调用超时回调函数,非常方便。

                      TimerTask的代码不难实现,这里就直接给出了,具体是看TimerTask对象如何在时间轮里被使用。

                      /// @brief 定时器任务类,这个类实例化出来的对象,在生命周期内就是一个定时任务,当生命周期结束的时候就代表超时了,就会执行超时的任务
                      ///        这里其实使用了RAII的思想
                      class TimerTask
                      {
                      public:
                          // 构造函数,需要告诉我定时器任务对象的id是什么,超时时间是什么,以及超时以后处理的任务是什么
                          TimerTask(uint64_t id, uint32_t timeout, const TaskFunc &task)
                              : _id(id),
                                _timeout(timeout),
                                _task(task),
                                _cancelFlag(false)
                          {
                          }
                          // 析构函数,在析构的时候执行超时任务
                          ~TimerTask()
                          {
                              // 如果上层用户没有取消超时任务,就执行超时任务
                              if (_cancelFlag == false)
                              {
                                  _task();
                              }
                              // 调用release回调函数
                              _release();
                          }
                          void setRelease(const ReleaseFunc &release)
                          {
                              _release = release;
                          }
                          uint32_t getTimeOut()
                          {
                              return _timeout;
                          }
                          void cancel()
                          {
                              _cancelFlag = true;
                          }
                      private:
                          uint64_t _id;         // 定时器任务对象的ID
                          uint32_t _timeout;    // 定时任务的超时时间
                          bool _cancelFlag;     // false表示没有被取消,true表示被取消
                          TaskFunc _task;       // 定时器对象要执行的定时任务
                          ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息
                      };
                      

                      3.TimerWheel类

                      TimerWheel类就是我们要实现的时间轮,首先看一下TimerWheel类成员变量的设计:

                      private:
                          int _second_tick; // 当前的秒针,走到哪里释放哪里,就相当于执行哪里的任务
                          int _minute_tick; // 当前的分针
                          int _capacity;    // 时间轮表盘最大数量,其实就是最大延迟时间
                          // 这是个二维数组,其实就是一个桶结构,一维的每一个元素代表每1s的时间,
                          // 每秒下面挂的就是这一时刻的定时器任务对象的shared_ptr
                          std::vector> _second_wheel; // 秒级时间轮
                          std::vector> _minute_wheel; // 分钟级时间轮
                          // _timers是用来保存定时器任务对象的weak_ptr的,key值是定时器任务对象的ID值,用来索引对应的定时器任务对象
                          // value值是保存定时器任务对象的weak_ptr
                          // 这里用weak_ptr的原因是不会使shared_ptr的引用计数增加
                          // 如果使用shared_ptr的话,在插入到_timers中或者从_timers中获取对象的时候都会拷贝shared_ptr对象,使得引用计数增加,这样是不合理的
                          std::unordered_map _timers;
                          EventLoop *_eventLoop;
                          int _timerfd; // 定时器描述符
                          // 定时器任务对象的时间管理对象指针
                          std::unique_ptr _timerChannel;
                      

                      成员变量中首先需要的是两个时间轮,分别是秒级时间轮和分钟级时间轮,以及两个时间轮的指针,分别代表秒针和分针。还需要一个_timers哈希表用来保存所有的定时任务TimerTask对象,这也很有必要,因为我们需要将TimerTask对象管理起来,以后需要对其进行查找,使用哈希表效率比较高。

                      重点的是,我们这里时间轮管理的定时任务TimerTask对象,并不是使用原生的对象指针,而是使用智能指针,我认为这是设计的一个巧妙之处,这里用智能指针主要有以下场景的考虑:

                      • 当我们需要刷新定时任务的时间时,比如说在2s的时候一个新连接到来了,服务器为这个连接建立了一个定时任务,5s之后如果不通信的话就销毁连接,也就是应该在第7s销毁连接,那么这个定时任务就应该添加在秒级时间轮下标为7的位置。但如果当秒针走到3s处,这个连接有一次新的通信,我们就应该刷新定时任务的销毁时间,就不应该是第7s销毁了,因为最近一次通信是第3s,按照5s之后不活跃销毁的规则,真正销毁的时间应该是第8s。这时如果不使用智能指针,而是使用原生的对象指针是会出问题的,因为第7s和第8s处都有这个定时任务,指针会被释放两次,这是会出错误的。所以使用shared_ptr智能指针,在这个地方非常有必要,我们可以在第8s新插入这个定时任务对象的shared_ptr,即使秒针走到第7s释放了原来的shared_ptr,由于引用计数不为0,所以不会真正释放对象,等到第8s的时候才会真正释放,这就是使用shared_ptr智能指针的原因。

                        接下来再介绍多层级的时间轮是如何实现的:首先定时任务TimerTask对象会设置timeout超时时间,如果这个超时时间大于或等于60,说明超时时间在一分钟以上,那就需要使用分钟级时间轮和秒级时间轮了。否则的话秒级时间轮就够用了。如果要用分钟级时间轮,假设超时时间timeout为110s,即1min50s,那么就在分针级时间轮的第1格插入这个TimerTask对象。当秒针走过了60格以后,分针就会向前走1格,此时并不释放分针级时间轮第一个的这个TimerTask对象,而是将该定时任务对象转移到秒级时间轮对应的格子中,比如这个110s的TimerTask对象,就应该转移到秒级时间轮的第50格。当下一次秒针再走到第50格的时候,前后就一共走了60+50=110s,就可以释放这个TimerTask对象了。也就是说,分钟级时间轮不处理定时任务,只将定时任务转移到秒级时间轮,只有秒级时间轮才会处理超时任务。

                        muduo库的模拟实现——工具部分,在这里插入图片描述,第4张

                        接下来我们可以介绍addTimer、refreshTimer和runTimerTask三个函数接口了。

                        addTimer:

                        首先是addTimer函数接口,这个接口是添加一个定时任务TimerTask对象到时间轮中,由于我们有秒级时间轮和分钟级时间轮,所以需要对TimerTask对象的超时时间timeout进行判断,如果timeout大于或等于60,即超过一分钟以上,那就需要将这个TimerTask对象放在分钟级时间轮上。否则的话,就放在秒级时间轮上。该函数的具体实现如下:

                        	// 添加定时任务到EventLoop中
                            void addTimerToEventLoop(uint64_t id, uint32_t timeout, const TaskFunc &task)
                            {
                                // 首先new一个定时器任务对象出来,交给shared_ptr管理
                                TaskSharedPtr ptr(new TimerTask(id, timeout, task));
                                // 设置定时器任务对象的release回调函数,设置为removeTimer函数
                                // 这个release函数是用来清理TimerWheel中保存的定时器任务对象信息的
                                ptr->setRelease(std::bind(&TimerWheel::removeTimer, this, id));
                                // 设置完以后计算超时时间,如果timeout>60,说明要用到分钟轮
                                if (timeout >= 60)
                                {
                                    int minute_timeout = timeout / 60;
                                    int minute_pos = (_minute_tick + minute_timeout) % _capacity;
                                    _minute_wheel[minute_pos].push_back(ptr);
                                }
                                else
                                {
                                    int second_pos = (_second_tick + timeout) % _capacity;
                                    _second_wheel[second_pos].push_back(ptr);
                                }
                                // 构造一个weak_ptr到_timers中,保存定时器任务对象的信息
                                _timers[id] = TaskWeakPtr(ptr);
                            }
                        

                        refreshTimer:

                        refreshTimer是刷新定时任务的函数接口,也就是上面提到的使用智能指针在这种场景下的好处。实现这个接口也不难,只需要在当前指针加上timeout的位置处,插入一个管理TimerTask对象的shared_ptr智能指针,就可以实现刷新操作。

                            // 刷新或者延迟定时任务
                            bool refreshTimerInEventLoop(uint64_t id)
                            {
                                // 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来,添加到时间轮中
                                // 首先通过定时器任务ID到_timers中查找对应的定时器任务对象的weak_ptr
                                auto iter = _timers.find(id);
                                if (iter == _timers.end())
                                {
                                    return false;
                                }
                                // 找到了以后,将对应的weak_ptr转换成shared_ptr
                                TaskSharedPtr ptr = iter->second.lock();
                                // 获取这个定时器任务对象的timeout超时时间
                                int timeout = ptr->getTimeOut();
                                if (timeout >= 60)
                                {
                                    int minute_timeout = timeout / 60;
                                    int minute_pos = (_minute_tick + minute_timeout) % _capacity;
                                    _minute_wheel[minute_pos].push_back(ptr);
                                }
                                else
                                {
                                    int second_pos = (_second_tick + timeout) % _capacity;
                                    _second_wheel[second_pos].push_back(ptr);
                                }
                                return true;
                            }
                        

                        runTimerTask:

                        runTimerTask函数接口其实就是让秒针每秒向前走一格,如果秒针已经走了一圈了,就让分针向前走一格。因此,这个函数必须每秒钟被调用一次,具体如何保证每秒钟被调用一次我们下面再介绍,这里先介绍该函数接口的实现。其实实现也是非常简单,就是每次调用都让秒针向后移动一格,然后释放秒级时间轮秒针位置的所有TimerTask对象,这样就会执行这个TimerTask对象的析构函数从而去执行超时处理。如果秒针走了一圈了,那就让分针向前走一格,然后将分钟级时间轮中分针指向位置的TimerTask对象转移到秒级时间轮的对应位置。分钟级时间轮不处理超时任务,只将超时任务转移给秒级时间轮去处理。

                            void runTimerTask()
                            {
                                // 首先判断秒针是否已经走满一圈,是的话就让分针进一格
                                if (_second_tick + 1 >= 60)
                                {
                                    _minute_tick = (_minute_tick + 1) % _capacity;
                                    for(int i = 0; i < _minute_wheel[_minute_tick].size(); i++)
                                    {
                                        int timeout = _minute_wheel[_minute_tick][i]->getTimeOut();
                                        // 分钟级时间轮的时间到了,将定时任务转移到秒级时间轮对应的位置
                                        _second_wheel[timeout % 60].push_back(_minute_wheel[_minute_tick][i]);
                                    }
                                    _minute_wheel[_minute_tick].clear();
                                }
                                // 每一秒更新一次_tick的位置,相当于每一秒时钟向后走一步
                                _second_tick = (_second_tick + 1) % _capacity;
                                // 更新完以后,就销毁该时刻下对应的所有定时器任务
                                _second_wheel[_second_tick].clear();
                            }
                        

                        到这里,我们就基本上实现了时间轮的所有核心操作,但是现在的时间轮只有操作和管理定时任务TimerTask对象的功能,定时器模块还需要计时功能,没有计时功能怎么知道有没有超时呢?所以接下来我们需要实现定时器模块的计时功能。

                        我们之前也提过,muduo库的计时功能是通过Linux提供的timerfd机制实现的,操作系统会为我们创建一个文件,超时了就向文件中写入数据。所以我们可以将该文件的文件描述符用epoll监控起来,一旦可读事件触发,说明文件中有数据到来,也就是说提醒我们时间到了。所以首先需要创建一个timerfd,并且我们需要将超时时间设置为1s,因为我们希望操作系统每秒提醒我们调用一次runTimerTask函数,这样就能让时间轮的秒针每秒向前移动一格。createTimerFd函数实现如下:

                            static int createTimerFd()
                            {
                                // 使用timerfd_create函数会创建一个定时器的文件描述符
                                // 这个文件描述符是操作系统帮我们管理的,一旦超时,操作系统就会向文件里写入数据
                                // 每次读取出来的数据表示距离上一次读取超时了多少次
                                int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
                                if (timerfd < 0)
                                {
                                    LOG("timerfd_create error");
                                    abort();
                                }
                                // 这里将超时时间设置为1s,每1s后超时一次
                                struct itimerspec itime;
                                itime.it_value.tv_sec = 1;
                                itime.it_value.tv_nsec = 0; // 第一次超时时间为1s后
                                itime.it_interval.tv_sec = 1;
                                itime.it_interval.tv_nsec = 0; // 第一次超时后,每次超时的时间间隔
                                timerfd_settime(timerfd, 0, &itime, nullptr);
                                return timerfd;
                            }
                        

                        我们在TimerWhell时间轮类的构造函数处就应该调用createTimerFd去创建timerfd,并且将该文件描述符交给epoll去监控,设置可读事件触发的回调函数为timeout,这个函数接下来我们会实现,然后启动epoll的可读事件监控,这样就能监控timerfd文件描述符了,由于我们设置的timerfd超时时间是1s,所以操作系统每1s都会向文件描述符写入数据,epoll每1s都会监控到可读事件,然后调用timeout函数。构造函数的实现如下:

                            // 构造函数,需要传递进入eventLoop对象
                            // 因为定时器任务也是需要被reactor管理的
                            // 添加定时器任务、刷新定时器任务、取消定时器任务
                            // 这些都会作为事件让eventLoop对象管理,eventLoop对象再交给Poller对象去监控这些事件
                            // 监控事件发生以后,再让Channel对象去处理
                            TimerWheel(EventLoop *eventLoop)
                                : _capacity(60), _second_tick(0), _minute_tick(0), _second_wheel(_capacity), _minute_wheel(_capacity),
                                  _eventLoop(eventLoop), _timerfd(createTimerFd()),
                                  _timerChannel(new Channel(_eventLoop, _timerfd))
                            {
                                // _timerChannel是定时器任务事件管理的对象,用来管理定时器任务的事件
                                // 这里设置_timerChannel的可读事件,设置为timeOut,一旦可读事件就绪,就会调用timeout
                                _timerChannel->setReadAbleCallBack(std::bind(&TimerWheel::timeOut, this));
                                // 开始可读事件的监控
                                _timerChannel->startReadAbleEvent();
                            }
                        

                        接下来是timeout函数,这个函数是被绑定在timerfd文件描述符的可读事件下的,timerfd的可读事件触发就会被调用,所以这个函数每秒钟会被调用一次。所以我们要利用它每秒钟被调用一次的特点,通过该函数去调用runTimerTask函数,让秒针每秒向后走一格,这样整个定时器模块的逻辑就打通了。

                            // 这个函数已经被绑定在_timerChannel对象中,也就是说被作为可读事件监控起来了
                            // 一旦可读事件触发,就会调用这个函数
                            // 那什么时候会触发呢?当这个事件关联的文件描述符也就是_timerfd有数据到来的时候
                            // 因为_timerfd是操作系统为我们创建的一个定时器文件,是操作系统在帮我们管理
                            // 并且我们设置了每1s超时一次,所以操作系统每1s都会提醒我们一次
                            // 也就是说每1s操作系统都会向_timerfd中写入数据,那么每1s可读事件都会就绪,就会调用timeout函数
                            // 调用以后就会去读取_timerfd里的内容,并且执行runTimerTask函数去处理超时任务
                            // 这里就是定时器的精髓,很好地利用操作系统的timerfd机制,将文件描述符交给epoll去监控管理
                            // 一旦超时了操作系统会通知,非常牛逼
                            void timeOut()
                            {
                                // 先调用readTimerFd函数去读取timerfd里的数据
                                // times表示距离上一次读取超时了多少次
                                int times = readTimerFd();
                                // 循环执行runTimerTask
                                // 超时了多少次就执行多少次,runTimerTask函数是将指时钟向后移动1s,然后执行对应时刻的所有超时任务
                                for (int i = 0; i < times; i++)
                                {
                                    runTimerTask();
                                }
                            }
                        

                        至此,TimerWhell时间轮类的实现基本差不多了,剩下的接口比较简单,就不过多介绍了,这里可以直接给完整的代码:

                        using TaskWeakPtr = std::weak_ptr;
                        using TaskSharedPtr = std::shared_ptr;
                        /// @brief 时间轮类,用来管理定时任务对象的类,实现超时处理的方式是时间轮
                        class TimerWheel
                        {
                        public:
                            // 构造函数,需要传递进入eventLoop对象
                            // 因为定时器任务也是需要被reactor管理的
                            // 添加定时器任务、刷新定时器任务、取消定时器任务
                            // 这些都会作为事件让eventLoop对象管理,eventLoop对象再交给Poller对象去监控这些事件
                            // 监控事件发生以后,再让Channel对象去处理
                            TimerWheel(EventLoop *eventLoop)
                                : _capacity(60), _second_tick(0), _minute_tick(0), _second_wheel(_capacity), _minute_wheel(_capacity),
                                  _eventLoop(eventLoop), _timerfd(createTimerFd()),
                                  _timerChannel(new Channel(_eventLoop, _timerfd))
                            {
                                // _timerChannel是定时器任务事件管理的对象,用来管理定时器任务的事件
                                // 这里设置_timerChannel的可读事件,设置为timeOut,一旦可读事件就绪,就会调用timeout
                                _timerChannel->setReadAbleCallBack(std::bind(&TimerWheel::timeOut, this));
                                // 开始可读事件的监控
                                _timerChannel->startReadAbleEvent();
                            }
                            ~TimerWheel()
                            {
                            }
                            // 把所有对定时任务的操作都放在一个线程中完成,所以放在EventLoop线程中比较合适
                            // 因为定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行,因此需要考虑线程安全问题
                            // 但是又不想加锁,因为加锁耗费资源,所以把所有定时操作放在一个线程中进行
                            // 这里和EventLoop模块、线程池模块的设计解决了这种线程不安全的问题
                            // 因为设计的时候一个EventLoop对象就会被分配一个线程,并且EventLoop对象内部会绑定这个线程的ID
                            // 这些函数将来都会作为任务通过EventLoop对象的runInLoop函数添加到任务队列
                            // 在这里就会判断当前执行的线程是不是EventLoop对象对应的线程
                            // 如果是的话就直接执行任务,如果不是的话就加入任务队列,等到下一次被EventLoop对象对应的线程调用
                            // 这样就能保证这些addTimer类似的函数能够只被一个线程调用,不会被其它线程调用
                            // 所以临界资源_timers是安全的,就不会有线程安全了
                            void addTimer(uint64_t id, uint32_t timeout, const TaskFunc &task);
                            void refreshTimer(uint64_t id);
                            void cancelTimer(uint64_t id);
                            // 存在线程安全问题,所以只能在EventLoop线程调用,不能被其它线程调用
                            bool hasTimer(uint64_t id);
                        private:
                            // 刷新或者延迟定时任务
                            bool refreshTimerInEventLoop(uint64_t id)
                            {
                                // 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来,添加到时间轮中
                                // 首先通过定时器任务ID到_timers中查找对应的定时器任务对象的weak_ptr
                                auto iter = _timers.find(id);
                                if (iter == _timers.end())
                                {
                                    return false;
                                }
                                // 找到了以后,将对应的weak_ptr转换成shared_ptr
                                TaskSharedPtr ptr = iter->second.lock();
                                // 获取这个定时器任务对象的timeout超时时间
                                int timeout = ptr->getTimeOut();
                                if (timeout >= 60)
                                {
                                    int minute_timeout = timeout / 60;
                                    int minute_pos = (_minute_tick + minute_timeout) % _capacity;
                                    _minute_wheel[minute_pos].push_back(ptr);
                                }
                                else
                                {
                                    int second_pos = (_second_tick + timeout) % _capacity;
                                    _second_wheel[second_pos].push_back(ptr);
                                }
                                return true;
                            }
                            // 添加定时任务到EventLoop中
                            void addTimerToEventLoop(uint64_t id, uint32_t timeout, const TaskFunc &task)
                            {
                                // 首先new一个定时器任务对象出来,交给shared_ptr管理
                                TaskSharedPtr ptr(new TimerTask(id, timeout, task));
                                // 设置定时器任务对象的release回调函数,设置为removeTimer函数
                                // 这个release函数是用来清理TimerWheel中保存的定时器任务对象信息的
                                ptr->setRelease(std::bind(&TimerWheel::removeTimer, this, id));
                                // 设置完以后计算超时时间,如果timeout>60,说明要用到分钟轮
                                if (timeout >= 60)
                                {
                                    int minute_timeout = timeout / 60;
                                    int minute_pos = (_minute_tick + minute_timeout) % _capacity;
                                    _minute_wheel[minute_pos].push_back(ptr);
                                }
                                else
                                {
                                    int second_pos = (_second_tick + timeout) % _capacity;
                                    _second_wheel[second_pos].push_back(ptr);
                                }
                                // 构造一个weak_ptr到_timers中,保存定时器任务对象的信息
                                _timers[id] = TaskWeakPtr(ptr);
                            }
                            // 执行定时任务
                            // 这个函数应该每秒钟被执行一次,相当于秒针向后走了一步
                            // 这个函数会被timeout函数调用,timeout函数会在超时的时候被调用
                            // 而TimerFd我们设置的是1s钟超时,所以这里的逻辑就是,每隔1s,操作系统会向timerfd中写入数据代表时间到了
                            // 然后epoll监控到timerfd文件描述符的可读事件触发,就调用对应的可读事件回调函数,也就是timeout函数
                            // timeout函数再调用runTimerTask函数,这样就能保证一秒钟执行这个函数一次
                            void runTimerTask()
                            {
                                // 首先判断秒针是否已经走满一圈,是的话就让分针进一格
                                if (_second_tick + 1 >= 60)
                                {
                                    _minute_tick = (_minute_tick + 1) % _capacity;
                                    for(int i = 0; i < _minute_wheel[_minute_tick].size(); i++)
                                    {
                                        int timeout = _minute_wheel[_minute_tick][i]->getTimeOut();
                                        // 分钟级时间轮的时间到了,将定时任务转移到秒级时间轮对应的位置
                                        _second_wheel[timeout % 60].push_back(_minute_wheel[_minute_tick][i]);
                                    }
                                    _minute_wheel[_minute_tick].clear();
                                }
                                // 每一秒更新一次_tick的位置,相当于每一秒时钟向后走一步
                                _second_tick = (_second_tick + 1) % _capacity;
                                // 更新完以后,就销毁该时刻下对应的所有定时器任务
                                _second_wheel[_second_tick].clear();
                            }
                            // 删除保存的定时器任务对象的信息,也就是在_timers中的内容
                            void removeTimer(uint64_t id)
                            {
                                auto iter = _timers.find(id);
                                if (iter != _timers.end())
                                {
                                    _timers.erase(iter);
                                }
                            }
                            // 取消定时器任务
                            void cancelTimerInEventLoop(uint64_t id)
                            {
                                // 首先看该定时器任务是否在_timers中
                                // 如果不在的话,说明没有这个定时器任务,直接返回
                                auto iter = _timers.find(id);
                                if (iter == _timers.end())
                                {
                                    return;
                                }
                                // 找到了以后构造shared_ptr指针对象,然后去执行定时器任务对象的cancel函数
                                // 这个函数就是将取消标志位设置为true,这样在定时器任务对象析构的时候,就不会去执行定时器对象要处理的任务
                                // 只会执行release回调函数,也就是删除在_timers中保存的定时器对象的信息
                                TaskSharedPtr ptr = iter->second.lock();
                                if (ptr)
                                {
                                    ptr->cancel();
                                }
                            }
                            // 创建Timerfd
                            static int createTimerFd()
                            {
                                // 使用timerfd_create函数会创建一个定时器的文件描述符
                                // 这个文件描述符是操作系统帮我们管理的,一旦超时,操作系统就会向文件里写入数据
                                // 每次读取出来的数据表示距离上一次读取超时了多少次
                                int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
                                if (timerfd < 0)
                                {
                                    LOG("timerfd_create error");
                                    abort();
                                }
                                // 这里将超时时间设置为1s,每1s后超时一次
                                struct itimerspec itime;
                                itime.it_value.tv_sec = 1;
                                itime.it_value.tv_nsec = 0; // 第一次超时时间为1s后
                                itime.it_interval.tv_sec = 1;
                                itime.it_interval.tv_nsec = 0; // 第一次超时后,每次超时的时间间隔
                                timerfd_settime(timerfd, 0, &itime, nullptr);
                                return timerfd;
                            }
                            // 读取timerfd里的数据
                            int readTimerFd()
                            {
                                // 这个times就是读取出来的内容,表示距离上一次读取超时了多少次
                                uint64_t times;
                                int readRes = read(_timerfd, ×, 8);
                                if (readRes < 0)
                                {
                                    LOG("read error");
                                    abort();
                                }
                                return times;
                            }
                            // 这个函数已经被绑定在_timerChannel对象中,也就是说被作为可读事件监控起来了
                            // 一旦可读事件触发,就会调用这个函数
                            // 那什么时候会触发呢?当这个事件关联的文件描述符也就是_timerfd有数据到来的时候
                            // 因为_timerfd是操作系统为我们创建的一个定时器文件,是操作系统在帮我们管理
                            // 并且我们设置了每1s超时一次,所以操作系统每1s都会提醒我们一次
                            // 也就是说每1s操作系统都会向_timerfd中写入数据,那么每1s可读事件都会就绪,就会调用timeout函数
                            // 调用以后就会去读取_timerfd里的内容,并且执行runTimerTask函数去处理超时任务
                            // 这里就是定时器的精髓,很好地利用操作系统的timerfd机制,将文件描述符交给epoll去监控管理
                            // 一旦超时了操作系统会通知,做到了异步的事件驱动机制,非常牛逼
                            void timeOut()
                            {
                                // 先调用readTimerFd函数去读取timerfd里的数据
                                // times表示距离上一次读取超时了多少次
                                int times = readTimerFd();
                                // 循环执行runTimerTask
                                // 超时了多少次就执行多少次,runTimerTask函数是将指时钟向后移动1s,然后执行对应时刻的所有超时任务
                                for (int i = 0; i < times; i++)
                                {
                                    runTimerTask();
                                }
                            }
                        private:
                            int _second_tick; // 当前的秒针,走到哪里释放哪里,就相当于执行哪里的任务
                            int _minute_tick; // 当前的分针
                            int _capacity;    // 时间轮表盘最大数量,其实就是最大延迟时间
                            // 这是个二维数组,其实就是一个桶结构,一维的每一个元素代表每1s的时间,
                            // 每秒下面挂的就是这一时刻的定时器任务对象的shared_ptr
                            std::vector> _second_wheel; // 秒级时间轮
                            std::vector> _minute_wheel; // 分钟级时间轮
                            // _timers是用来保存定时器任务对象的weak_ptr的,key值是定时器任务对象的ID值,用来索引对应的定时器任务对象
                            // value值是保存定时器任务对象的weak_ptr
                            // 这里用weak_ptr的原因是不会使shared_ptr的引用计数增加
                            // 如果使用shared_ptr的话,在插入到_timers中或者从_timers中获取对象的时候都会拷贝shared_ptr对象,使得引用计数增加,这样是不合理的
                            std::unordered_map _timers;
                            EventLoop *_eventLoop;
                            int _timerfd; // 定时器描述符
                            // 定时器任务对象的时间管理对象指针
                            std::unique_ptr _timerChannel;
                        };
                        

                        五、线程池模块

                        由于我们实现的是多线程模型,所以必须要实现一个线程池,我们要实现one loop one thread服务器,主Reactor运行在主线程上,只负责接收获取连接,从属Reactor运行在子线程上,负责处理连接的IO事件,所以我们可能会创建多个线程来运行从属Reactor,就需要线程池来管理这些线程。

                        1.LoopThread类

                        LoopThread类实现的是单个线程的逻辑,我们希望创建单个线程对象,即LoopThread对象时,也意味着创建一个从属Reactor。也就是说,一个子线程与一个从属Reactor绑定在一起。我们绑定线程的执行函数为threadEntry。在threadEntry这个函数内部,我们创建从属Reactor,然后让该从属Reactor启动监控,这个启动监控其实是一个while(true)死循环,也就是它会一直启动。threadEntry函数的实现如下:

                            // 实例化EventLoop对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能
                            void threadEntry()
                            {
                                // 这个函数是每次被线程执行的函数
                                // 每次执行都会新建一个EventLoop对象,创建好之后调用start启动
                                EventLoop loop;
                                {
                                    std::unique_lock lock(_mutex);
                                    _eventLoop = &loop;
                                    _cond.notify_all();
                                }
                                loop.start();
                                // 出了这个函数EventLoop会自动销毁
                                // 但实际上start是一个死循环,除非关闭连接或者连接出错了
                                // 就会去调用连接关闭回调函数和错误处理回调函数来关闭连接,这样才会退出
                                // 所以也就是说这个创建的loop是贯穿一个连接整个生命周期的
                                // 只有连接结束了这个loop才会被销毁
                            }
                        

                        这样单个线程的任务就完成了,LoopThread类的完整代码实现如下:

                        class LoopThread
                        {
                        public:
                            // 创建线程,设定线程入口函数
                            // 这里创建线程的时候将线程_thread初始化函数设置为threadEntry函数
                            // 也就是说创建的这个线程每次会去执行threadEntry函数
                            LoopThread()
                            :_eventLoop(nullptr), _thread(std::thread(&LoopThread::threadEntry, this))
                            {}
                            // 返回当前线程关联的EventLoop对象指针
                            EventLoop *getEventLoop()
                            {
                                EventLoop *loop = nullptr;
                                {
                                    // 这里必须加锁等待条件变量_eventLoop不为空成立才能返回_eventLoop
                                    // 因为刚创建线程可能还没有执行threadEntry函数,就来获取_eventLoop的话,获取到的是空值
                                    // 必须等threadEntry创建了_eventLoop对象才能返回
                                    std::unique_lock lock(_mutex);
                                    _cond.wait(lock, [&](){
                                        return _eventLoop != nullptr;
                                    });
                                    loop = _eventLoop;
                                }
                                return loop;
                            }
                        private:
                            // 实例化EventLoop对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能
                            void threadEntry()
                            {
                                // 这个函数是每次被线程执行的函数
                                // 每次执行都会新建一个EventLoop对象,创建好之后调用start启动
                                EventLoop loop;
                                {
                                    std::unique_lock lock(_mutex);
                                    _eventLoop = &loop;
                                    _cond.notify_all();
                                }
                                loop.start();
                                // 出了这个函数EventLoop会自动销毁
                                // 但实际上start是一个死循环,除非关闭连接或者连接出错了
                                // 就会去调用连接关闭回调函数和错误处理回调函数来关闭连接,这样才会退出
                                // 所以也就是说这个创建的loop是贯穿一个连接整个生命周期的
                                // 只有连接结束了这个loop才会被销毁
                            }
                        private:
                            // 互斥锁和条件变量用于实现EventLoop获取的同步关系,避免线程创建了,但是EventLoop还没有实例化,就去获取EventLoop,就会获取到nullptr
                            std::mutex _mutex;        // 互斥锁
                            std::condition_variable _cond; // 条件变量
                            EventLoop *_eventLoop;    // EventLoop指针变量,这个对象需要在线程内实例化
                            std::thread _thread;      // EventLoop对应的线程
                        };
                        

                        2.LoopThreadPool类

                        LoopThreadPool类是线程池类,外界通过线程池的nextEventLoop函数接口获取子线程,实际上就是获取从属Reactor。这里的逻辑是,当主Reactor监控到一个新连接到来的时候,就从线程池的nextEventLoop函数中获取一个从属Reactor,让该从属Reactor监控新连接的IO事件。线程池的完整代码如下:

                        class LoopThreadPool
                        {
                        public:
                            // 构造函数,需要用baseLoop来初始化,也就是需要用主reactor来初始化
                            LoopThreadPool(EventLoop  *baseLoop)
                            :_threadCount(0), _nextLoopIndex(0), _baseLoop(baseLoop)
                            {}
                            // 设置线程数量
                            void setThreadCount(int count)
                            {
                                _threadCount = count;
                            }
                            // 创建所有的从属线程
                            void create()
                            {
                                if(_threadCount > 0)
                                {
                                    _threads.resize(_threadCount);
                                    _eventLoops.resize(_threadCount);
                                    for(int i = 0; i < _threadCount; i++)
                                    {
                                        _threads[i] = new LoopThread;
                                        _eventLoops[i] = _threads[i]->getEventLoop();
                                    }
                                }
                            }
                            // 从线程池中获取eventLoop
                            // 这个函数接口是给TcpServer调用的,当一个新连接到来的时候,就会创建一个新的Connection对象
                            // 这个Connection对象需要从线程池中拿到一个EventLoop来关联
                            // 关联起来后,往后这个Connection对象的所有操作都放在这个EventLoop对象中
                            EventLoop *nextEventLoop()
                            {
                                // 如果线程池的线程数量为0,就返回主eventLoop
                                if(_threadCount == 0)
                                {
                                    return _baseLoop;
                                }
                                _nextLoopIndex = (_nextLoopIndex + 1) % _threadCount;
                                return _eventLoops[_nextLoopIndex];
                            }
                        private:
                            int _threadCount;                     // 从属线程的数量
                            int _nextLoopIndex;
                            EventLoop *_baseLoop;                 // 主EventLoop,运行在主线程,从属线程数量为0,则所有操作都在_baseLoop中进行
                            std::vector _threads;   // 保存所有的LoopThread对象
                            std::vector _eventLoops; // 从属线程数量大于0则从_eventLoop中进行线程EventLoop分配
                        };
                        

网友评论

搜索
最新文章
热门文章
热门标签
 
 已婚女人梦见抱女婴  生肖配对姻缘查询  周公解梦梦见孩子死了