逻辑层设计

简介

本文概述基于boost::asio实现的服务器逻辑层结构,并且完善之前设计的消息结构。因为为了简化粘包处理,我们简化了发送数据的结构,这次我们给出完整的消息设计,以及服务器架构设计。

服务层设计

之前我们设计了Session(会话层),并且给大家讲述了Asio底层的通信过程,如下图

image-20231120183243232

我们接下来要设计的服务器结构是这样的

image-20231120183405074

消息头完善

我们之前的消息头仅包含数据域的长度,但是要进行逻辑处理,就需要传递一个id字段表示要处理的消息id,当然可以不在包头传id字段,将id序列化到消息体也是可以的,但是我们为了便于处理也便于回调逻辑层对应的函数,最好是将id写入包头。

为了减少耦合和歧义,我们重新设计消息节点

  1. MsgNode表示消息节点的基类,头部的消息用这个结构存储。
  2. RecvNode表示接收消息的节点。
  3. SendNode表示发送消息的节点。

定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class MsgNode
{
public:
MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {
_data = new char[_total_len + 1]();
_data[_total_len] = '\0';
}

~MsgNode() {
std::cout << "destruct MsgNode" << std::endl;
delete[] _data;
}
void Clear() {
::std::memset(_data, 0, _total_len);
}
short _cur_len;
short _total_len;
char* _data;
};

//接受消息 id+内容
class RecvNode :public MsgNode {
public:
RecvNode(short max_len, short msg_id);

private:
short _msg_id;
};

//发送消息,id+len+内容
class SendNode :public MsgNode {
public:
SendNode(const char* msg, short max_len, short msg_id);
private:
short _msg_id;
};

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
RecvNode::RecvNode(short max_len, short msg_id) :MsgNode(max_len),
_msg_id(msg_id) {
}

//注意,头部有(id+data_len)两个字段
SendNode::SendNode(const char* msg, short max_len, short msg_id) :MsgNode(max_len + HEAD_TOTAL_LEN)
, _msg_id(msg_id) {
//先发送id, 转为网络字节序
short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);

//这里是HEAD_ID_LEN,不是HEAD_TOTAL_LEN
memcpy(_data, &msg_id_host, HEAD_ID_LEN);
//转为网络字节序
short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);

//这里是HEAD_DATA_LEN,不是HEAD_TOTAL_LEN
memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN);

//消息内容
memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len);
}

注意

1
2
3
SendNode`发送节点构造时,先将id转为网络字节序,然后写入`_data`数据域。
然后将要发送数据的长度转为大端字节序,写入`_data`数据域,注意要偏移`HEAD_ID_LEN`长度。
最后将要发送的数据`msg`写入`_data`数据域,注意要偏移`HEAD_ID_LEN`+`HEAD_DATA_LEN

Session类改写

定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class CSession : public std::enable_shared_from_this<CSession>
{
public:
CSession(boost::asio::io_context& io_context, CServer* server);
~CSession();
tcp::socket& GetSocket();
std::string& GetUuid();
void Start();
void Send(char* msg, short max_length, short msgid);
void Send(std::string msg, short msgid);
void Close();
std::shared_ptr<CSession> SharedSelf();
private:
void HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self);
void HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self);
tcp::socket _socket;
std::string _uuid;
char _data[MAX_LENGTH];
CServer* _server;
bool _b_close;
std::queue<shared_ptr<MsgNode> > _send_que;
std::mutex _send_lock;


//收到的消息结构
std::shared_ptr<MsgNode> _recv_msg_node;
bool _b_head_parse;
//收到的头部结构
std::shared_ptr<MsgNode> _recv_head_node;
};

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
#include "CSession.h"
#include "CServer.h"
#include <iostream>
#include <sstream>
#include <json/json.h>
#include <json/value.h>
#include <json/reader.h>

CSession::CSession(boost::asio::io_context& io_context, CServer* server) :
_socket(io_context), _server(server), _b_close(false), _b_head_parse(false) {
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid);
_recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}
CSession::~CSession() {
std::cout << "~CSession destruct" << endl;
}

tcp::socket& CSession::GetSocket() {
return _socket;
}

std::string& CSession::GetUuid() {
return _uuid;
}

void CSession::Start() {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this,
std::placeholders::_1, std::placeholders::_2, SharedSelf()));
}

void CSession::Send(std::string msg, short msgid) {
std::lock_guard<std::mutex> lock(_send_lock);
int send_que_size = _send_que.size();
if (send_que_size > MAX_SENDQUE) {
std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
return;
}

_send_que.push(make_shared<SendNode>(msg.c_str(), msg.length(), msgid));
if (send_que_size > 0) {
return;
}
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

void CSession::Send(char* msg, short max_length, short msgid) {
std::lock_guard<std::mutex> lock(_send_lock);
int send_que_size = _send_que.size();
if (send_que_size > MAX_SENDQUE) {
std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
return;
}

_send_que.push(make_shared<SendNode>(msg, max_length, msgid));
if (send_que_size > 0) {
return;
}
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

void CSession::Close() {
_socket.close();
_b_close = true;
}

std::shared_ptr<CSession>CSession::SharedSelf() {
return shared_from_this();
}

void CSession::HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) {
//增加异常处理
try {
if (!error) {
std::lock_guard<std::mutex> lock(_send_lock);
//cout << "send data " << _send_que.front()->_data+HEAD_LENGTH << endl;
_send_que.pop();
if (!_send_que.empty()) {
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_self));
}
}
else {
std::cout << "handle write failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}
catch (std::exception& e) {
std::cerr << "Exception code : " << e.what() << endl;
}

}

void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self) {
try {
if (!error) {
//已经移动的字符数
int copy_len = 0;
while (bytes_transferred > 0) {
if (!_b_head_parse) {
//收到的数据不足头部大小
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_head_node->_cur_len += bytes_transferred;

//清空缓冲区
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
//收到的数据比头部多
//头部剩余未复制的长度
int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);
//更新已处理的data长度和剩余未处理的长度
copy_len += head_remain;
bytes_transferred -= head_remain;
//获取头部MSGID数据
short msg_id = 0;
memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
//网络字节序转化为本地字节序
msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
std::cout << "msg_id is " << msg_id << endl;
//id非法
if (msg_id > MAX_LENGTH) {
std::cout << "invalid msg_id is " << msg_id << endl;
_server->ClearSession(_uuid);
return;
}
short msg_len = 0;
memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);
//网络字节序转化为本地字节序
msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
std::cout << "msg_len is " << msg_len << endl;

//len非法
if (msg_len > MAX_LENGTH) {
std::cout << "invalid data length is " << msg_len << endl;
_server->ClearSession(_uuid);
return;
}

//构造接收节点
_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);

//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < msg_len) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
//头部处理完成
_b_head_parse = true;
return;
}

//处理这一轮的消息体
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len);
_recv_msg_node->_cur_len += msg_len;
copy_len += msg_len;
bytes_transferred -= msg_len;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
//cout << "receive data is " << _recv_msg_node->_data << endl;

//构造数据和发送数据
Json::Reader reader;
Json::Value root;
reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
<< root["data"].asString() << endl;
root["data"] = "server has received msg, msg data is " + root["data"].asString();
std::string return_str = root.toStyledString();
Send(return_str, root["id"].asInt());

//处理下一轮的头部
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue;
}

//已经处理完头部,处理上次未接受完的消息数据
//接收的数据仍不足剩余未处理的
int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
if (bytes_transferred < remain_msg) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
_recv_msg_node->_cur_len += remain_msg;
bytes_transferred -= remain_msg;
copy_len += remain_msg;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';

//构造数据和发送数据
Json::Reader reader;
Json::Value root;
reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
<< root["data"].asString() << endl;
root["data"] = "server has received msg, msg data is " + root["data"].asString();
std::string return_str = root.toStyledString();
Send(return_str, root["id"].asInt());


//处理下一轮的头部
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue;
}
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}
catch (std::exception& e) {
std::cout << "Exception code is " << e.what() << endl;
}
}

单例模板类

因为服务器的逻辑处理需要单例模式,后期可能还会有一些模块的设计也需要单例模式,所以先实现一个单例模板类,然后其他想实现单例类只需要继承这个模板类即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
template <typename T>
class Singleton
{
protected:
Singleton() = default;
Singleton(const Singleton<T>&) = delete;
Singleton& operator=(const Singleton<T>& st) = delete;
static std::shared_ptr<T> _instance;

public:
//获取指针
static std::shared_ptr<T> GetInstance() {
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
_instance = shared_ptr<T>(new T);
});
return _instance;
}

//输出地址
void PrintAddress() {
std::cout << _instance.get() << endl;
}

//析构
~Singleton() {
std::cout << "this is singleton destruct" << std::endl;
}
};

template<typename T>
std::shared_ptr<T> Singleton<T>::_instance = NULL;

解释

LogicNode类

LogicNode定义在CSession.h中

其包含算了会话类的智能指针,主要是为了实现伪闭包,防止session被释放。
其次包含了接收消息的节点类的智能指针。
实现如下

1
2
3
4
5
6
7
8
9
//session和recvnode
class LogicNode {
friend class LogicSystem;
public:
LogicNode(shared_ptr<CSession>Sesson, shared_ptr<RecvNode>Recvnode) :_session(Sesson), _recvnode(Recvnode) {}
private:
shared_ptr<CSession> _session;
shared_ptr<RecvNode> _recvnode;
};

LogicSystem单例类

我们实现逻辑系统的单例类,继承自Singleton<LogicSystem>,这样LogicSystem的构造函数和拷贝构造函数就都变为私有的了,因为基类的构造函数和拷贝构造函数都是私有的。另外LogicSystem也用了基类的成员_instanceGetInstance函数。从而达到单例效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//这是回调函数的类型定义。它接受一个会话对象的智能指针、一个消息ID和消息内容作为参数。
typedef function<void(shared_ptr<CSession>, short msg_id, string msg_data)> FunCallBack;

class LogicSystem :public Singleton<LogicSystem>
{
friend class Singleton<LogicSystem>;
public:
~LogicSystem();
void PostMsgToQue(shared_ptr < LogicNode> msg);
private:
LogicSystem();
void DealMsg();
void RegisterCallBacks();
void HelloWordCallBack(shared_ptr<CSession>, short msg_id, string msg_data);
std::thread _worker_thread;
std::queue<shared_ptr<LogicNode>> _msg_que; //消息队列
std::mutex _mutex;
std::condition_variable _consume;
bool _b_stop;
std::map<short, FunCallBack> _fun_callbacks;
};
  1. FunCallBack为要注册的回调函数类型,其参数为绘画类智能指针,消息id,以及消息内容。
  2. _msg_que为逻辑队列
  3. _mutex 为保证逻辑队列安全的互斥量
  4. _consume表示消费者条件变量,用来控制当逻辑队列为空时保证线程暂时挂起等待,不要干扰其他线程。
  5. _fun_callbacks表示回调函数的map,根据id查找对应的逻辑处理函数。
  6. _worker_thread表示工作线程,用来从逻辑队列中取数据并执行回调函数。
  7. _b_stop表示收到外部的停止信号,逻辑类要中止工作线程并优雅退出。

逻辑层设计
http://example.com/2023/11/20/cpp/逻辑层设计/
作者
Mrxiad
发布于
2023年11月20日
许可协议