简介
本文介绍异步读写操作。
定义个session类,这个session类表示服务器处理客户端连接的管理类
其中有socket对象。
1 2 3 4 5 6 7
| class Session { public: Session(std::shared_ptr<asio::ip::tcp::socket> socket); void Connect(const asio::ip::tcp::endpoint& ep); private: std::shared_ptr<asio::ip::tcp::socket> _socket; };
|
session类定义了一个socket成员变量,负责处理对端的连接读写,封装了Connect函数
1 2 3
| void Session::Connect(const asio::ip::tcp::endpoint &ep) { _socket->connect(ep); }
|
异步写操作
在写操作前,我们先封装一个MsgNode结构,用来管理要发送和接收的数据,该结构包含数据域首地址,数据的总长度,以及已经处理的长度(已读的长度或者已写的长度)
async_write_some方式
一次性不一定发送完数据
定义MsgNode类用于封装数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| const int RECVSIZE = 1024; class MsgNode { public : MsgNode(const char* msg, int total_len): _total_len(total_len), _cur_len(0){ _msg = new char[total_len]; memcpy(_msg, msg, total_len); } MsgNode(int total_len) :_total_len(total_len), _cur_len(0) { _msg = new char[total_len]; } ~MsgNode(){ delete[]_msg; } char* _msg; int _total_len; int _cur_len; };
|
为Session添加异步写操作
1 2 3 4 5 6 7 8 9 10
| class Session{ public: void WriteCallBackErr(const boost::system::error_code & ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode>); void WriteToSocketErr(const std::string& buf); private: std::shared_ptr<MsgNode> _send_node; };
|
实现void WriteToSocketErr函数
1 2 3 4 5 6 7 8 9
| void Session::WriteToSocketErr(const std::string& buf) { _send_node = make_shared<MsgNode>(buf.c_str(), buf.length()); this->_socket->async_write_some(asio::buffer(_send_node->_msg, _send_node->_total_len), std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node)); }
|
async_wirte_some函数第一个参数是buffer,第二个参数是回调函数)
实现WriteCallBackErr函数
因为WriteCallBackErr函数为三个参数且为成员函数,而async_write_some需要的回调函数为两个参数,所以我们通过bind将三个参数转换为两个参数的普通函数。
1 2 3 4 5 6 7 8 9 10 11
| void Session::WriteCallBackErr(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<MsgNode> msg_node) { if (bytes_transferred + msg_node->_cur_len < msg_node->_total_len) { _send_node->_cur_len += bytes_transferred; this->_socket->async_write_some(asio::buffer(_send_node->_msg+_send_node->_cur_len, _send_node->_total_len-_send_node->_cur_len), std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node)); } }
|
单凭这两个不可以投入使用,具体原因是
我们可以通过队列保证应用层的发送顺序。我们在Session中定义一个发送队列,然后重新定义正确的异步发送函数和回调处理
定义queue写入消息(重点)
1 2 3 4 5 6 7 8 9
| class Session{ public: void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred); void WriteToSocket(const std::string &buf); private: std::queue<std::shared_ptr<MsgNode>> _send_queue; std::shared_ptr<asio::ip::tcp::socket> _socket; bool _send_pending; };
|
实现queue异步写入功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| void Session::WriteToSocket(const std::string& buf){ _send_queue.emplace(new MsgNode(buf.c_str(), buf.length())); if (_send_pending) { return; } this->_socket->async_write_some(asio::buffer(buf), std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2)); _send_pending = true; }
void Session::WriteCallBack(const boost::system::error_code & ec, std::size_t bytes_transferred){ if (ec.value() != 0) { std::cout << "Error , code is " << ec.value() << " . Message is " << ec.message(); return; } auto & send_data = _send_queue.front(); send_data->_cur_len += bytes_transferred; if (send_data->_cur_len < send_data->_total_len) { this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len-send_data->_cur_len), std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2)); return; } _send_queue.pop(); if (_send_queue.empty()) { _send_pending = false; } if (!_send_queue.empty()) { auto& send_data = _send_queue.front(); this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len), std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2)); } _send_pending=false; }
|
async_send方式
一次性发送完数据
其内部的实现原理就是帮我们不断的调用async_write_some直到完成发送,async_send不能和async_write_some混合使用,我们基于async_send封装另外一个发送函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| void Session::WriteAllToSocket(const std::string& buf) { _send_queue.emplace(new MsgNode(buf.c_str(), buf.length())); if (_send_pending) { return; } this->_socket->async_send(asio::buffer(buf), std::bind(&Session::WriteAllCallBack, this, std::placeholders::_1, std::placeholders::_2)); _send_pending = true; } void Session::WriteAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred){ if (ec.value() != 0) { std::cout << "Error occured! Error code = " << ec.value() << ". Message: " << ec.message(); return; } _send_queue.pop(); if (_send_queue.empty()) { _send_pending = false; } if (!_send_queue.empty()) { auto& send_data = _send_queue.front(); this->_socket->async_send(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len), std::bind(&Session::WriteAllCallBack, this, std::placeholders::_1, std::placeholders::_2)); } }
|
异步读操作
async_read_some方式
触发的回调函数获取的读数据的长度可能会小于要求读取的总长度
1 2 3 4 5 6 7 8 9
| class Session { public: void ReadFromSocket(); void ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred); private: std::shared_ptr<asio::ip::tcp::socket> _socket; std::shared_ptr<MsgNode> _recv_node; bool _recv_pending; };
|
recv_node用来缓存接收的数据,_recv_pending为true表示节点正在接收数据,还未接受完。
具体实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| void Session::ReadFromSocket() { if (_recv_pending) { return; }
_recv_node = std::make_shared<MsgNode>(RECVSIZE); _socket->async_read_some(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2)); _recv_pending = true; } void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred){ _recv_node->_cur_len += bytes_transferred; if (_recv_node->_cur_len < _recv_node->_total_len) { _socket->async_read_some(asio::buffer(_recv_node->_msg+_recv_node->_cur_len, _recv_node->_total_len - _recv_node->_cur_len), std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2)); return; } _recv_pending = false; _recv_node = nullptr; }
|
async_receive方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| void Session::ReadAllFromSocket(const std::string& buf) { if (_recv_pending) { return; }
_recv_node = std::make_shared<MsgNode>(RECVSIZE); _socket->async_receive(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadAllCallBack, this, std::placeholders::_1, std::placeholders::_2)); _recv_pending = true; } void Session::ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) { _recv_node->_cur_len += bytes_transferred; _recv_pending = false; _recv_node = nullptr; }
|
同样,async_read_some和async_receive不能混合使用,否则会出现逻辑问题。
总结
流程
(a)对于异步写操作,流程如下
- 设置队列
- 将需要写的放入队列里
- 如果此时在写,则return
- 否则开启写操作,绑定回调函数,然后标志”正在写“
- 回调函数实现递归逻辑,按照顺序完成队列中所有的写操作
(b)对于异步读操作,流程如下
- 如果此时在读,则return
- 否则开启读操作,绑定回调函数,然后标志”正在读“
- 回调函数实现递归逻辑,读完为止
注意
对于写操作,尽量使用async_send
对于读操作,尽量使用async_read_some
重点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
void Session::WriteToSocketErr(const std::string& buf) { _send_node = make_shared<MsgNode>(buf.c_str(), buf.length()); this->_socket->async_write_some(asio::buffer(_send_node->_msg, _send_node->_total_len), std::bind(&Session::WriteCallBackErr, this, std::placeholders::_1, std::placeholders::_2, _send_node)); }
void Session::WriteAllToSocket(const std::string& buf) { _send_queue.emplace(new MsgNode(buf.c_str(), buf.length())); this->_socket->async_send(asio::buffer(buf), std::bind(&Session::WriteAllCallBack, this, std::placeholders::_1, std::placeholders::_2)); }
void Session::ReadFromSocket() { _recv_node = std::make_shared<MsgNode>(RECVSIZE); _socket->async_read_some(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2)); }
void Session::ReadAllFromSocket(const std::string& buf) { _recv_node = std::make_shared<MsgNode>(RECVSIZE); _socket->async_receive(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadAllCallBack, this, std::placeholders::_1, std::placeholders::_2)); }
|