处理网络粘包问题

粘包问题

原因:

因为TCP底层通信是面向字节流的,TCP只保证发送数据的准确性和顺序性,字节流以字节为单位,客户端每次发送N个字节给服务端,N取决于当前客户端的发送缓冲区是否有数据,比如发送缓冲区总大小为10个字节,当前有5个字节数据(上次要发送的数据比如’loveu’)未发送完,那么此时只有5个字节空闲空间,我们调用发送接口发送hello world!其实就是只能发送Hello给服务器,那么服务器一次性读取到的数据就很可能是loveuhello。而剩余的world!只能留给下一次发送,下一次服务器接收到的就是world!

其他的原因:
1   客户端的发送频率远高于服务器的接收频率,就会导致数据在服务器的tcp接收缓冲区滞留形成粘连,比如客户端1s内连续发送了两个hello world!,服务器过了2s才接收数据,那一次性读出两个hello world!。
2   tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,比如连续发送1字节的数据要累计到多个字节才发送,可以了解下tcp底层的Nagle算法。
3   再就是我们提到的最简单的情况,发送端缓冲区有上次未发送完的数据或者接收端的缓冲区里有未取出的数据导致数据粘连。

处理粘包

处理粘包的方式主要采用应用层定义收发包格式的方式,这个过程俗称切包处理,常用的协议被称为tlv协议:

消息id+消息长度+消息内容

MsgNode类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class MsgNode
{
//Csession可以访问私有成员
friend class CSession;
public:

//(发送msg),数组长度为(max_len+头长度+1),结尾"\0",当前长度为0
MsgNode(char* msg, short max_len) :_total_len(max_len + HEAD_LENGTH), _cur_len(0) {
_data = new char[_total_len + 1]();
memcpy(_data, &max_len, HEAD_LENGTH);
memcpy(_data + HEAD_LENGTH, msg, max_len);
_data[_total_len] = '\0';
}

//(读取msg),数组为(max_len+1),,结尾"\0",当前长度为0
MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {
_data = new char[_total_len + 1]();
}

~MsgNode() {
delete[] _data;
}

//清空消息内容
void Clear() {
::memset(_data, 0, _total_len);
_cur_len = 0;
}
private:
short _cur_len; //当前消息长度
short _total_len;//消息总长度
char* _data; //消息内容
};
  • _cur_len; //当前消息长度
  • _total_len;//消息总长度
  • _data; //消息内容
  • MsgNode(short max_len) 接受消息(read)的话需要这个构造
  • MsgNode(char* msg, short max_len) 发送消息(wrire)的话需要找个构造

CSession

定义

为能够对收到的数据切包处理,需要定义一个消息接收节点,一个bool类型的变量表示头部是否解析完成,以及将处理好的头部先缓存起来的结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class CSession : public std::enable_shared_from_this<CSession>
{
public:
CSession(boost::asio::io_context& io_context, CServer* server);
~CSession();
tcp::socket& GetSocket();
std::string& GetUuid();


void Start();//开始接收消息
void Send(char* msg, int max_length);//发送消息
void Close();//关闭会话
std::shared_ptr<CSession> SharedSelf();//获取this指针


private:
//读回调
void HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self);
//写回调
void HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self);
tcp::socket _socket;
std::string _uuid;

//接收数据的缓冲区
char _data[MAX_LENGTH];

//对应的服务器
CServer* _server;


bool _b_close;//是否关闭

//发送队列
std::queue<shared_ptr<MsgNode> > _send_que;
std::mutex _send_lock;//发送锁,防止多线程同时发送


//收到的消息结构
std::shared_ptr<MsgNode> _recv_msg_node;
bool _b_head_parse;//是否解析到头部
//收到的头部结构
std::shared_ptr<MsgNode> _recv_head_node;
};

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
//构造会话,_socket,服务器指针,关闭标志位,头部解析标志位,接收头部节点
CSession::CSession(boost::asio::io_context& io_context, CServer* server) :
_socket(io_context), _server(server), _b_close(false), _b_head_parse(false) {

//生成uuid
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid);

//初始化接收头部节点
_recv_head_node = make_shared<MsgNode>(HEAD_LENGTH);
}
CSession::~CSession() {
cout << "~CSession destruct" << endl;
}

void CSession::Close() {
_socket.close();//关闭socket
_b_close = true;//设置关闭标志位
}

tcp::socket& CSession::GetSocket() {
return _socket;
}

std::string& CSession::GetUuid() {
return _uuid;
}

//获取this指针
std::shared_ptr<CSession>CSession::SharedSelf() {
return shared_from_this();
}


void CSession::Start() {
//开始接收数据
::memset(_data, 0, MAX_LENGTH);

//异步读取数据
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this,
std::placeholders::_1, std::placeholders::_2, SharedSelf()));
}

//发送消息
void CSession::Send(char* msg, int max_length) {
std::lock_guard<std::mutex> lock(_send_lock);//加锁

int send_que_size = _send_que.size();//获取当前发送队列大小
if (send_que_size > MAX_SENDQUE) {
cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
return;
}

//将消息放入发送队列
_send_que.push(make_shared<MsgNode>(msg, max_length));

//如果当前发送队列不为空,则说明有消息正在发送,不需要再次发送
if (send_que_size > 0) {
return;
}
//发送消息
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

//写回调
void CSession::HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) {

if (!error) {
std::lock_guard<std::mutex> lock(_send_lock);//加锁
cout << "send data " << _send_que.front()->_data + HEAD_LENGTH << endl;

_send_que.pop();//write成功,一定发送完一条数据,所以直接pop

//如果发送队列不为空,则继续发送
if (!_send_que.empty()) {
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_self));
}
}
else {
std::cout << "handle write failed, error is " << error.what() << endl;
Close();//关闭会话
_server->ClearSession(_uuid);//从服务器中移除
}
}

//读回调
void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self) {
if (!error) {
//已经移动的字符数
int copy_len = 0;

//如果接受到了数据,则继续接收
while (bytes_transferred > 0) {

//如果未处理头部,则先处理头部
if (!_b_head_parse) {
//情况1:收到的数据不足头部大小
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) {

//将数据复制到头部节点
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_head_node->_cur_len += bytes_transferred;

//清空消息缓冲区,继续接收
::memset(_data, 0, MAX_LENGTH);

_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
//情况2:收到的数据比头部多

//头部剩余未复制的长度
int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len;
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);

//更新已处理的data长度和剩余未处理的长度
copy_len += head_remain;
bytes_transferred -= head_remain;

//获取头部数据(应该接受到的消息,而不是真正接受到的消息)
short data_len = 0;
memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);

//网络字节序转化为本地字节序
data_len = boost::asio::detail::socket_ops::network_to_host_short(data_len);
cout << "data_len is " << data_len << endl;

//头部长度非法
if (data_len > MAX_LENGTH) {
std::cout << "invalid data length is " << data_len << endl;
_server->ClearSession(_uuid); //从服务器中移除
return;
}

//创建接收消息节点
_recv_msg_node = make_shared<MsgNode>(data_len);

//情况3:消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < data_len) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;

//清空消息缓冲区,继续接收
::memset(_data, 0, MAX_LENGTH);

//继续接收
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));

//头部处理完成,下一个回调处理头部。
_b_head_parse = true;
return;
}

//情况4:接受的长度 > 头部规定的长度,说明数据已经收全,则将消息放到接收节点里
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, data_len);
_recv_msg_node->_cur_len += data_len;
copy_len += data_len;
bytes_transferred -= data_len;

//将最后一个位置置为'\0',表示一个消息的结束(其实也不需要)
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
cout << "receive data is " << _recv_msg_node->_data << endl;

//此处可以调用Send发送测试
Send(_recv_msg_node->_data, _recv_msg_node->_total_len);

//情况5:继续轮询剩余未处理数据(下一个头部)
_b_head_parse = false;
_recv_head_node->Clear();//清空头部节点

//如果剩余数据没有了,则继续接收
if (bytes_transferred <= 0) {
//清空消息缓冲区,继续接收
::memset(_data, 0, MAX_LENGTH);

//继续接收
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}

//否则处理头部
continue;
}

//已经处理完头部,处理上次未接受完的消息数据


//获取应该处理的长度
int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;

//情况1:接受的数据<应该接受的数据
if (bytes_transferred < remain_msg) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;

//清空消息缓冲区,继续接收
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}

//情况2:接受的数据>应该接受的数据
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
_recv_msg_node->_cur_len += remain_msg;
bytes_transferred -= remain_msg;
copy_len += remain_msg;

//将最后一个位置置为'\0',表示一个消息的结束(其实也不需要)
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Send(_recv_msg_node->_data, _recv_msg_node->_total_len);


//处理下一个消息的头部
_b_head_parse = false;

//清空头部节点
_recv_head_node->Clear();

//如果剩余数据没有了,则继续接收
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
//否则处理头部
continue;
}
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
Close(); //关闭会话
_server->ClearSession(_uuid);//从服务器中移除
}
}

处理粘包具体步骤:

  1. 判断是否在读头部,分为两种情况
  2. 读头部:
    • 收到的数据还不够2字节,那么头部构造不出来,继续异步接受,return
    • 收到的数据够头部,解析出头部,然后将剩余的数据与解析出的数据对比
    • 如果剩余的数据少于解析出的头部,那么接着读,并且下一次读到(内容区)
    • 如果剩余的数据大于解析出的头部,构造完整消息,如果还有数据,continue,否则接着监听
  3. 读内容:
    • 收到的数据不够装满内容区,则接着监听,return
    • 收到的数据大于内容区域,内容数据构造完后,下一次需要读,如果剩余数据为0,则接着监听,如果还有数据,则continue

Cserver类

定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#pragma once
#include <boost/asio.hpp>
#include "CSession.h"
#include <memory.h>
#include <map>
using namespace std;
using boost::asio::ip::tcp;
class CServer
{
public:
//端口号
CServer(boost::asio::io_context& io_context, short port);
void ClearSession(std::string);
private:
//接受回调函数
void HandleAccept(shared_ptr<CSession>, const boost::system::error_code& error);

//开始接受
void StartAccept();


boost::asio::io_context& _io_context;
short _port;
tcp::acceptor _acceptor;//接收器(需要端点)

//会话列表
std::map<std::string, shared_ptr<CSession>> _sessions;
};

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include "CServer.h"
#include <iostream>
CServer::CServer(boost::asio::io_context& io_context, short port) :_io_context(io_context), _port(port),
_acceptor(io_context, tcp::endpoint(tcp::v4(), port))
{
cout << "Server start success, listen on port : " << _port << endl;
StartAccept();
}

void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error) {
if (!error) {
new_session->Start();
_sessions.insert(make_pair(new_session->GetUuid(), new_session));
}
else {
cout << "session accept failed, error is " << error.what() << endl;
}

StartAccept();
}

void CServer::StartAccept() {
shared_ptr<CSession> new_session = make_shared<CSession>(_io_context, this);
_acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
}

void CServer::ClearSession(std::string uuid) {
_sessions.erase(uuid);
}

总结

该服务虽然实现了粘包处理,但是服务器仍存在不足,比如当客户端和服务器处于不同平台时收发数据会出现异常,根本原因是未处理大小端模式的问题


处理网络粘包问题
http://example.com/2023/11/15/cpp/处理网络粘包问题/
作者
Mrxiad
发布于
2023年11月15日
许可协议