Kiến thức Linux - I/O Nâng cao (Phần 2)

Kiến thức Linux - I/O Nâng cao (Phần 2)

I. Đa kênh I/O - Phương pháp Poll

1. Giao diện Poll

Tham số:

  • fds: Địa chỉ bắt đầu của cấu trúc struct pollfd;
  • nfds: Số lượng cấu trúc được truyền vào;
  • timeout:
    • 0: Chờ không chặn;
    • -1: Chờ chặn;
    • Giá trị > 0: Chờ n mili giây rồi trả về;

Giá trị trả về:

  • > 0: Có bao nhiêu file descriptor đã sẵn sàng;
  • = 0: Hết thời gian chờ timeout;
  • < 0: Poll thất bại;

Tham số đầu vào và đầu ra của poll được tách biệt vì cấu trúc struct pollfd có nhiều thành viên có thể thực hiện các chức năng khác nhau:

  • fd: File descriptor, một khi được thiết lập sẽ không thay đổi khi gọi và trả về;
  • events: Người dùng thông báo cho kernel cần quan tâm đến sự kiện nào của fd này;
  • revents: Kernel thông báo cho người dùng sự kiện nào của fd này đã sẵn sàng;

2. Triển khai Poll

Quá trình xử lý fd của poll và select tương tự nhau, chỉ khác ở cách quản lý nhiều fd.

main.cc

#include "pollServer.hpp"
#include <memory>

int main()
{
    std::unique_ptr<PollServer> svr(new PollServer);
    svr->Start();

    return 0;
}

pollServer.hpp

#ifndef __POLL_SVR_H__
#define __POLL_SVR_H__

#include <iostream>
#include <string>
#include <vector>
#include <poll.h>
#include <sys/time.h>
#include "Log.hpp"
#include "Sock.hpp"

#define FD_NONE -1
using namespace std;

class PollServer
{
public:
    static const int max_fds = 100;

public:
    PollServer(const uint16_t &port = 8080)
        : _port(port)
        , _max_fds(max_fds)
    {
        _listen_sock = Sock::Socket();
        Sock::Bind(_listen_sock, _port);
        Sock::Listen(_listen_sock);
        logMessage(DEBUG, "%s", "Tạo socket cơ bản thành công");
        _poll_fds = new struct pollfd[_max_fds];
        for(int i = 0; i < _max_fds; i++)
        {
            _poll_fds[i].fd = FD_NONE;
            _poll_fds[i].events = _poll_fds[i].revents = 0;
        }
        _poll_fds[0].fd = _listen_sock;
        _poll_fds[0].events = POLLIN;
        _timeout = 1000;
    }

    void Start()
    {
        while (true)
        {
            int ready_count = poll(_poll_fds, _max_fds, _timeout);

            switch (ready_count)
            {
            case 0:
                logMessage(DEBUG, "Hết thời gian chờ");
                break;
            case -1:
                logMessage(WARNING, "Lỗi poll: %d : %s", errno, strerror(errno));
                break;
            default:
                logMessage(DEBUG, "Nhận được sự kiện kết nối mới");
                ProcessEvents();
                break;
            }
        }
    }

    ~PollServer()
    {
        if (_listen_sock >= 0)
        {
            close(_listen_sock);
        }
        if(_poll_fds)
        {
            delete[] _poll_fds;
        }
    }

private:
    void ProcessEvents()
    {
        for (int i = 0; i < _max_fds; i++)
        {
            if (_poll_fds[i].fd == FD_NONE)
            {
                continue;
            }
            if (_poll_fds[i].revents & POLLIN)
            {
                if (_poll_fds[i].fd == _listen_sock)
                {
                    AcceptConnection();
                }
                else
                {
                    ReceiveData(i);
                }
            }
        }
    }

    void AcceptConnection()
    {
        string client_ip;
        uint16_t client_port = 0;
        int new_sock = Sock::Accept(_listen_sock, &client_ip, &client_port);
        if (new_sock < 0)
        {
            logMessage(WARNING, "%s", "Lỗi accept");
            return;
        }
        logMessage(DEBUG, "Kết nối mới thành công : [%s:%d] : %d", client_ip.c_str(), client_port, new_sock);

        int pos = 1;
        for (; pos < _max_fds; pos++)
        {
            if (_poll_fds[pos].fd == FD_NONE)
            {
                break;
            }
        }
        if (pos == _max_fds)
        {
            logMessage(WARNING, "%s:%d", "Máy chủ poll đã đầy, đóng: %d", new_sock);
            close(new_sock);
        }
        else
        {
            _poll_fds[pos].fd = new_sock;
            _poll_fds[pos].events = POLLIN;
        }
    }

    void ReceiveData(int pos)
    {
        logMessage(DEBUG, "Có dữ liệu vào, nhận sự kiện IO: %d", _poll_fds[pos].fd);
        char buffer[1024];
        int bytes = recv(_poll_fds[pos].fd, buffer, sizeof(buffer) - 1, 0);
        if (bytes > 0)
        {
            buffer[bytes] = 0;
            logMessage(DEBUG, "Client[%d]# %s", _poll_fds[pos].fd, buffer);
        }
        else if (bytes == 0)
        {
            logMessage(DEBUG, "Client[%d] thoát, máy chủ cũng thoát...", _poll_fds[pos].fd);
            close(_poll_fds[pos].fd);
            _poll_fds[pos].fd = FD_NONE;
            _poll_fds[pos].events = 0;
        }
        else
        {
            logMessage(WARNING, "%d sock recv lỗi, %d : %s", _poll_fds[pos].fd, errno, strerror(errno));
            close(_poll_fds[pos].fd);
            _poll_fds[pos].fd = FD_NONE;
            _poll_fds[pos].events = 0;
        }
    }

private:
    uint16_t _port;
    int _listen_sock;
    struct pollfd *_poll_fds;
    int _max_fds;
    int _timeout;
};
#endif

3. Ưu và nhược điểm của Poll

Ưu điểm:

  • Hiệu suất cao;
  • Hiệu quả với nhiều kết nối nhưng chỉ một số nhỏ là hoạt động, tiết kiệm tài nguyên;
  • Tham số đầu vào và đầu ra tách biệt, không cần thiết lập lại nhiều lần;
  • Không giới hạn số lượng fd có thể quản lý;

Nhược điểm:

  • Vẫn cần duyệt qua nhiều fd ở tầng người dùng để kiểm tra sự kiện sẵn sàng;
  • Cần sao chép từ kernel sang người dùng - không thể tránh khỏi;
  • Mã phức tạp hơn select;

II. Đa kênh I/O - Phương pháp Epoll

1. Giao diện Epoll

Epoll có ba giao diện:

epoll_create

  • Tạo mô hình epoll;
  • Tham số size hiện nay phần lớn đã bị bỏ qua, thường ghi là 512 hoặc 256;
  • Giá trị trả về là một file descriptor;

epoll_ctl

  • epfd: fd trả về bởi epoll_create;
  • op: Thao tác thực hiện trên mô hình epoll (thêm, xóa, sửa);
  • fd: File descriptor cần quan tâm;
  • event: Sự kiện cần quan tâm của fd;

epoll_wait

  • Lấy các sự kiện đã sẵn sàng trong mô hình epoll, tham số timeout giống như poll;
  • epfd: fd trả về bởi epoll_create;
  • events: Mảng cấu trúc epoll_events đã được cấp phát;
  • maxevents: Kích thước của mảng events;

2. Nguyên lý hoạt động của Epoll

Nguyên lý hoạt động của Epoll:

  • Gọi epoll_create để tạo một mô hình epoll, hệ điều hành sẽ duy trì một cấu trúc cây đỏ-đen cho người dùng;
  • Trong epoll, với mỗi sự kiện, một cấu trúc epitem sẽ được tạo ra;
  • HĐH còn duy trì một hàng đợi sẵn sàng để thông báo cho người dùng những sự kiện nào đã sẵn sàng;
  • HĐH có thể thiết lập một hàm callback có thể được đăng ký vào tầng dưới,一旦底层有数据,就会调用回调函数;

3. Triển khai máy chủ Epoll

epoll.hpp

#pragma once

#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>

class EpollWrapper
{
public:
    static const int max_events = 256;
public:
    static int CreateEpoll()
    {
        int epfd = epoll_create(max_events);
        if(epfd > 0)
        {
            return epfd;
        }
        exit(5);
    }

    static bool ControlEpoll(int epfd, int operation, int sock, uint32_t events)
    {
        struct epoll_event ev;
        ev.events = events;
        ev.data.fd = sock;
        int result = epoll_ctl(epfd, operation, sock, &ev);
        return result == 0;
    }

    static int WaitEpoll(int epfd, struct epoll_event revs[], int num, int timeout)
    {
        return epoll_wait(epfd, revs, num, timeout);
    }
};

epollServer.hpp

#ifndef __EPOLL_SERVER_HPP__
#define __EPOLL_SERVER_HPP__
#include <iostream>
#include <string>
#include <functional>
#include <cassert>
#include "Log.hpp"
#include "Sock.hpp"
#include "epoll.hpp"

namespace EpollNamespace
{
    const static int default_port = 8080;
    const static int events_num = 64;
    
    class EpollServer
    {
        using HandlerFunc = std::function<void(std::string)>;
    public:
        EpollServer(HandlerFunc dataHandler, const int& port = default_port)
            : _port(port)
            , _ready_events_num(events_num)
            , _dataHandler(dataHandler)
        {
            _ready_events = new struct epoll_event[_ready_events_num];
            _listen_sock = Sock::Socket();
            Sock::Bind(_listen_sock, _port);
            Sock::Listen(_listen_sock);
            _epfd = EpollWrapper::CreateEpoll();
            logMessage(DEBUG, "Khởi tạo thành công, listen_sock: %d, epfd: %d", _listen_sock, _epfd);
            
            if(!EpollWrapper::ControlEpoll(_epfd, EPOLL_CTL_ADD, _listen_sock, EPOLLIN))
            {
                exit(6);
            }
            logMessage(DEBUG, "Thêm listen_sock vào epoll thành công.");
        }

        void AcceptNewConnection()
        {
            std::string client_ip;
            uint16_t client_port;
            int new_sock = Sock::Accept(_listen_sock, &client_ip, &client_port);
            if(new_sock < 0)
            {
                logMessage(WARNING, "Lỗi accept!");
                return;
            }
            
            if(!EpollWrapper::ControlEpoll(_epfd, EPOLL_CTL_ADD, new_sock, EPOLLIN))
            {
                return;
            }
            logMessage(DEBUG, "Thêm sock mới : %d vào epoll thành công", new_sock);
        }

        void ReceiveData(int sock)
        {
            char buffer[10240];
            ssize_t bytes = recv(sock, buffer, sizeof(buffer) - 1, 0);
            if(bytes > 0)
            {
                buffer[bytes] = 0;
                _dataHandler(buffer);
            }
            else if(bytes == 0)
            {
                bool res = EpollWrapper::ControlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0);
                assert(res);
                close(sock);
                logMessage(NORMAL, "Client %d thoát, máy chủ cũng thoát...", sock);
            }
            else
            {
                bool res = EpollWrapper::ControlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0);
                assert(res);
                close(sock);
                logMessage(NORMAL, "Client recv %d lỗi, đóng sock lỗi", sock);
            }
        }

        void ProcessReadyEvents(int count)
        {
            assert(count > 0);
            for(int i = 0; i < count; i++)
            {
                uint32_t ready_events = _ready_events[i].events;
                int current_sock = _ready_events[i].data.fd;
                
                if(ready_events & EPOLLIN)
                {
                    if(current_sock == _listen_sock)
                    {
                        AcceptNewConnection();
                    }
                    else 
                    {
                        ReceiveData(current_sock);
                    }
                }
            }
        }

        void LoopOnce(int timeout = -1)
        {
            int ready_count = EpollWrapper::WaitEpoll(_epfd, _ready_events, _ready_events_num, timeout);

            switch(ready_count)
            {
            case 0:
                logMessage(DEBUG, "Hết thời gian chờ...");
                break;
            case -1:
                logMessage(WARNING, "Lỗi chờ epoll: %s", strerror(errno));
                break;
            default:
                logMessage(DEBUG, "Nhận được sự kiện");
                ProcessReadyEvents(ready_count);
                break;
            }
        }

        void Start()
        {
            while(true)
            {
                LoopOnce();
            }
        }

        ~EpollServer()
        {
            if(_listen_sock > 0)
            {
                close(_listen_sock);
            }
            if(_epfd > 0)
            {
                close(_epfd);
            }
            if(_ready_events)
            {
                delete[] _ready_events;
            }
        }


    private:
        int _listen_sock;
        int _epfd;
        uint16_t _port;
        struct epoll_event* _ready_events;
        int _ready_events_num;
        HandlerFunc _dataHandler;
    };
}
#endif

4. Ưu điểm của Epoll

  • Giao diện dễ sử dụng: Dù được chia thành ba hàm, nhưng sử dụng thuận tiện và hiệu quả hơn, không cần thiết lập file descriptor cần quan tâm ở mỗi vòng lặp;
  • Sao chép dữ liệu nhẹ nhàng: Chỉ sao chép cấu trúc file descriptor vào kernel khi phù hợp (thao tác này không thường xuyên như select/poll);
  • cơ chế callback sự kiện: Tránh việc duyệt, sử dụng hàm callback để đưa cấu trúc file descriptor sẵn sàng vào hàng đợi, thời gian truy cập hàng đợi là O(1);
  • Không giới hạn số lượng: Số lượng file descriptor không giới hạn;

5. Chế độ làm việc của Epoll

Epoll có hai chế độ làm việc:

  • Trigger mức (LT): Nếu trong máy chủ epoll có dữ liệu của file descriptor, nó sẽ liên tục thông báo cho fd đó (chế độ mặc định của select, poll, epoll);
  • Trigger cạnh (ET): Chỉ khi lần đầu tiên có dữ liệu hoặc dữ liệu tăng lên (thay đổi) thì máy chủ mới thông báo cho fd đó;

6. Máy chủ Reactor

Máy chủ Reactor là máy chủ epoll hoạt động ở chế độ ET.

main.cc - Máy chủ tính mạng

#include "TcpServer.hpp"
#include <memory>

static Response calculate(const Request &req)
{
    Response resp(0, 0);
    switch (req.op_)
    {
    case '+':
        resp.result_ = req.x_ + req.y_;
        break;
    case '-':
        resp.result_ = req.x_ - req.y_;
        break;
    case '*':
        resp.result_ = req.x_ * req.y_;
        break;
    case '/':
        if (0 == req.y_)
            resp.code_ = 1;
        else
            resp.result_ = req.x_ / req.y_;
        break;
    case '%':
        if (0 == req.y_)
            resp.code_ = 2;
        else
            resp.result_ = req.x_ % req.y_;
        break;
    default:
        resp.code_ = 3;
        break;
    }
    return resp;
}

void networkCalculator(Connection* conn, std::string& request)
{
    logMessage(DEBUG, "networkCalculator được gọi, nhận yêu cầu: %s", request.c_str());
    Request req;
    if(!req.Deserialize(request))
        return;
    
    Response resp = calculate(req);
    std::string responseStr = resp.Serialize();
    conn->outputBuffer += responseStr;
    conn->server->EnableReadWrite(conn, true, true);
}

int main()
{
    std::unique_ptr<TcpServer> server(new TcpServer());
    server->DispatchHandler(networkCalculator);

    return 0;
}

Thẻ: linux io epoll poll reactor

Đăng vào ngày 5 tháng 7 lúc 12:26