Linux网络|多路转接Reactor
- 游戏开发
- 2025-09-02 03:12:01

前言:本节内容结束Linux网络部分。本节将要简单实现一下多路转接Reactor的代码,制作一个多路转接版本的四则运算计算器服务器。Reactor的代码相当困难,除了350多行新代码, 还要用到我们之前写的许多文件, 比如之前写的计算服务的客户端代码、 序列化反序列化的代码、Epoller文件等等。代码非常困难,友友们做好心理准备, 现在废话不多说, 开始我们的学习吧!
ps:本节代码逻辑最好能看懂哦!
目录
ET和LT
Reactor代码实现
准备文件
TcpServer.hpp
Connection
TcpServer
构造函数
析构函数
Init
SetNonBlockOrDie
AddConnection
Accepter
Loop
Dispatcher
Recver
DefaultOnMessage
Sender
Excepter
Reactor是epoll的一种工作模式。 现在我们来引出这种工作模式
ET和LTepoll有两种工作模式:ET与LT。
LT叫做: level Triggered水平触发ET叫做: Edge Triggered边缘触发epoll的默认工作模式是LT。什么是T,就是如果我们事件就绪,里面的内容如果我们不处理,那么他就会一直提醒我们事件就绪了,事件就绪了,一直是有效的。
ET是什么,从无到有,从有到多, 变化的时候, 才会提醒我们一次。
就类似于取快递,张三是只要有快递,就给你打电话一直通知你。李四是只有当新快递到来的时候才会通知你,不管你取不取走,取走多少,只通知一次,即便是假如有五个快递,你取了,但是只取走了四个,也同样不会再通知你。
然后这里涉及到了一个面试题:就是为什么ET模式下fd必须是non_block?
因为ET模式下只变化时通知一次。所以它会倒逼着程序员,每次通知,都必须把本轮的数据全部取走。如何全部取走? 就是循环读取,一直到读取出错。这个时候fd就是阻塞的,这样服务就挂起了,就相当于服务挂掉了。为了不让他阻塞,就必须在ET模式下,所有的fd必须是non_block。
然后,ET和LT谁的效率更高呢? 首先,ET的通知效率更高。 但是不仅仅如此,ET的IO效率也会更高。为什么ET的IO效率更高呢? 原因就是因为ET每次通知一次,上层程序员就把数据全部取走了。这样也能保证TCP会向对方通知一个更大的窗口,从而概率上让对方一次给我发送更多的数据。
但是,不能说ET一定比LT效率高。因为LT也可以将所有的fd设置成non block,然后循环读取。这样就保证了在通知第一次的时候,数据就全部被取走了,就和ET一样了。
底层有数据了,通知上层。本质上是什么意思呢?底层数据就绪了,就是网卡中断产生数据交给网卡驱动就绪了。那么如果是ET,就是只要数据就绪,不管有没有数据,只有当中断产生数据的时候调用callback,就只回调一次callback,将节点链入到就绪队列里面。LT就是每次我们epoll_create,只要底层有数据都调用一次callback,将就绪的数据链入到就绪队列里面。
现在我们就写一下epoll的ET工作模式。 Reactor
Reactor代码实现 准备文件先简单准备一下文件, 这里准备的是暂时需要的, 后续需要别的文件再添加即可。
其中Epoller.hpp和nocopy.hpp是上一节:Linux网络 | 多路转接epoll-CSDN博客讲述的。Socket套接字和Log.hpp上一节同样使用过。实现过程博主已经讲过:linux网络 | 序列化反序列化的概念 与 结合网络计算器深度理解-CSDN博客 ——Socket组件
linux进程间通信——命名管道、 日志程序_进程间通信日志系统-CSDN博客 ——日志组件
我们的主要中心就放到TcpServer.hpp上面。 Reactor的主要逻辑都在这里。
TcpServer.hpp Connection首先创建Connection类, 这个类意思是连接, 用来保存sockfd和对应事件。同时保存对应事件的处理方法、 异常的处理方法、sockfd对应的接收缓冲区和发送缓冲区。还能添加一个指向底层TcpServer对象的回指指针, 这个回指指针为什么要有, 怎么用, 后面说。
然后是接口, 说完了属性就来看看接口。 接口我们要把Sock能够获取到, 所以有一个SockFd()来获取sockfd。 然后还要能获取到接收缓冲区和发送缓冲区, 所以也要有对应的接口。然后读数据的时候, 接收缓冲区在后面要能够追加字符串。 发送数据的时候, 发送缓冲区要能够截取字符串。 Connection不提供如何将接收缓冲区的数据与发送缓冲区的数据联系起来的操作, 这些事情如何处理, 由上层去做。 因为我们Connection已经把发送缓冲区和接收缓冲区暴露出去了, 暴露给上层了。 上层怎么处理, 是上层的事情。
下面是Connection类的创建:
class Connection { public: Connection(int sock, shared_ptr<TcpServer> tcp_server_ptr) : _sock(sock), _tcp_server_ptr(tcp_server_ptr) { } void SetCallBack(func_t recv_cb, func_t send_cb, func_t except_cb) // 设置回调函数 { _recv_cb = recv_cb; _send_cb = send_cb; _except_cb = except_cb; } int SockFd() { return _sock; } void AppendInbuffer(const string& info) { _inbuffer += info; } void AppendOutbuffer(const string& info) { _outbuffer += info; } string& Inbuffer() { return _inbuffer; } string& Outbuffer() { return _outbuffer; } ~Connection() { } private: int _sock; // 链接所在的套接字 string _inbuffer; // string二进制流,vector, 读到的数据放到这里。 string _outbuffer; // 未来通过套接字读取数据时,从inbuffer读取。 public: func_t _recv_cb; // 读回调。 一旦数据就绪,就使用这个函数。 func_t _send_cb; // 写回调。 func_t _except_cb; // // 添加一个指向底层TcpServer对象的回指指针。 shared_ptr<TcpServer> _tcp_server_ptr; string _ip; uint16_t _port; }; TcpServer先说TcpServer要有的属性, 因为是epoll, 所以要有epoll模型。我们将epoll模型封装成了Epoller对象, 所以即Epoller对象(Epoller文件里面的结构体实例化对象)。然后要有一个主监听套接字listensock, 专门来监听新连接。然后也要有一个保存Connection对象的容器, 因为这个Connection和sockfd是唯一对应的,所以我们可以使用哈希map存储键值对{sockfd, Connection},博主为了方便管理, 用的shared_ptr封装了一下Connection,键值对变成了{sockfd, shared_ptr<Connection>。 然后就是接收到的事件就绪数组:revs(epoll_event类型)。还有一个上层处理信息的回调函数:_OnMessage。然后端口号port, 退出标识符quit不解释。 最容易出错的一个地方还有一个属性, 就是一个指向自身的智能指针, 这个指针用来创建Connection连接的时候里面的那个TcpServer回指指针的拷贝, 注意,这里必须有指向自己的智能指针, 因为只有智能指针拷贝智能指针的时候才能增加引用计数, 如果创建Connection时候直接用this指针构造智能指针, 那么每一个Connection里面的智能指针的引用计数都是1, 不会增加引用计数, 当我们销毁Connection的时候就会出现问题。
我们先把里面的属性写出来, 然后接口一步一步说:
class TcpServer { public: private: shared_ptr<Epoller> _epoller_ptr; // epoll模型 shared_ptr<Socket> _listensock_ptr; // 套接字, 用来监听新连接 unordered_map<int, shared_ptr<Connection>> _Connections; // 从一个文件描述符到一个epoller链接的映射。服务器管理的所有的链接 struct epoll_event revs[num]; uint16_t _port; bool _quit; shared_ptr<TcpServer> _self; func_t _OnMessage; //让上层处理信息,将数据交给上层 };然后来看一下接口:
我们上层传入的函数, 还有以后我们的发送, 接收, 异常处理。这几个接口我们都是用的是一个包装类, 使用包装器封装的回调函数类。 即func_t, 这个func_t包装如下:
using func_t = function<void(shared_ptr<Connection>)>; // 定义一个函数对象 构造函数然后看一下构造函数, 构造函数上层要传入端口号, 传入上层处理数据的函数。
TcpServer(uint16_t port, func_t OnMessage) : _port(port), _epoller_ptr(new Epoller()), _listensock_ptr(new Socket()), _quit(false), _OnMessage(OnMessage), _self(shared_ptr<TcpServer>(this)) { } 析构函数析构函数默认就可以, 下面是博主为了调试。
~TcpServer() { cout << "~ TcpServer()"<< endl; } InitInit函数, 其实监听套接字的初始化。其中要注意的是两个接口:一个是SetNonBlockOrDie, 这个是封装的一个设置套接字变成非阻塞的接口。 第二个是AddConnection,AddConection就是创建新连接, 当有新的连接到来时就调用这个AddConnection。需要传入新到来的sockfd, 以及要关心的事件, 同时还要添加进读方法, 写方法, 异常处理方法。 以及自己的端口号和ip。然后listensock要传入的读方法和其他的文件描述符有所不同, 是一个Accepter接口,用来监听新连接。
void Init() { _listensock_ptr->InitSocket(); SetNonBlockOrDie(_listensock_ptr->Fd()); _listensock_ptr->Bind(_port); _listensock_ptr->Listen(); AddConnection(_listensock_ptr->Fd(), EVENT_IN, bind(&TcpServer::Accepter, this, placeholders::_1) , nullptr, nullptr, "0.0.0.0", _port); // 监听套接字没有回调函数, 他调用的是连接管理器。 } SetNonBlockOrDie设置fd变成非阻塞, 这个我们重新在一个文件中写。
#include <iostream> #include <unistd.h> #include <fcntl.h> using namespace std; void SetNonBlockOrDie(int sockfd) { int fl = fcntl(sockfd, F_GETFL); if (fl < 0) exit(1); fcntl(sockfd, F_SETFL, fl | O_NONBLOCK); } AddConnection添加新连接, 就是先创建一个Connection的智能指针对象, 将参数都设置进去, 然后同时将sockfd和事件添加到epoll模型当中。 以及将连接智能指针对象保存到哈希表中。
void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb, const string& ip, uint16_t port) { // 还要给sock也创建一个connection 对象,将sock添加到connection中,同时sock和connection放入connections。 shared_ptr<Connection> new_connection = make_shared<Connection>(sock, _self); // 这里是构造connection。 new_connection->SetCallBack(recv_cb, send_cb, except_cb); //构造shared_ptr, 构造自己的。但是不应该构造,应该传拷贝 new_connection->_ip = ip; new_connection->_port = port; _Connections.insert(make_pair(sock, new_connection)); // 将sock套接字和conntection放到映射表中。 // 我们添加对应的事件,除了要加到内核中,关注fd, event _epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sock, event); // 将对应sock套接字添加到内核中只是第一步 lg(Debug, "add a new connection success, sockfd is : %d", sock); } Accepter监听新连接, 和之前写的epoll监听新连接方法一样, 不同的是要获取对方的ip地址和端口号保存下来,同时还要将读写以及处理异常方法设置进去。
void Accepter(shared_ptr<Connection> connection) { while (true) { struct sockaddr_in peer; socklen_t len = sizeof(peer); int sock = ::accept(connection->SockFd(), (sockaddr*)&peer, &len); if (sock > 0) //来到新连接 { uint16_t peerport = ntohs(peer.sin_port); //网络序列转主机序列 char ipbuf[128]; inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf)); string peerip = ipbuf; lg(Debug, "get a new client, get info-> [%s: %d], sockfd : %d\n", ipbuf, peerport, sock); SetNonBlockOrDie(sock); //设置非阻塞 //listensock只需要recv_rb, 但是其他的sock, 读, 写, 处理异常都要有 AddConnection(sock, EVENT_IN, bind(&TcpServer::Recver, this, placeholders::_1), bind(&TcpServer::Sender, this, placeholders::_1), bind(&TcpServer::Excepter, this, placeholders::_1), peerip, peerport); //添加新连接事件 } else { if (errno == EWOULDBLOCK) break; //底层没有数据 else if (errno == EINTR) continue; //EINTR为信号中断,信号导致的读取出错,挂起 else break; } } } LoopLoop其实就相当于Start函数, 就是启动服务。一直死循环下去。 然后每一个循环都要调用Dispatcher函数分发事件。
void Loop() { _quit = false; while (!_quit) { Dispatcher(3000); PrintConnection(); } _quit = true; } Dispatcher谈完Loop那么就谈Dispatcher, Dispatcher, 就是调用找到就绪的文件描述符, 根据事件类型对文件描述符执行不同的操作。这里面的IsConnectionSafe()其实就是一个判断sockfd是否安全的函数, 如果安全就返回真, 否则返回假。
void Dispatcher(int timeout) { int n = _epoller_ptr->EpollErWait(revs, num, timeout); // 获得n个就绪事件 for (int i = 0; i < n; i++) { // 事件就绪处理 uint32_t event = revs[i].events; int sock = revs[i].data.fd; //如果是EPOLLERR和EPOLLHUP, 也就是事件出错了, 异常了。统一把所有的事件异常转化为读写问题。 //这样就能让异常集中处理, 不再扩散到代码的任意位置。 if (event & EPOLLERR) event |= (EPOLLIN | EPOLLOUT); if (event & EPOLLHUP) event |= (EPOLLIN | EPOLLHUP); //事件派发里, 只需要处理EPOLLIN和EPOLLOUT就可以了。因为出异常只可能出现在这两个时间里面。 if ((event & EPOLLIN) && IsConnectionSafe(sock)) { if (_Connections[sock]->_recv_cb) //处理写事件 { _Connections[sock]->_recv_cb(_Connections[sock]); //这里传过去的就是就绪的文件描述符所在的连接 } } if ((event & EPOLLOUT) && IsConnectionSafe(sock)) //写事件被设置成关心, 去调用Sender方法 { if (_Connections[sock]->_send_cb) { _Connections[sock]->_send_cb(_Connections[sock]); } } } } Recver当一个文件描述符读事件就绪了, 那么根据Dispatcher的逻辑, 就直接进入了Recver函数。
Recver函数因为是ET模式, 所以必须要把底层的内核缓冲区里面的数据全部读完。那么最后一次读, 就一定是EWOULDBLOCK, 即错误码11。 所以如果是EWOULDBLOCK, 那么就退出循环,但是不return, 只是退出循环去执行后面的上层数据处理函数。 如果是读取数据读取到了0的时候, 就代表对面把连接关掉了, 那么sockfd就没有用了, 异常处理就行。 异常处理函数为Excepter, 保存在_except_cb成员里面, 后面会讲到。同样的, 如果是信号中断了一下, 这个时候还可以继续接着读取,就不用退出循环。其他情况下的异常, 都要Excepter。
void Recver(shared_ptr<Connection> connection) //这里传过来的 { // int sock = connection->SockFd(); //得到文件描述符 //ET模式, 必须把数据全部读完 while (true) { char buffer[g_buffer_size]; memset(buffer, 0, sizeof(buffer)); ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0); //设置为0,原fd为非阻塞读取, 所以这里就是非阻塞读取。 if (n > 0) //读取成功 { connection->AppendInbuffer(buffer); //读取成功, 就不断的将buffer缓冲区里面的数据放到connection里面维护的inbuffer里面。 } else if (n == 0) //对端把连接关了 { lg(Info, "sockfd: %d, client info %s : %d quit...", connection->SockFd(), connection->_ip.c_str(), connection->_port); //一旦对方连接关了, 我们对应的这个连接, 就直接进入异常处理的流程: connection->_except_cb(connection); return; } else //读取出错 { //非阻塞处理的时候:出错了errno == EWOULDBLACK, 表明本轮数据已经读完。那么文件描述符没有问题。 //如果errno == EINET, 表明是因为被某个信号中断了,文件描述符没有问题, 并且数据可能没有读完, 继续读取. //如果errno == 其他的, 那么就是说明读取出错了, 文件描述符出问题了, 这个连接就可以Excepter了。 if (errno == EWOULDBLOCK) { break; } else if (errno == EINTR) { continue; } else { lg(Waring, "sockfd: %d, client info %s : %d recv error...", connection->_ip.c_str(), connection->_port); connection->_except_cb(connection); return; } } } //数据有了, 但不一定全。1、先检测。2、如果有完整报文, 就处理。 _OnMessage(connection); //读到的所有的数据, 都在connection内部。读完之后回调给上层。 } DefaultOnMessage这个函数是放到主函数的, 也就是我们说的上层处理数据方法, 传给tcp服务的。这里的序列化反序列化的接口以及处理数据的接口我们使用的是以前写计算服务的代码。 这里不写了, 直接贴过来。
Calculator.hpp
#include "protocol.hpp" #include<iostream> using namespace std; //上层业务 class Calculator { public: Response CalculatorHelper(Request req) { Response res(0, 0); res.result_ = res.code_ = 0; switch (req.op_) { case '+': res.result_ = req.x_ + req.y_; break; case '-': res.result_ = req.x_ - req.y_; break; case '*': res.result_ = req.x_ * req.y_; break; case '/': { if (req.y_ == 0) { res.code_ = 1; res.result_ = 0; } else { res.result_ = req.x_ / req.y_; res.code_ = 0; } break; } case '%': { if (req.y_ == 0) { res.code_ = 1; res.result_ = 0; } else { res.result_ = req.x_ % req.y_; res.code_ = 0; } break; } default: { res.code_ = 1; res.result_ = 0; break; } } return res; } string Handler(string &package) // 接受一个报文 { string content; bool r = Decode(package, &content); // 将报文解包,如果这个时候报文, 第一个数字是\n说明后面一定是一个 然后把数据放到新的字符串中 if (!r) return ""; // Request req; // 重新创建一个变量 req.DeSerialize(content); // 然后将变量反序列化 Response res = CalculatorHelper(req); content.clear(); res.Serialize(&content); content = Encode(content); cout << content << endl; return content; } };protocal.hpp
#pragma once #include<iostream> using namespace std; #include<string> #include<jsoncpp/json/json.h> // #define Myself 1 //封装报头 string Encode(string& content) { string package = to_string(content.size()); package += "\n"; package += content; package += "\n"; return package; } //解开报头 bool Decode(string& package, string* content) { int pos = package.find('\n', 0); //查找第一个\n if (pos == string::npos) return false; string len_str = package.substr(0, pos); int len = stoi(len_str.c_str()); ssize_t size = len_str.size() + len + 2; if (package.size() < size) return false; *content = package.substr(pos + 1, len); package.erase(0, size); return true; } class Request { public: Request(){} Request(int data1, int data2, char oper) : x_(data1), y_(data2), op_(oper){} ~Request(){} bool Serialize(string* out) { #ifdef Myself string s; s += to_string(x_); s += " "; s += op_; s += " "; s += to_string(y_); *out = s; return true; #else Json::Value root; root["x"] = x_; root["y"] = y_; root["op"] = op_; Json::FastWriter w; *out = w.write(root); return true; #endif } bool DeSerialize(string in) { #ifdef Myself string s = in; int left = s.find(' ', 0); if (left == string::npos) return false; string part_x = s.substr(0, left); int right = s.find(' ', left + 1); if (right == string::npos) return false; char part_op = s[left + 1]; string part_y = s.substr(right + 1); x_ = stoi(part_x); y_ = stoi(part_y); op_ = part_op; return true; #else Json::Value root; Json::Reader r; r.parse(in, root); x_ = root["x"].asInt(); y_ = root["y"].asInt(); op_ = root["op"].asInt(); return true; #endif } void DebugPrint() { cout << "x = " << x_; cout << "; lop = " << op_; cout << "; y = " << y_ << endl; } int x_; int y_; char op_; }; class Response { public: Response(){} Response(int result, int code) : result_(result), code_(code) {} ~Response(){} bool Serialize(string* out) { #ifdef Myself string s; s += to_string(result_); s += " "; s += to_string(code_); *out = s; return true; #else Json::Value root; root["result"] = result_; root["code"] = code_; Json::FastWriter w; *out = w.write(root); return true; #endif } bool DeSerialize(string in) { #ifdef Myself string s = in; int pos = s.find(' ', 0); if (pos == string::npos) return false; string part_res = s.substr(0, pos); string part_code = s.substr(pos + 1); result_ = stoi(part_res); code_ = stoi(part_code); return true; #else Json::Value root; Json::Reader r; r.parse(in, root); result_ = root["result"].asInt(); code_ = root["code"].asInt(); return true; #endif } void DebugPrint() { cout << "结果响应完成, result: " << result_ << " , code: " << code_ << endl; } int result_; int code_; }; Calculator calculator; void DefaultOnMessage(shared_ptr<Connection> connection_ptr) { lg(Debug, "上层已经得到数据\b"); //处理数据 string response = calculator.Handler(connection_ptr->Inbuffer()); if(response.empty()) return; //发送出去 lg(Debug, "%s", response.c_str()); //response发送出去 connection_ptr->AppendOutbuffer(response); //将结果拷贝到输出缓冲区里面。 //正确理解发送 connection_ptr->_tcp_server_ptr->Sender(connection_ptr); } SenderSender方法就是写方法,其实我们从讲多路转接开始,从来没有讲过发送的问题。现在我们来看看如何正确理解发送?
把数据真正发出去,在select、poll、epoll中,因为写事件的本质就是发送缓冲区是否有空间,这个经常是OK的,所以经常就是就绪的。如果我们设置对EPOLLOUT关心,那么EPOLLOUT几乎每次都有就绪。就导致epollserver经常返回,浪费cpu资源。
所以结论:对于读,设置长关心。对于写,按需设置。就是如下代码:
void Sender(shared_ptr<Connection> connection) { auto& outbuffer = connection->Outbuffer(); while (true) { ssize_t n = send(connection->SockFd(), outbuffer.c_str(), outbuffer.size(), 0); if (n > 0) { outbuffer.erase(0, n); if (outbuffer.empty()) break; } else if(n == 0) { connection->_except_cb(connection); return; } else { if (errno == EWOULDBLOCK) break; else if (errno == EINTR) continue; else { lg(Waring, "sockfd: %d, client info %s : %d send error...", connection->_ip.c_str(), connection->_port); connection->_except_cb(connection); return; } } } if (!outbuffer.empty()) { //开启对写事件的关心 EnableEvent(connection->SockFd(), true, true); } else { //关闭对写事件的关心 EnableEvent(connection->SockFd(), true, false); } } Excepterexcepter是对一场做处理, 当全部读完, 或者全部写完的时候, 都需要异常处理。
void Excepter(shared_ptr<Connection> connection) { int sockfd = connection->SockFd(); lg(Debug, "Excepter hander sockfd : %d, client info %s : %d Excepter hander...", sockfd, connection->_ip.c_str(), connection->_port); //异常处理 //1、先移除关心事件 _epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, sockfd, 0); //2、关闭异常的文件描述符 close(sockfd); //3、将fd->connection映射表中将对应的连接进行移除。即从unordered_map中移除 _Connections.erase(sockfd); //erase释放, 就是释放整个节点, 这个节点是{sockfd, connection_ptr} //释放对应的节点。但是 //出现了双重释放。 双重释放了什么? lg(Debug, "remove %d from, _connections...\n", connection->SockFd()); }以上重要代码就都讲完了, 后面会有整个的代码博主会上传到资源里面, 有兴趣的友友们自行下载。
——————以上就是本节全部内容哦, 如果对友友们有帮助的话可以关注博主, 方便学习更多知识哦!!!
Linux网络|多路转接Reactor由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Linux网络|多路转接Reactor”
上一篇
习题系列——数值分析与数值计算
下一篇
高速硬件电路设计