select and poll代码示例:https://gitee.com/tgwTTT/linux-learning-dai/tree/master/select

epoll代码示例:https://gitee.com/tgwTTT/linux-learning-dai/tree/master/Tcpepollserver

在 Linux 网络编程中,I/O 多路复用是实现高并发服务器的核心技术之一。而 select 作为最古老的系统调用之一,尽管在今天已被 epoll 等更高效的机制取代,但依然是学习网络编程的必经之路。

Select:

初识select:

select 是一个同步 I/O 多路复用机制,允许程序在一个线程内同时监视多个文件描述符(file descriptor,简称 fd),等待其中某些 fd 变为“可读”、“可写”或“发生异常”。

#include <sys/select.h>

int select(int nfds,fd_set *readfds,fd_set *writefds,fd_set *exceptfds,struct timeval *timeout);

参数解释:
• 参数nfds是需要监视的最⼤的⽂件描述符值+1;
• rdset,wrset,exset分别对应于需要检测的可读⽂件描述符的集合,可写⽂件描述符的集 合及异常
⽂件描述符的集合;
• 参数timeout为结构timeval,⽤来设置select()的等待时间
参数timeout取值:
• NULL:则表⽰select()没有timeout,select将⼀直被阻塞,直到某个⽂件描述符上发⽣了事件;
• 0:仅检测描述符集合的状态,然后⽴即返回,并不等待外部事件的发⽣。
• 特定的时间值:如果在指定的时间段⾥没有事件发⽣,select将超时返回。

辅助宏:

FD_ZERO(fd_set *set); // 清空集合
FD_SET(int fd, fd_set *set); // 将 fd 加入集合
FD_CLR(int fd, fd_set *set); // 将 fd 从集合中移除
FD_ISSET(int fd, fd_set *set); // 判断 fd 是否在集合中

在传统的阻塞 I/O 模型中,每个连接需要一个线程或进程来处理。当连接数增加时,线程切换和内存开销会迅速膨胀。

select 的引入解决了这个问题:

  • 单线程处理多个连接;
  • 阻塞等待,不占用 CPU;
  • 跨平台支持好,适用于小型服务或教学场景。

select执⾏过程:

fd_set的本质是一个整数数组,更准确的来说是一个位图,取fd_set⻓度为1字节,fd_set中的每⼀bit可以对应⼀个⽂件描述符fd。则1字节⻓的fd_set最⼤可以对应8个fd.

(1)执⾏fd_set set; FD_ZERO(&set);则set⽤位表⽰是0000,0000。

(2)若fd=5,执⾏FD_SET(fd,&set);后set变为0001,0000(第5位置为1)

(3)若再加⼊fd=2,fd=1,则set变为0001,0011

(4)执⾏select(6,&set,0,0,0)阻塞等待

(5)若fd=1,fd=2上都发⽣可读事件,则select返回,此时set变为0000,0011。注意:没有事件发⽣的fd=5被清空。

socket就绪条件

读就绪:

socket内核中, 接收缓冲区中的字节数, ⼤于等于低⽔位标记SO_RCVLOWAT. 此时可以⽆阻塞的读该⽂件描述符, 并且返回值⼤于0;

socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0;

监听的socket上有新的连接请求;

socket上有未处理的错误;

写就绪:

socket内核中, 发送缓冲区中的可⽤字节数(发送缓冲区的空闲位置⼤⼩), ⼤于等于低⽔位标SO_SNDLOWAT, 此时可以⽆阻塞的写, 并且返回值⼤于0;

socket的写操作被关闭(close或者shutdown). 对⼀个写操作被关闭的socket进⾏写操作, 会触发SIGPIPE信号;

socket使⽤⾮阻塞connect连接成功或失败之后;

socket上有未读取的错误;

select的特点:


1.可监控的⽂件描述符个数取决于sizeof(fd_set)的值. 我这边服务器上sizeof(fd_set)=512,每bit表⽰⼀个⽂件描述符,则我服务器上⽀持的最⼤⽂件描述符是512*8=4096.

2.将fd加⼊select监控集的同时,还要再使⽤⼀个数据结构array保存放到select监控集中的fd, ◦ ⼀是⽤于再select 返回后,array作为源数据和fd_set进⾏FD_ISSET判断。 ◦ ⼆是select返回后会把以前加⼊的但并⽆事件发⽣的fd清空,则每次开始select前都要重新从array取得fd逐⼀加⼊(FD_ZERO最先),扫描array的同时取得fd最⼤值maxfd,⽤于select的第⼀个参数。

select缺点

1.每次调⽤select, 都需要⼿动设置fd集合, 从接⼝使⽤⻆度来说也⾮常不便.

2.每次调⽤select,都需要把fd集合从⽤⼾态拷⻉到内核态,这个开销在fd很多时会很⼤

3.同时每次调⽤select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很⼤

4.select⽀持的⽂件描述符数量太⼩.

问题描述
fd 数量限制默认最大为 1024(FD_SETSIZE),无法简单扩展
性能随 fd 增加而下降每次调用都需要遍历所有 fd,复杂度为 O(n)
内存拷贝开销每次调用都要将 fd 集合从用户态拷贝到内核态
不可伸缩不适合万级并发连接,容易被 epoll 替代

代码示例

#pragma once
#include <iostream>
#include <sys/select.h>
#include "Common.hpp"
#include "Socket.hpp"
#include <vector>
using namespace std;
using namespace SocketMoudle;
class selectServer
{
private:
    unique_ptr<Socket> _listenSocket;
    bool _running = true;
    const static int size = sizeof(fd_set) * 8;
    int defaulted = -1;
    vector<int> fd_array;
    ;

public:
    selectServer(int port) : _listenSocket(make_unique<TcpSocket>(port))
    {
        _listenSocket->BuildListenSocket(port);
        fd_array.resize(size, defaulted);
    }
    ~selectServer() {}
    void start()
    {
        while (_running)
        {
            // 将listenSocket加入到select中,让select关心listenSocket的读事件
            fd_set readfds;    // 读文件描述符集
            FD_ZERO(&readfds); // 清空文件描述符集
            // FD_SET(_listenSocket->GetSockFd(),&readfds);//将listenSocket加入到文件描述符集中
            // 借助辅助数组保存服务器历史获取过的所有fd
            // struct timeval timeout={2,0};//设置超时时间为2秒
            // 最大的fd一定是变化的,每一次都要重置
            fd_array[0] = _listenSocket->GetSockFd();
            int maxfd = -1;
            for (int fd : fd_array)
            {
                if (fd != defaulted)
                {
                    FD_SET(fd, &readfds);
                    maxfd = max(maxfd, fd);
                }
            }
            int n = select(maxfd + 1, &readfds, nullptr, nullptr, nullptr);
            switch (n)
            {
            case -1:
                LOG(LogLevel::ERROR) << "select error";
                break;
            case 0:
                LOG(LogLevel::INFO) << "select timeout";
                break;
            default:
                LOG(LogLevel::INFO) << "select has event";
                Dispatcher(readfds);
                break;
            }
        }
        _running = false;
    }
    void PrintFd()
    {
        for (int fd : fd_array)
        {
            if (fd != defaulted)
            {
                cout << fd << endl;
            }
        }
    }
    void Dispatcher(fd_set &readfds)
    {
        // 处理listenSocket的读事件
        for (int i = 0; i < fd_array.size(); i++)
        {
            if (FD_ISSET(fd_array[i], &readfds))
            {
                if (fd_array[i] == _listenSocket->GetSockFd())
                {
                    // 处理listenSocket的读事件
                    AcceptNewClient();
                }
                else
                {
                    // 处理其他客户端
                    Recever(fd_array[i], i);
                }
            }
        }
    }
    void AcceptNewClient()
    {

        NetAddr client;
        int fd = _listenSocket->Accept(&client); // 不会阻塞,套接字已就绪
        if (fd >= 0)
        {
            LOG(LogLevel::INFO) << "accept a new client:" + client.stringaddr();
            // 将新的客户端加入到辅助数组中
            int i = 0;
            for (; i < fd_array.size(); i++)
            {
                if (fd_array[i] == defaulted)
                {
                    break;
                }
            }
            if (i == fd_array.size())
            {
                LOG(LogLevel::WARNING) << "达到最大连接数,无法接收新连接";
                close(fd);
            }
            else
            {
                fd_array[i] = fd;
            }
        }
    }
    void Recever(int fd, int i)
    {
        char buffer[1024];
        ssize_t n = recv(fd, buffer, sizeof(buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = 0;
            LOG(LogLevel::INFO) << "收到客户端数据:" << buffer;
        }
        else if (n == 0)
        {
            LOG(LogLevel::INFO) << "客户端断开连接";
            close(fd);
            // 将客户端fd从辅助数组中移除
            fd_array[i] = defaulted;
        }
        else
        {
            LOG(LogLevel::ERROR) << "接收数据失败";
        }
    }
    void stop()
    {
        _running = false;
    }
};

Poll

上面,我们见识了“位图 + 最大 fd” 的经典设计,也领教了 1024 上限 与 O(n) 遍历 带来的痛苦。
今天的主角 poll 正是为了拆掉 1024 天花板、简化 API 而诞生的“过渡王者”。虽然它依旧躲不过 O(n) 的宿命,却是理解后续 epoll 的关键跳板。

一、poll 是什么?

pollselect 同属 同步 I/O 多路复用:
一次阻塞,等待多个文件描述符就绪,返回时告诉用户“哪些 fd 发生了什么事件”。

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

核心数据:

struct pollfd {
int fd; // 要监视的文件描述符
short events; // 输入:关心的事件掩码
short revents; // 输出:内核返回的就绪事件
};

参数说明

fds是⼀个poll函数监听的结构列表. 每⼀个元素中, 包含了三部分内容: ⽂件描述符, 监听的事件集合, 返回的事件集合.

nfds表⽰fds数组的⻓度.

timeout表⽰poll函数的超时时间, 单位是毫秒(ms).

事件说明
POLLIN可读(含新连接)
POLLOUT可写
POLLERR出错
POLLHUP对端挂起
POLLRDHUPTCP 对端关闭写(Linux 2.6.17+)

poll的优点

不同于select使⽤三个位图来表⽰三个fdset的⽅式,poll使⽤⼀个pollfd的指针实现.

pollfd结构包含了要监视的event和发⽣的event,不再使⽤select“参数-值”传递的⽅式. 接⼝使⽤⽐select⽅便.

poll并没有最⼤数量限制 (但是数量过⼤后性能也是会下降).

poll的缺点

poll中监听的⽂件描述符数⽬增多时

和select函数⼀样,poll返回后,需要轮询pollfd来获取就绪的描述符.

每次调⽤poll都需要把⼤量的pollfd结构从⽤⼾态拷⻉到内核中.

同时连接的⼤量客⼾端在⼀时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增⻓, 其效率也会线性下降.

代码示例
#pragma once
#include <iostream>
#include <sys/select.h>
#include "Common.hpp"
#include "Socket.hpp"
#include <vector>
#include<unistd.h>
#include<sys/poll.h>
using namespace std;
using namespace SocketMoudle;
class pollServer
{
private:
    unique_ptr<Socket> _listenSocket;
    bool _running = true;
    const int defaulted = -1;
    static const int size = 4096;
    struct pollfd pfds[size];
public:
    pollServer(int port) : _listenSocket(make_unique<TcpSocket>(port))
    {
        _listenSocket->BuildListenSocket(port);
        // 初始化辅助数组
        for (int i = 0; i < size; i++)
        {
            pfds[i].fd = defaulted;
            pfds[i].events = 0;
            pfds[i].revents = 0;
        }
        pfds[0].fd = _listenSocket->GetSockFd();
        pfds[0].events = POLLIN;
    }
    ~pollServer() {}
    void start()
    {
        while (_running)
        {
            //int timeout=1000; // 超时时间(MS)
            int n = poll(pfds, size, -1);
            switch (n)
            {
            case -1:
                LOG(LogLevel::ERROR) << "select error";
                break;
            case 0:
                LOG(LogLevel::INFO) << "select timeout";
                break;
            default:
                LOG(LogLevel::INFO) << "select has event";
                Dispatcher(pfds);
                break;
            }
        }
        _running = false;
    }
    void Dispatcher(struct pollfd pfds[])
    {
        // 处理listenSocket的读事件
        for (int i = 0; i < size; i++)
        {
            if (pfds[i].revents & POLLIN)
            {
                if (pfds[i].fd == _listenSocket->GetSockFd())
                {
                    // 处理listenSocket的读事件
                    AcceptNewClient();
                }
                else
                {
                    // 处理其他客户端
                    Recever(i);
                }
            }
        }
    }
    void AcceptNewClient()
    {

        NetAddr client;
        int fd = _listenSocket->Accept(&client); // 不会阻塞,套接字已就绪
        if (fd >= 0)
        {
            LOG(LogLevel::INFO) << "accept a new client:" + client.stringaddr();
            // 将新的客户端加入到辅助数组中
            int i = 0;
            for (; i < size; i++)
            {
                if (pfds[i].fd == defaulted)
                {
                    break;
                }
            }
            if (i == size)
            {
                LOG(LogLevel::WARNING) << "达到最大连接数,无法接收新连接";
                 //如果是指针扩容即可,这里不做处理
                close(fd);
            }
            else
            {
                pfds[i].fd = fd;
                pfds[i].events = POLLIN;
                pfds[i].revents = 0;
            }
        }
    }
    void Recever(int i)
    {
        char buffer[1024];
        ssize_t n = recv(pfds[i].fd, buffer, sizeof(buffer) - 1, 0);
        if (n > 0)
        {
            buffer[n] = 0;
            cout<< "收到客户端数据:" << buffer;
        }
        else if (n == 0)
        {
            LOG(LogLevel::INFO) << "客户端断开连接";
            close(pfds[i].fd);
            // 将客户端fd从辅助数组中移除
            pfds[i].fd = defaulted;
            pfds[i].events = 0;
            pfds[i].revents = 0;
        }
        else
        {
            LOG(LogLevel::ERROR) << "接收数据失败";
            
        }
    }
    void stop()
    {
        _running = false;
    }
};

Epoll

初识epoll

前面我们分别拆解了 selectpoll

  • select 用 位图,上限 1024,O(最高 fd);
  • poll 换成 数组,拆掉 1024,却仍逃不掉 O(n) 轮询。

当 C10K → C100K → C1000K 时,CPU 时间绝大部分浪费在 “遍历一堆未就绪的 fd” 上。
epoll 的出现,就是为了把 “遍历” 变成 “回调”,让复杂度从 O(n) 降到 O(1)

int epoll_create1(int flags); // 建一棵红黑树
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);

它不同于select()是在监听事件时告诉内核要监听什么类型的事件, ⽽是在这⾥先注册要监听的事件类型.

第⼀个参数是epoll_create()的返回值(epoll的句柄).

第⼆个参数表⽰动作,⽤三个宏来表⽰.

第三个参数是需要监听的fd.

第四个参数是告诉内核需要监听什么事.

struct epoll_event {
__uint32_t events; // 事件掩码 EPOLLIN | EPOLLOUT …
epoll_data_t data; // 用户附着数据
};

操作码op

  • EPOLL_CTL_ADD:插入节点
  • EPOLL_CTL_MOD:修改事件
  • EPOLL_CTL_DEL:删除节点

epoll⼯作原理

epoll模型在底层有这样一颗红黑树,储存了文件描述符信息和文件需要关心的事件信息,此外还维护了一个就绪队列,内核会将已就绪的文件信息添加到就绪队列中。其核心原理是通过内核与用户空间共享内存,在内核中维护一个事件表,当有文件描述符就绪时,内核会将其加入就绪队列,用户空间通过 epoll_wait 函数获取就绪的文件描述符,而无需像 select 和 poll 那样遍历整个文件描述符集合。 在 I/O 多路复用机制中,select 和 poll 是 epoll 的 “前辈”,但它们存在一些明显的不足,而 epoll 正是为克服这些不足而出现的。

触发模式

⽔平触发Level Triggered ⼯作模式

epoll默认状态下就是LT⼯作模式.

当epoll检测到socket上事件就绪的时候, 可以不⽴刻进⾏处理. 或者只处理⼀部分. •

如果缓冲区中有2k数据,只读了1K数据, 缓冲区中还剩1K数据, 在第⼆次调⽤ epoll_wait 时, epoll_wait 仍然会⽴刻返回并通知socket读事件就绪.

直到缓冲区上所有的数据都被处理完, epoll_wait 才会⽴刻返回.

⽀持阻塞读写和⾮阻塞读写

边缘触发Edge Triggered⼯作模式

如果我们在第1步将socket添加到epoll描述符的时候使⽤了EPOLLET标志, epoll进⼊ET⼯作模式.

当epoll检测到socket上事件就绪时, 必须⽴刻处理.

如上⾯的例⼦, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第⼆次调⽤ epoll_wait 的时候, epoll_wait 不会再返回了.

也就是说, ET模式下, ⽂件描述符上的事件就绪后, 只有⼀次处理机会.

ET的性能⽐LT性能更⾼( epoll_wait 返回的次数少了很多). Nginx默认采⽤ET模式使⽤epoll.

只⽀持⾮阻塞的读写select和poll其实也是⼯作在LT模式下. epoll既可以⽀持LT, 也可以⽀持ET.

模式行为读事件举例
LT(Level Trigger,水平触发)只要 fd 处于就绪状态,每次 epoll_wait 都返回缓冲区有数据就通知
ET(Edge Trigger,边缘触发)状态变化瞬间 只通知一次缓冲区从空→有通知一次,后续不再提醒

ET 模式能显著减少事件通知次数,但必须 一次性把数据读空,否则数据会失效

epoll的优点(和 select 的缺点对应)

1.接⼝使⽤⽅便: 虽然拆分成了三个函数, 但是反⽽使⽤起来更⽅便⾼效. 不需要每次循环都设置关注的⽂件描述符, 也做到了输⼊输出参数分离开

2.数据拷⻉轻量: 只在合适的时候调⽤ EPOLL_CTL_ADD 将⽂件描述符结构拷⻉到内核中, 这个操作并不频繁(⽽select/poll都是每次循环都要进⾏拷⻉)

3.事件回调机制: 避免使⽤遍历, ⽽是使⽤回调函数的⽅式, 将就绪的⽂件描述符结构加⼊到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些⽂件描述符就绪. 这个操作时间复杂度O(1). 即使⽂件描述符数⽬很多, 效率也不会受到影响. • 没有数量限制: ⽂件描述符数⽬⽆上限.

对比

维度selectpollepoll
数据结构位图 fd_set数组 struct pollfd红黑树 + 就绪链表
最大 fd 数1024(FD_SETSIZE)无硬上限无硬上限
用户→内核每次整包拷贝每次整包拷贝一次 epoll_ctl 注册,永久有效
内核→用户整包拷回 + 清零整包拷回只拷就绪事件
查找方式轮询 0~nfds-1轮询整个数组回调直接链入就绪队列
时间复杂度O(最高 fd)O(数组长度)O(1) 就绪数
触发模式只有 LT只有 LTLT + ET
跨平台_POSIX_C_SOURCE 2001同左Linux only
代码复杂度宏操作啰嗦数组直观需要处理 ET 非阻塞

代码举例:

ET+非阻塞

// echo_server_et.c
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define PORT 7777
#define MAX_EVENTS 1024
#define BUF_SIZE 4096

static int set_nonblock(int fd) {
    return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
}

int create_listener() {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    int opt = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port   = htons(PORT),
        .sin_addr.s_addr = htonl(INADDR_ANY)
    };
    bind(fd, (struct sockaddr*)&addr, sizeof(addr));
    listen(fd, 128);
    set_nonblock(fd);
    return fd;
}

void do_read(int epfd, int fd);

void do_accept(int epfd, int lfd) {
    while (1) {
        struct sockaddr_in cli;
        socklen_t len = sizeof(cli);
        int conn = accept(lfd, (struct sockaddr*)&cli, &len);
        if (conn < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) break;
            perror("accept");
            continue;
        }
        set_nonblock(conn);
        struct epoll_event ev = { .events = EPOLLIN | EPOLLET, .data.fd = conn };
        epoll_ctl(epfd, EPOLL_CTL_ADD, conn, &ev);
        printf("new conn %d\n", conn);
    }
}

void do_read(int epfd, int fd) {
    char buf[BUF_SIZE];
    while (1) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) {
            write(fd, buf, n);          // 回显
        } else if (n == 0) {            // 对端 FIN
            close(fd);
            epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
            break;
        } else {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;                  // 读空,ET 需要再次等待
            perror("read");
            close(fd);
            epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
            break;
        }
    }
}

int main() {
    int lfd = create_listener();
    int epfd = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN | EPOLLET, .data.fd = lfd };
    epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);

    struct epoll_event evs[MAX_EVENTS];
    while (1) {
        int n = epoll_wait(epfd, evs, MAX_EVENTS, -1);
        for (int i = 0; i < n; ++i) {
            int fd = evs[i].data.fd;
            if (fd == lfd) {
                do_accept(epfd, lfd);
            } else if (evs[i].events & EPOLLIN) {
                do_read(epfd, fd);
            } else if (evs[i].events & (EPOLLERR | EPOLLHUP)) {
                close(fd);
                epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
            }
        }
    }
}

今天的更新就到这里啦,如有错误欢迎评论区指出!!!