您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 
 订阅
从内核到应用层,完全掌握epoll网络编程利器
 
作者: 往事敬秋风
  116  次浏览      3 次
 2024-5-7
 
编辑推荐:
本文主要介绍了从内核到应用层,完全掌握epoll网络编程利器相关知识。希望对你的学习有帮助。
本文来自于微信公众号深度Linux,由火龙果软件Linda编辑、推荐。

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

一、初识epoll

1.1工作方式

LT(level triggered)

水平触发,缺省方式,同时支持block和no-block socket,在这种做法中,内核告诉我们一个文件描述符是否被就绪了,如果就绪了,你就可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错的可能性较小。传统的select\poll都是这种模型的代表。

ET(edge-triggered)

边沿触发,高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪状态时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如:你在发送、接受或者接受请求,或者发送接受的数据少于一定量时导致了一个EWOULDBLOCK错误)。但是请注意,如果一直不对这个fs做IO操作(从而导致它再次变成未就绪状态),内核不会发送更多的通知。

区别:LT事件不会丢弃,而是只要读buffer里面有数据可以让用户读取,则不断的通知你。而ET则只在事件发生之时通知。

1.2epoll的接口

int epoll_create(int size)

创建一个epoll句柄,size用来告诉内核,这个句柄监听的数目一共有多大,当创建好句柄以后,他就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)*

epoll的事件注册函数, 它不同于select是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_crete()的返回值,第二个参数表示动作,用三个宏来表示:LL_CTL_ADD: 注册新的 fd 到 epfd 中;EPOLL_CTL_MOD:修改已经注册的fd的监听事件;EPOLL_CTL_DEL:从epfd中删除一个fd第三个参数是要监听的fd,第四个参数是告诉内核需要监听什么事件,struct epoll_event结构如下:

    typedef union epoll_data{
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
}epoll_data_t;

struct epoll_event{
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
}

events 可以是以下几个宏的集合:

EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

EPOLLOUT:表示对应的文件描述符可以写;

EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

EPOLLERR:表示对应的文件描述符发生错误;

EPOLLHUP:表示对应的文件描述符被挂断;

EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

关于ET、LT两种工作模式:可以得出这样的结论

ET 模式仅当状态发生变化的时候才获得通知,这里所谓的状态的变化并不包括缓冲区中还有未处理的数据,也就是说,如果要采用ET模式,需要一直 read/write直到出错为止,很多人反映为什么采用ET模式只接收了一部分数据就再也得不到通知了,大多因为这样;而LT模式是只要有数据没有处理 就会一直通知下去的。

二、epoll原理

为什么需要epoll?

epoll是Linux操作系统提供的一种事件驱动的I/O模型,用于高效地处理大量并发连接的网络编程。它相比于传统的select和poll方法,具有更高的性能和扩展性。使用epoll可以实现以下几个优势:

高效处理大量并发连接:epoll采用了事件驱动的方式,只有当有可读或可写事件发生时才会通知应用程序,避免了遍历所有文件描述符的开销。

内核与用户空间数据拷贝少:使用epoll时,内核将就绪的文件描述符直接填充到用户空间的事件数组中,减少了内核与用户空间之间数据拷贝次数。

支持边缘触发(Edge Triggered)模式:边缘触发模式下,仅在状态变化时才通知应用程序。这意味着每次通知只包含最新状态的文件描述符信息,可以有效避免低效循环检查。

支持水平触发(Level Triggered)模式:水平触发模式下,在就绪期间不断地进行通知,直到应用程序处理完该文件描述符。

select与poll的缺陷?

select 和 poll 都是Unix系统中用来监视一组文件描述符的变化的系统调用。它们可以监视文件描述符的三种变化:可读性、可写性和异常条件。select 和 poll 的主要缺陷如下:

文件描述符数量限制:select 和 poll 都有一个限制,就是它们只能监视少于1024个文件描述符的变化。这对于现代的网络编程来说是不够的,因为一个进程往往需要监视成千上万的连接。

效率问题:虽然 select 和 poll 可以监视多个文件描述符,但是它们在每次调用的时候都需要传递所有要监视的文件描述符集合,这会导致效率的降低。

信息不足:select 和 poll 返回的只是哪些文件描述符已经准备好了,但是它们并不告诉你具体是哪一个。这就需要对所有要监视的文件描述符进行遍历,直到找到准备好的文件描述符为止。

信号中断:select 和 poll 调用可以被信号中断,这可能会导致调用失败。

为了解决这些问题,现代操作系统中引入了新的系统调用 epoll 来替代 select 和 poll。epoll 没有文件描述符的限制,它可以监视大量的文件描述符,并且可以实现即开即用,无需传递所有文件描述符集合。此外,epoll 可以直接告诉你哪些文件描述符已经准备好,这大大提高了处理效率。

2.1epoll操作

epoll 在 linux 内核中申请了一个简易的文件系统,把原先的一个 select 或者 poll 调用分为了三个部分:调用 epoll_create 建立一个 epoll 对象(在 epoll 文件系统中给这个句柄分配资源)、调用 epoll_ctl 向 epoll 对象中添加连接的套接字、调用 epoll_wait 收集发生事件的连接。这样只需要在进程启动的时候建立一个 epoll 对象,并在需要的时候向它添加或者删除连接就可以了,因此,在实际收集的时候,epoll_wait 的效率会非常高,因为调用的时候只是传递了发生 IO 事件的连接。

epoll 实现

我们以 linux 内核 2.6 为例,说明一下 epoll 是如何高效的处理事件的,当某一个进程调用 epoll_create 方法的时候,Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个重要的成员。

第一个是 rb_root rbr,这是红黑树的根节点,存储着所有添加到 epoll 中的事件,也就是这个 epoll 监控的事件。
第二个是 list_head rdllist 这是一个双向链表,保存着将要通过 epoll_wait 返回给用户的、满足条件的事件。

每一个 epoll 对象都有一个独立的 eventpoll 结构体,这个结构体会在内核空间中创造独立的内存,用于存储使用 epoll_ctl 方法向 epoll 对象中添加进来的事件。这些事件都会挂到 rbr 红黑树中,这样就能够高效的识别重复添加的节点。

所有添加到 epoll 中的事件都会与设备(如网卡等)驱动程序建立回调关系,也就是说,相应的事件发生时会调用这里的方法。这个回调方法在内核中叫做 ep_poll_callback,它把这样的事件放到 rdllist 双向链表中。在 epoll 中,对于每一个事件都会建立一个 epitem 结构体。

当调用 epoll_wait 检查是否有发生事件的连接时,只需要检查 eventpoll 对象中的 rdllist 双向链表中是否有 epitem 元素,如果 rdllist 链表不为空,则把这里的事件复制到用户态内存中的同时,将事件数量返回给用户。通过这种方法,epoll_wait 的效率非常高。epoll-ctl 在向 epoll 对象中添加、修改、删除事件时,从 rbr 红黑树中查找事件也非常快。这样,epoll 就能够轻易的处理百万级的并发连接。

epoll工作模式

epoll 有两种工作模式,LT(水平触发)模式与 ET(边缘触发)模式。默认情况下,epoll 采用 LT 模式工作。两个的区别是:

Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait() 时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你。如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。

Edge_triggered(边缘触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait() 会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你。这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符。

当然,在 LT 模式下开发基于 epoll 的应用要简单一些,不太容易出错,而在 ET 模式下事件发生时,如果没有彻底地将缓冲区的数据处理完,则会导致缓冲区的用户请求得不到响应。注意,默认情况下 Nginx 采用 ET 模式使用 epoll 的。

2.2 I/O 多路复用

(1)阻塞OR非阻塞

我们知道,对于 linux 来说,I/O 设备为特殊的文件,读写和文件是差不多的,但是 I/O 设备因为读写与内存读写相比,速度差距非常大。与 cpu 读写速度更是没法比,所以相比于对内存的读写,I/O 操作总是拖后腿的那个。网络 I/O 更是如此,我们很多时候不知道网络 I/O 什么时候到来,就好比我们点了一份外卖,不知道外卖小哥们什么时候送过来,这个时候有两个处理办法:

第一个是我们可以先去睡觉,外卖小哥送到楼下了自然会给我们打电话,这个时候我们在醒来取外卖就可以了。
第二个是我们可以每隔一段时间就给外卖小哥打个电话,这样就能实时掌握外卖的动态信息了。

第一种方式对应的就是阻塞的 I/O 处理方式,进程在进行 I/O 操作的时候,进入睡眠,如果有 I/O 时间到达,就唤醒这个进程。第二种方式对应的是非阻塞轮询的方式,进程在进行 I/O 操作后,每隔一段时间向内核询问是否有 I/O 事件到达,如果有就立刻处理。

阻塞的原理

工作队列

阻塞是进程调度的关键一环,指的是进程在等待某事件(如接收到网络数据)发生之前的等待状态,recv、select和epoll都是阻塞方法,以简单网络编程为例

下图中的计算机中运行着A、B、C三个进程,其中进程A执行着上述基础网络程序,一开始,这3个进程都被操作系统的工作队列所引用,处于运行状态,会分时执行

等待队列

当进程A执行到创建socket的语句时,操作系统会创建一个由文件系统管理的socket对象(如下图)。这个socket对象包含了发送缓冲区、接收缓冲区、等待队列等成员。等待队列是个非常重要的结构,它指向所有需要等待该socket事件的进程。

当程序执行到recv时,操作系统会将进程A从工作队列移动到该socket的等待队列中(如下图)。由于工作队列只剩下了进程B和C,依据进程调度,cpu会轮流执行这两个进程的程序,不会执行进程A的程序。所以进程A被阻塞,不会往下执行代码,也不会占用cpu资源。

ps:操作系统添加等待队列只是添加了对这个“等待中”进程的引用,以便在接收到数据时获取进程对象、将其唤醒,而非直接将进程管理纳入自己之下。上图为了方便说明,直接将进程挂到等待队列之下。

唤醒进程

当socket接收到数据后,操作系统将该socket等待队列上的进程重新放回到工作队列,该进程变成运行状态,继续执行代码。也由于socket的接收缓冲区已经有了数据,recv可以返回接收到的数据。

(2)线程池OR轮询

在现实中,我们当然选择第一种方式,但是在计算机中,情况就要复杂一些。我们知道,在 linux 中,不管是线程还是进程都会占用一定的资源,也就是说,系统总的线程和进程数是一定的。如果有许多的线程或者进程被挂起,无疑是白白消耗了系统的资源。而且,线程或者进程的切换也是需要一定的成本的,需要上下文切换,如果频繁的进行上下文切换,系统会损失很大的性能。一个网络服务器经常需要连接成千上万个客户端,而它能创建的线程可能之后几百个,线程耗光就不能对外提供服务了。这些都是我们在选择 I/O 机制的时候需要考虑的。这种阻塞的 I/O 模式下,一个线程只能处理一个流的 I/O 事件,这是问题的根源。

这个时候我们首先想到的是采用线程池的方式限制同时访问的线程数,这样就能够解决线程不足的问题了。但是这又会有第二个问题了,多余的任务会通过队列的方式存储在内存只能够,这样很容易在客户端过多的情况下出现内存不足的情况。

还有一种方式是采用轮询的方式,我们只要不停的把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了。

(3)代理

采用轮询的方式虽然能够处理多个 I/O 事件,但是也有一个明显的缺点,那就是会导致 CPU 空转。试想一下,如果所有的流中都没有数据,那么 CPU 时间就被白白的浪费了。

为了避免CPU空转,可以引进了一个代理。这个代理比较厉害,可以同时观察许多流的I/O事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中醒来,于是我们的程序就会轮询一遍所有的流,这就是 select 与 poll 所做的事情,可见,采用 I/O 复用极大的提高了系统的效率。

2.3内核接收网络数据全过程

如下图所示,进程在recv阻塞期间,计算机收到了对端传送的数据(步骤①)。数据经由网卡传送到内存(步骤②),然后网卡通过中断信号通知CPU有数据到达,CPU执行中断程序(步骤③)。此处的中断程序主要有两项功能,先将网络数据写入到对应socket的接收缓冲区里面(步骤④),再唤醒进程A(步骤⑤),重新将进程A放入工作队列中。

唤醒线程的过程如下图所示:

2.4epoll原理(源码)

(1)创建epoll对象

如下图所示,当某个进程调用epoll_create方法时,内核会创建一个eventpoll对象(也就是程序中epfd所代表的对象)。eventpoll对象也是文件系统中的一员,和socket一样,它也会有等待队列。

最后 epoll_create 生成的文件描述符如下图所示:

/*
* 此结构体存储在file->private_data中
*/
struct eventpoll {
// 自旋锁,在kernel内部用自旋锁加锁,就可以同时多线(进)程对此结构体进行操作
// 主要是保护ready_list
spinlock_t lock;
// 这个互斥锁是为了保证在eventloop使用对应的文件描述符的时候,文件描述符不会被移除掉
struct mutex mtx;
// epoll_wait使用的等待队列,和进程唤醒有关
wait_queue_head_t wq;
// file->poll使用的等待队列,和进程唤醒有关
wait_queue_head_t poll_wait;
// 就绪的描述符队列
struct list_head rdllist;
// 通过红黑树来组织当前epoll关注的文件描述符
struct rb_root rbr;
// 在向用户空间传输就绪事件的时候,将同时发生事件的文件描述符链入到这个链表里面
struct epitem *ovflist;
// 对应的user
struct user_struct *user;
// 对应的文件描述符
struct file *file;
// 下面两个是用于环路检测的优化
int visited;
struct list_head visited_list_link;
};
本文讲述的是 kernel 是如何将就绪事件传递给 epoll 并唤醒对应进程上,因此在这里主要聚焦于 (wait_queue_head_t wq) 等成员。

 

就绪列表的数据结构

就绪列表引用着就绪的socket,所以它应能够快速的插入数据。

程序可能随时调用epoll_ctl添加监视socket,也可能随时删除。当删除时,若该socket已经存放在就绪列表中,它也应该被移除。

所以就绪列表应是一种能够快速插入和删除的数据结构。双向链表就是这样一种数据结构,epoll使用双向链表来实现就绪队列(对应上图的rdllist)。

索引结构

既然epoll将“维护监视队列”和“进程阻塞”分离,也意味着需要有个数据结构来保存监视的socket。至少要方便的添加和移除,还要便于搜索,以避免重复添加。红黑树是一种自平衡二叉查找树,搜索、插入和删除时间复杂度都是O(log(N)),效率较好。epoll使用了红黑树作为索引结构(对应上图的rbr)

ps:因为操作系统要兼顾多种功能,以及由更多需要保存的数据,rdlist并非直接引用socket,而是通过epitem间接引用,红黑树的节点也是epitem对象。同样,文件系统也并非直接引用着socket。为方便理解,本文中省略了某些间接结构。

维护监视列表

创建epoll对象后,可以用epoll_ctl添加或删除所要监听的socket。以添加socket为例,如下图,如果通过epoll_ctl添加sock1、sock2和sock3的监视,内核会将eventpoll添加到这三个socket的等待队列中。

添加所要监听的socket

当socket收到数据后,中断程序会操作eventpoll对象,而不是直接操作进程。

接收数据

当socket收到数据后,中断程序会给eventpoll的“就绪列表”添加socket引用。如下图展示的是sock2和sock3收到数据后,中断程序让rdlist引用这两个socket。

给就绪列表添加引用

eventpoll对象相当于是socket和进程之间的中介,socket的数据接收并不直接影响进程,而是通过改变eventpoll的就绪列表来改变进程状态。

当程序执行到epoll_wait时,如果rdlist已经引用了socket,那么epoll_wait直接返回,如果rdlist为空,阻塞进程。

阻塞和唤醒进程

假设计算机中正在运行进程A和进程B,在某时刻进程A运行到了epoll_wait语句。如下图所示,内核会将进程A放入eventpoll的等待队列中,阻塞进程。

epoll_wait阻塞进程

当socket接收到数据,中断程序一方面修改rdlist,另一方面唤醒eventpoll等待队列中的进程,进程A再次进入运行状态(如下图)。也因为rdlist的存在,进程A可以知道哪些socket发生了变化。

epoll唤醒进程

值得注意的是,我们在 close 对应的文件描述符的时候,会自动调用 eventpoll_release 将对应的 file 从其关联的 epoll_fd 中删除。

close fd
|->filp_close
|->fput
|->__fput
|->eventpoll_release
|->ep_remove
所以我们在关闭对应的文件描述符后,并不需要通过 epoll_ctl_del 来删掉对应 epoll 中相应的描述符。

三、如何使用epoll

通过在包含一个头文件#include

应用举例

下面,我引用google code中别人写的一个简单程序来进行说明。svn路径:http://sechat.googlecode.com/svn/trunk/

该程序一个简单的聊天室程序,用Linux C++写的,服务器主要是用epoll模型实现,支持高并发,我测试在有10000个客户端连接服务器的时候,server处理时间不到1秒,当然客户端只是与服务器连接之后,接受服务器的欢迎消息而已,并没有做其他的通信。虽然程序比较简单,但是在我们考虑服务器高并发时也提供了一个思路。在这个程序中,我已经把所有的调试信息和一些与epoll无关的信息干掉,并添加必要的注释,应该很容易理解。

程序共包含2个头文件和3个cpp文件。其中3个cpp文件中,每一个cpp文件都是一个应用程序,server.cpp:服务器程序,client.cpp:单个客户端程序,tester.cpp:模拟高并发,开启10000个客户端去连服务器。

utils.h头文件,就包含一个设置socket为不阻塞函数,如下:

int setnonblocking(int sockfd)
{
CHK(fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK));
return 0;
}

 

local.h头文件,一些常量的定义和函数的声明,如下:

#define BUF_SIZE 1024                 //默认缓冲区
#define SERVER_PORT 44444 //监听端口
#define SERVER_HOST "192.168.34.15" //服务器IP地址
#define EPOLL_RUN_TIMEOUT -1 //epoll的超时时间
#define EPOLL_SIZE 10000 //epoll监听的客户端的最大数目

#define STR_WELCOME "Welcome to seChat! You ID is: Client #%d"
#define STR_MESSAGE "Client #%d>> %s"
#define STR_NOONE_CONNECTED "Noone connected to server except you!"
#define CMD_EXIT "EXIT"

//两个有用的宏定义:检查和赋值并且检测
#define CHK(eval) if(eval < 0){perror("eval"); exit(-1);}
#define CHK2(res, eval) if((res = eval) < 0){perror("eval"); exit(-1);}

//================================================================================================
//函数名:setnonblocking
//函数描述:设置socket为不阻塞
//输入:[in] sockfd socket标示符
//输出:无
//返回:0
//================================================================================================
int setnonblocking(int sockfd);

//================================================================================================
//函数名:handle_message
//函数描述:处理每个客户端socket
//输入:[in] new_fd socket标示符
//输出:无
//返回:返回从客户端接受的数据的长度
//================================================================================================
int handle_message(int new_fd);

server.cpp文件,epoll模型就在这里实现,如下:

#include "local.h"
#include "utils.h"

using namespace std;

// 存放客户端socket描述符的list
list<int> clients_list;

int main(int argc, char *argv[])
{
int listener; //监听socket
struct sockaddr_in addr, their_addr;
addr.sin_family = PF_INET;
addr.sin_port = htons(SERVER_PORT);
addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
socklen_t socklen;
socklen = sizeof(struct sockaddr_in);

static struct epoll_event ev, events[EPOLL_SIZE];
ev.events = EPOLLIN | EPOLLET; //对读感兴趣,边沿触发

char message[BUF_SIZE];

int epfd; //epoll描述符
clock_t tStart; //计算程序运行时间

int client, res, epoll_events_count;

CHK2(listener, socket(PF_INET, SOCK_STREAM, 0)); //初始化监听socket
setnonblocking(listener); //设置监听socket为不阻塞
CHK(bind(listener, (struct sockaddr *)&addr, sizeof(addr))); //绑定监听socket
CHK(listen(listener, 1)); //设置监听

CHK2(epfd,epoll_create(EPOLL_SIZE)); //创建一个epoll描述符,并将监听socket加入epoll
ev.data.fd = listener;
CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &ev));

while(1)
{
CHK2(epoll_events_count,epoll_wait(epfd, events, EPOLL_SIZE, EPOLL_RUN_TIMEOUT));
tStart = clock();
for(int i = 0; i < epoll_events_count ; i++)
{
if(events[i].data.fd == listener) //新的连接到来,将连接添加到epoll中,并发送欢迎消息
{
CHK2(client,accept(listener, (struct sockaddr *) &their_addr, &socklen));
setnonblocking(client);
ev.data.fd = client;
CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev));

clients_list.push_back(client); // 添加新的客户端到list
bzero(message, BUF_SIZE);
res = sprintf(message, STR_WELCOME, client);
CHK2(res, send(client, message, BUF_SIZE, 0));

}else
{
CHK2(res,handle_message(events[i].data.fd)); //注意:这里并没有调用epoll_ctl重新设置socket的事件类型,但还是可以继续收到客户端发送过来的信息
}
}
printf("Statistics: %d events handled at: %.2f second(s)\n", epoll_events_count, (double)(clock() - tStart)/CLOCKS_PER_SEC);
}

close(listener);
close(epfd);

return 0;
}

int handle_message(int client)
{
char buf[BUF_SIZE], message[BUF_SIZE];
bzero(buf, BUF_SIZE);
bzero(message, BUF_SIZE);

int len;

CHK2(len,recv(client, buf, BUF_SIZE, 0)); //接受客户端信息

if(len == 0) //客户端关闭或出错,关闭socket,并从list移除socket
{
CHK(close(client));
clients_list.remove(client);
}
else //向客户端发送信息
{
if(clients_list.size() == 1)
{
CHK(send(client, STR_NOONE_CONNECTED, strlen(STR_NOONE_CONNECTED), 0));
return len;
}

sprintf(message, STR_MESSAGE, client, buf);
list<int>::iterator it;
for(it = clients_list.begin(); it != clients_list.end(); it++)
{
if(*it != client)
{
CHK(send(*it, message, BUF_SIZE, 0));
}
}
}

return len;
}

tester.cpp文件,模拟服务器的高并发,开启10000个客户端去连接服务器,如下:

#include "local.h"
#include "utils.h"

using namespace std;

char message[BUF_SIZE]; //接受服务器信息
list<int> list_of_clients; //存放所有客户端
int res;
clock_t tStart;

int main(int argc, char *argv[])
{
int sock;
struct sockaddr_in addr;
addr.sin_family = PF_INET;
addr.sin_port = htons(SERVER_PORT);
addr.sin_addr.s_addr = inet_addr(SERVER_HOST);

tStart = clock();

for(int i=0 ; i<EPOLL_SIZE; i++) //生成EPOLL_SIZE个客户端,这里是10000个,模拟高并发
{
CHK2(sock,socket(PF_INET, SOCK_STREAM, 0));
CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);
list_of_clients.push_back(sock);

bzero(&message, BUF_SIZE);
CHK2(res,recv(sock, message, BUF_SIZE, 0));
printf("%s\n", message);
}

list<int>::iterator it; //移除所有客户端
for(it = list_of_clients.begin(); it != list_of_clients.end() ; it++)
close(*it);

printf("Test passed at: %.2f second(s)\n", (double)(clock() - tStart)/CLOCKS_PER_SEC);
printf("Total server connections was: %d\n", EPOLL_SIZE);

return 0;
}

 

我就不给出程序的执行结果的截图了,不过下面这张截图是代码作者自己测试的,可以看出,并发10000无压力呀

单个客户端去连接服务器,client.cpp文件,如下:

#include "local.h"
#include "utils.h"

using namespace std;

char message[BUF_SIZE];

/*
流程:
调用fork产生两个进程,两个进程通过管道进行通信
子进程:等待客户输入,并将客户输入的信息通过管道写给父进程
父进程:接受服务器的信息并显示,将从子进程接受到的信息发送给服务器
*/
int main(int argc, char *argv[])
{
int sock, pid, pipe_fd[2], epfd;

struct sockaddr_in addr;
addr.sin_family = PF_INET;
addr.sin_port = htons(SERVER_PORT);
addr.sin_addr.s_addr = inet_addr(SERVER_HOST);

static struct epoll_event ev, events[2];
ev.events = EPOLLIN | EPOLLET;

//退出标志
int continue_to_work = 1;

CHK2(sock,socket(PF_INET, SOCK_STREAM, 0));
CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);

CHK(pipe(pipe_fd));

CHK2(epfd,epoll_create(EPOLL_SIZE));

ev.data.fd = sock;
CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev));

ev.data.fd = pipe_fd[0];
CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd[0], &ev));

// 调用fork产生两个进程
CHK2(pid,fork());
switch(pid)
{
case 0: // 子进程
close(pipe_fd[0]); // 关闭读端
printf("Enter 'exit' to exit\n");
while(continue_to_work)
{
bzero(&message, BUF_SIZE);
fgets(message, BUF_SIZE, stdin);

// 当收到exit命令时,退出
if(strncasecmp(message, CMD_EXIT, strlen(CMD_EXIT)) == 0)
{
continue_to_work = 0;
}
else
{
CHK(write(pipe_fd[1], message, strlen(message) - 1));
}
}
break;
default: // 父进程
close(pipe_fd[1]); // 关闭写端
int epoll_events_count, res;
while(continue_to_work)
{
CHK2(epoll_events_count,epoll_wait(epfd, events, 2, EPOLL_RUN_TIMEOUT));

for(int i = 0; i < epoll_events_count ; i++)
{
bzero(&message, BUF_SIZE);
if(events[i].data.fd == sock) //从服务器接受信息
{
CHK2(res,recv(sock, message, BUF_SIZE, 0));
if(res == 0) //服务器已关闭
{
CHK(close(sock));
continue_to_work = 0;
}
else
{
printf("%s\n", message);
}
}
else //从子进程接受信息
{
CHK2(res, read(events[i].data.fd, message, BUF_SIZE));
if(res == 0)
{
continue_to_work = 0;
}
else
{
CHK(send(sock, message, BUF_SIZE, 0));
}
}
}
}
}
if(pid)
{
close(pipe_fd[0]);
close(sock);
}else
{
close(pipe_fd[1]);
}

return 0;
}

 

   
116 次浏览       3
相关文章

一文了解汽车嵌入式AUTOSAR架构
嵌入式Linux系统移植的四大步骤
嵌入式中设计模式的艺术
嵌入式软件架构设计 模块化 & 分层设计
相关文档

企点嵌入式PHP的探索实践
ARM与STM简介
ARM架构详解
华为鸿蒙深度研究
相关课程

嵌入式C高质量编程
嵌入式操作系统组件及BSP裁剪与测试
基于VxWorks的嵌入式开发、调试与测试
嵌入式单元测试最佳实践

最新活动计划
MBSE(基于模型的系统工程)6-20[北京]
大模型微调原理与实操 6-20[厦门]
基于模型的数据治理与中台 6-25[北京]
DoDAF规范、模型与实例 6-24[北京]
UAF架构体系与实践 7-4[北京]
Linux内核编程及设备驱动 7-25[北京]
 
 
最新文章
基于FPGA的异构计算在多媒体中的应用
深入Linux内核架构——简介与概述
Linux内核系统架构介绍
浅析嵌入式C优化技巧
进程间通信(IPC)介绍
最新课程
嵌入式Linux驱动开发
代码整洁之道-态度、技艺与习惯
嵌入式软件测试
嵌入式C高质量编程
嵌入式软件可靠性设计
成功案例
某军工所 嵌入式软件架构
中航工业某研究所 嵌入式软件开发指南
某轨道交通 嵌入式软件高级设计实践
深圳 嵌入式软件架构设计—高级实践
某企业 基于IPD的嵌入式软件开发
更多...