Kiến thức Linux - I/O Nâng cao (Phần 2)
I. Đa kênh I/O - Phương pháp Poll
1. Giao diện Poll
Tham số:
- fds: Địa chỉ bắt đầu của cấu trúc struct pollfd;
- nfds: Số lượng cấu trúc được truyền vào;
- timeout:
- 0: Chờ không chặn;
- -1: Chờ chặn;
- Giá trị > 0: Chờ n mili giây rồi trả về;
Giá trị trả về:
- > 0: Có bao nhiêu file descriptor đã sẵn sàng;
- = 0: Hết thời gian chờ timeout;
- < 0: Poll thất bại;
Tham số đầu vào và đầu ra của poll được tách biệt vì cấu trúc struct pollfd có nhiều thành viên có thể thực hiện các chức năng khác nhau:
- fd: File descriptor, một khi được thiết lập sẽ không thay đổi khi gọi và trả về;
- events: Người dùng thông báo cho kernel cần quan tâm đến sự kiện nào của fd này;
- revents: Kernel thông báo cho người dùng sự kiện nào của fd này đã sẵn sàng;
2. Triển khai Poll
Quá trình xử lý fd của poll và select tương tự nhau, chỉ khác ở cách quản lý nhiều fd.
main.cc
#include "pollServer.hpp"
#include <memory>
int main()
{
std::unique_ptr<PollServer> svr(new PollServer);
svr->Start();
return 0;
}
pollServer.hpp
#ifndef __POLL_SVR_H__
#define __POLL_SVR_H__
#include <iostream>
#include <string>
#include <vector>
#include <poll.h>
#include <sys/time.h>
#include "Log.hpp"
#include "Sock.hpp"
#define FD_NONE -1
using namespace std;
class PollServer
{
public:
static const int max_fds = 100;
public:
PollServer(const uint16_t &port = 8080)
: _port(port)
, _max_fds(max_fds)
{
_listen_sock = Sock::Socket();
Sock::Bind(_listen_sock, _port);
Sock::Listen(_listen_sock);
logMessage(DEBUG, "%s", "Tạo socket cơ bản thành công");
_poll_fds = new struct pollfd[_max_fds];
for(int i = 0; i < _max_fds; i++)
{
_poll_fds[i].fd = FD_NONE;
_poll_fds[i].events = _poll_fds[i].revents = 0;
}
_poll_fds[0].fd = _listen_sock;
_poll_fds[0].events = POLLIN;
_timeout = 1000;
}
void Start()
{
while (true)
{
int ready_count = poll(_poll_fds, _max_fds, _timeout);
switch (ready_count)
{
case 0:
logMessage(DEBUG, "Hết thời gian chờ");
break;
case -1:
logMessage(WARNING, "Lỗi poll: %d : %s", errno, strerror(errno));
break;
default:
logMessage(DEBUG, "Nhận được sự kiện kết nối mới");
ProcessEvents();
break;
}
}
}
~PollServer()
{
if (_listen_sock >= 0)
{
close(_listen_sock);
}
if(_poll_fds)
{
delete[] _poll_fds;
}
}
private:
void ProcessEvents()
{
for (int i = 0; i < _max_fds; i++)
{
if (_poll_fds[i].fd == FD_NONE)
{
continue;
}
if (_poll_fds[i].revents & POLLIN)
{
if (_poll_fds[i].fd == _listen_sock)
{
AcceptConnection();
}
else
{
ReceiveData(i);
}
}
}
}
void AcceptConnection()
{
string client_ip;
uint16_t client_port = 0;
int new_sock = Sock::Accept(_listen_sock, &client_ip, &client_port);
if (new_sock < 0)
{
logMessage(WARNING, "%s", "Lỗi accept");
return;
}
logMessage(DEBUG, "Kết nối mới thành công : [%s:%d] : %d", client_ip.c_str(), client_port, new_sock);
int pos = 1;
for (; pos < _max_fds; pos++)
{
if (_poll_fds[pos].fd == FD_NONE)
{
break;
}
}
if (pos == _max_fds)
{
logMessage(WARNING, "%s:%d", "Máy chủ poll đã đầy, đóng: %d", new_sock);
close(new_sock);
}
else
{
_poll_fds[pos].fd = new_sock;
_poll_fds[pos].events = POLLIN;
}
}
void ReceiveData(int pos)
{
logMessage(DEBUG, "Có dữ liệu vào, nhận sự kiện IO: %d", _poll_fds[pos].fd);
char buffer[1024];
int bytes = recv(_poll_fds[pos].fd, buffer, sizeof(buffer) - 1, 0);
if (bytes > 0)
{
buffer[bytes] = 0;
logMessage(DEBUG, "Client[%d]# %s", _poll_fds[pos].fd, buffer);
}
else if (bytes == 0)
{
logMessage(DEBUG, "Client[%d] thoát, máy chủ cũng thoát...", _poll_fds[pos].fd);
close(_poll_fds[pos].fd);
_poll_fds[pos].fd = FD_NONE;
_poll_fds[pos].events = 0;
}
else
{
logMessage(WARNING, "%d sock recv lỗi, %d : %s", _poll_fds[pos].fd, errno, strerror(errno));
close(_poll_fds[pos].fd);
_poll_fds[pos].fd = FD_NONE;
_poll_fds[pos].events = 0;
}
}
private:
uint16_t _port;
int _listen_sock;
struct pollfd *_poll_fds;
int _max_fds;
int _timeout;
};
#endif
3. Ưu và nhược điểm của Poll
Ưu điểm:
- Hiệu suất cao;
- Hiệu quả với nhiều kết nối nhưng chỉ một số nhỏ là hoạt động, tiết kiệm tài nguyên;
- Tham số đầu vào và đầu ra tách biệt, không cần thiết lập lại nhiều lần;
- Không giới hạn số lượng fd có thể quản lý;
Nhược điểm:
- Vẫn cần duyệt qua nhiều fd ở tầng người dùng để kiểm tra sự kiện sẵn sàng;
- Cần sao chép từ kernel sang người dùng - không thể tránh khỏi;
- Mã phức tạp hơn select;
II. Đa kênh I/O - Phương pháp Epoll
1. Giao diện Epoll
Epoll có ba giao diện:
epoll_create
- Tạo mô hình epoll;
- Tham số size hiện nay phần lớn đã bị bỏ qua, thường ghi là 512 hoặc 256;
- Giá trị trả về là một file descriptor;
epoll_ctl
- epfd: fd trả về bởi epoll_create;
- op: Thao tác thực hiện trên mô hình epoll (thêm, xóa, sửa);
- fd: File descriptor cần quan tâm;
- event: Sự kiện cần quan tâm của fd;
epoll_wait
- Lấy các sự kiện đã sẵn sàng trong mô hình epoll, tham số timeout giống như poll;
- epfd: fd trả về bởi epoll_create;
- events: Mảng cấu trúc epoll_events đã được cấp phát;
- maxevents: Kích thước của mảng events;
2. Nguyên lý hoạt động của Epoll
Nguyên lý hoạt động của Epoll:
- Gọi epoll_create để tạo một mô hình epoll, hệ điều hành sẽ duy trì một cấu trúc cây đỏ-đen cho người dùng;
- Trong epoll, với mỗi sự kiện, một cấu trúc epitem sẽ được tạo ra;
- HĐH còn duy trì một hàng đợi sẵn sàng để thông báo cho người dùng những sự kiện nào đã sẵn sàng;
- HĐH có thể thiết lập một hàm callback có thể được đăng ký vào tầng dưới,一旦底层有数据,就会调用回调函数;
3. Triển khai máy chủ Epoll
epoll.hpp
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
class EpollWrapper
{
public:
static const int max_events = 256;
public:
static int CreateEpoll()
{
int epfd = epoll_create(max_events);
if(epfd > 0)
{
return epfd;
}
exit(5);
}
static bool ControlEpoll(int epfd, int operation, int sock, uint32_t events)
{
struct epoll_event ev;
ev.events = events;
ev.data.fd = sock;
int result = epoll_ctl(epfd, operation, sock, &ev);
return result == 0;
}
static int WaitEpoll(int epfd, struct epoll_event revs[], int num, int timeout)
{
return epoll_wait(epfd, revs, num, timeout);
}
};
epollServer.hpp
#ifndef __EPOLL_SERVER_HPP__
#define __EPOLL_SERVER_HPP__
#include <iostream>
#include <string>
#include <functional>
#include <cassert>
#include "Log.hpp"
#include "Sock.hpp"
#include "epoll.hpp"
namespace EpollNamespace
{
const static int default_port = 8080;
const static int events_num = 64;
class EpollServer
{
using HandlerFunc = std::function<void(std::string)>;
public:
EpollServer(HandlerFunc dataHandler, const int& port = default_port)
: _port(port)
, _ready_events_num(events_num)
, _dataHandler(dataHandler)
{
_ready_events = new struct epoll_event[_ready_events_num];
_listen_sock = Sock::Socket();
Sock::Bind(_listen_sock, _port);
Sock::Listen(_listen_sock);
_epfd = EpollWrapper::CreateEpoll();
logMessage(DEBUG, "Khởi tạo thành công, listen_sock: %d, epfd: %d", _listen_sock, _epfd);
if(!EpollWrapper::ControlEpoll(_epfd, EPOLL_CTL_ADD, _listen_sock, EPOLLIN))
{
exit(6);
}
logMessage(DEBUG, "Thêm listen_sock vào epoll thành công.");
}
void AcceptNewConnection()
{
std::string client_ip;
uint16_t client_port;
int new_sock = Sock::Accept(_listen_sock, &client_ip, &client_port);
if(new_sock < 0)
{
logMessage(WARNING, "Lỗi accept!");
return;
}
if(!EpollWrapper::ControlEpoll(_epfd, EPOLL_CTL_ADD, new_sock, EPOLLIN))
{
return;
}
logMessage(DEBUG, "Thêm sock mới : %d vào epoll thành công", new_sock);
}
void ReceiveData(int sock)
{
char buffer[10240];
ssize_t bytes = recv(sock, buffer, sizeof(buffer) - 1, 0);
if(bytes > 0)
{
buffer[bytes] = 0;
_dataHandler(buffer);
}
else if(bytes == 0)
{
bool res = EpollWrapper::ControlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0);
assert(res);
close(sock);
logMessage(NORMAL, "Client %d thoát, máy chủ cũng thoát...", sock);
}
else
{
bool res = EpollWrapper::ControlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0);
assert(res);
close(sock);
logMessage(NORMAL, "Client recv %d lỗi, đóng sock lỗi", sock);
}
}
void ProcessReadyEvents(int count)
{
assert(count > 0);
for(int i = 0; i < count; i++)
{
uint32_t ready_events = _ready_events[i].events;
int current_sock = _ready_events[i].data.fd;
if(ready_events & EPOLLIN)
{
if(current_sock == _listen_sock)
{
AcceptNewConnection();
}
else
{
ReceiveData(current_sock);
}
}
}
}
void LoopOnce(int timeout = -1)
{
int ready_count = EpollWrapper::WaitEpoll(_epfd, _ready_events, _ready_events_num, timeout);
switch(ready_count)
{
case 0:
logMessage(DEBUG, "Hết thời gian chờ...");
break;
case -1:
logMessage(WARNING, "Lỗi chờ epoll: %s", strerror(errno));
break;
default:
logMessage(DEBUG, "Nhận được sự kiện");
ProcessReadyEvents(ready_count);
break;
}
}
void Start()
{
while(true)
{
LoopOnce();
}
}
~EpollServer()
{
if(_listen_sock > 0)
{
close(_listen_sock);
}
if(_epfd > 0)
{
close(_epfd);
}
if(_ready_events)
{
delete[] _ready_events;
}
}
private:
int _listen_sock;
int _epfd;
uint16_t _port;
struct epoll_event* _ready_events;
int _ready_events_num;
HandlerFunc _dataHandler;
};
}
#endif
4. Ưu điểm của Epoll
- Giao diện dễ sử dụng: Dù được chia thành ba hàm, nhưng sử dụng thuận tiện và hiệu quả hơn, không cần thiết lập file descriptor cần quan tâm ở mỗi vòng lặp;
- Sao chép dữ liệu nhẹ nhàng: Chỉ sao chép cấu trúc file descriptor vào kernel khi phù hợp (thao tác này không thường xuyên như select/poll);
- cơ chế callback sự kiện: Tránh việc duyệt, sử dụng hàm callback để đưa cấu trúc file descriptor sẵn sàng vào hàng đợi, thời gian truy cập hàng đợi là O(1);
- Không giới hạn số lượng: Số lượng file descriptor không giới hạn;
5. Chế độ làm việc của Epoll
Epoll có hai chế độ làm việc:
- Trigger mức (LT): Nếu trong máy chủ epoll có dữ liệu của file descriptor, nó sẽ liên tục thông báo cho fd đó (chế độ mặc định của select, poll, epoll);
- Trigger cạnh (ET): Chỉ khi lần đầu tiên có dữ liệu hoặc dữ liệu tăng lên (thay đổi) thì máy chủ mới thông báo cho fd đó;
6. Máy chủ Reactor
Máy chủ Reactor là máy chủ epoll hoạt động ở chế độ ET.
main.cc - Máy chủ tính mạng
#include "TcpServer.hpp"
#include <memory>
static Response calculate(const Request &req)
{
Response resp(0, 0);
switch (req.op_)
{
case '+':
resp.result_ = req.x_ + req.y_;
break;
case '-':
resp.result_ = req.x_ - req.y_;
break;
case '*':
resp.result_ = req.x_ * req.y_;
break;
case '/':
if (0 == req.y_)
resp.code_ = 1;
else
resp.result_ = req.x_ / req.y_;
break;
case '%':
if (0 == req.y_)
resp.code_ = 2;
else
resp.result_ = req.x_ % req.y_;
break;
default:
resp.code_ = 3;
break;
}
return resp;
}
void networkCalculator(Connection* conn, std::string& request)
{
logMessage(DEBUG, "networkCalculator được gọi, nhận yêu cầu: %s", request.c_str());
Request req;
if(!req.Deserialize(request))
return;
Response resp = calculate(req);
std::string responseStr = resp.Serialize();
conn->outputBuffer += responseStr;
conn->server->EnableReadWrite(conn, true, true);
}
int main()
{
std::unique_ptr<TcpServer> server(new TcpServer());
server->DispatchHandler(networkCalculator);
return 0;
}