EPOLL使用的简单总结3——ET模式+线程池
1.当前使用线程池的实现
线程池本质就是提前创建好的多个线程(在程序初始化的时候创建的多个线程),在加上对线程的操作就可以叫线程池。
这里创建的线程池并不像Java那样对线程有很好的控制和管理。这里只负责线程的创建和销毁。线程的工作也是提前安排好,而不是在运行时分配。
这样做的目的是合理利用硬件资源,让资源在程序运行的时候最大化的集中到IO操作上。
理由一:之前文章说过,机器性能杀手有【环境切换】,所谓环境切换一般就是指线程的切换。
理由二:创建线程时,内存会为它分配运行所需要的空间。开辟新的堆栈。配置线程上下文和描述表。都会消耗内核资源。
要实现的线程池结构:
其中实款要实现为要实现的部分。
如上图,使用4个线程的理由:我的服务器运行在一个4核的机器上。
这样做的理由:一个cpu某一时刻只能运行一个线程,如果有第二线程就要涉及到线程在cpu的切换会消耗cpu的资源,上面说过了。
2.epoll+ET回顾
程序的大致流程如图
3.epoll+ET+线程池
延续上图,初步设计的程序结构为
4.代码
- 线程池:
.h
#ifndef PTHREAD_POOL_H
#define PTHREAD_POOL_H
#include <iostream>
#include <stdlib.h>
#include <vector>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
//using namespace std;
typedef struct ConnFdNode{
int i;
int connfd;
}ConnFdNode;
int make_pthread(int pthreadNum);
void *pthread_work(void *arg);
int destroy_pthread();
bool have_work();
int set_work(int i, int connfd);
ConnFdNode get_work();
void debug_pool();
#endif //PTHREAD_POOL_H
.cpp
#include "pthread_pool.h"
extern std::vector<struct epoll_event> events;
extern int epollfd;
extern struct epoll_event epfd;
/*工作队列的定义*/
typedef std::vector<struct ConnFdNode> WorkList;
WorkList waitlist;
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER; //条件变量初始化
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; //互斥锁初始化
pthread_t ntid[4]; //初始化,线程描述符,MAX为最大线程创建数
int prhread_num = 0; //线程数目
/*****************************
> 线程工作函数
******************************/
void *pthread_work(void *arg)
{
pthread_t tid;
tid = pthread_self(); //获取当前线程id
int connfd , i; //定义events下标,和链接描述符,以备调用
ConnFdNode cNode; //定义工作队列结构体,用于返回队列中的内容
char buf[100]; //读写相关
bzero(buf, sizeof(buf));
int n ,nread, nwrite;
while(1)
{
pthread_mutex_lock(&lock); //枷锁,队列操作同步
pthread_cond_wait(&has_product, &lock); //等待被唤醒
cNode = get_work(); //获取队列结构体,取出events下标,和链接描述符
connfd = cNode.connfd;
i = cNode.i;
if (events[i].events & EPOLLIN) //如果可读读出数据
{
while((nread = read(connfd, buf, 100)) > 0) //read to over
{
n += nread;
}
if (n > 0)
{
std::cout << tid <<"::" << connfd <<" Date: ["<< buf <<"]" << "events"<<i << std::endl;
epfd.data.fd = connfd;
epfd.events = events[i].events | EPOLLOUT;
if(epoll_ctl(epollfd, EPOLL_CTL_MOD, connfd, &epfd) == -1)
{
std::cout << "epoll_ctl return -1"<< std::endl;
exit(1);
}
}else if (nread == 0)
{
std::cout << connfd << "is go" << std::endl;
close(connfd);
epfd = events[i];
epoll_ctl(epollfd, EPOLL_CTL_DEL, connfd, &epfd);
}
}
/* if (events[i].events & EPOLLOUT)
{
while(n > 0)//write ot over
{
nwrite = write(connfd, buf, n);
n -= nwrite;
}
}*/
pthread_mutex_unlock(&lock);
}
}
/*****************************
> 线程创建
> 参数:要创建线程数目
> 返回:成功返回为所有线程创建返回值之和
******************************/
int make_pthread(int pthreadNum)
{
int err[pthreadNum], error;
prhread_num = pthreadNum;
for(int i=0; i<pthreadNum; i++)
{
err[i] = pthread_create(&ntid[i], NULL, pthread_work, NULL);
if(err[i] != 0)
{
std::cout << "make pthread error :" << err[i] << std::endl;
exit(1);
}
error += err[i];
std::cout << "\033[32mNO.\033[0m"<< i+1 << "\033[32m, pthread creation successful!\033[0m" << std::endl;
}
return error;
}
/*****************************
> 销毁所有线程
> 返回:0
******************************/
int destroy_pthread()
{
for(int i=0; i< prhread_num; ++i)
{
pthread_join(ntid[i], NULL);
}
return 0;
}
/*****************************
> 如果工作队列不为空,唤醒等待的线程
> 返回:为空 0; 不为空 1
******************************/
bool have_work()
{
bool Y_N = 0;
if (!waitlist.empty())
{
Y_N = 1;
pthread_cond_signal(&has_product);
}
return Y_N;
}
/**********************************************************************/
/*****************************
> 添加工作到工作队列
> 参数:i:events下标,connfd:链接描述符
> 返回:events下标
******************************/
int set_work(int i, int connfd)
{
ConnFdNode cNode;
cNode.i = i;
cNode.connfd = connfd;
//std::cout << "cNode:" << i << cNode.i <<" "<< cNode.epfd.data.fd << std::endl;
pthread_mutex_lock(&lock);
waitlist.push_back(cNode);
have_work();
pthread_mutex_unlock(&lock);
return i;
}
/*****************************
> 取出工作从工作队列
> 返回:工作队列结构体
******************************/
ConnFdNode get_work()
{
ConnFdNode cNode;
cNode.i = waitlist.front().i;
cNode.connfd = waitlist.front().connfd;
waitlist.erase(waitlist.begin());
return cNode;
}
/*****************************
> 用于测试打印
******************************/
void debug_pool()
{
std::cout << "Wait list :" << waitlist.size() << std::endl;
for (int i = 0; i < waitlist.size(); ++i)
{
std::cout << waitlist[i].i << " " ;
}
std::cout << std::endl;
}
- 主函数
.h
存放库头文件,这里没有声明
.cpp
#include "myepollpthreadpoolserver.h"
typedef std::vector<struct epoll_event> EpollList;
EpollList events(16);
int epollfd;
struct epoll_event epfd;
extern pthread_mutex_t lock;
int main()
{
int listenfd;
listenfd = Socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); // fei zu se IO fu yong
struct sockaddr_in serveraddr;
bzero(&serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(8000);
int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
Bind(listenfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
Listen(listenfd, 20);
// clinet init date
struct sockaddr_in clientaddr;
socklen_t clientlen;
int connfd;
//epoll
//typedef std::vector<struct epoll_event> EpollList;
//int epollfd;
epollfd = epoll_create1(EPOLL_CLOEXEC);
//Creates a handle to epoll, the size of which tells the kernel how many listeners there are.
/*ET*/
//struct epoll_event epfd;
epfd.data.fd = listenfd;
epfd.events = EPOLLIN | EPOLLET ;
epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &epfd);
//EpollList events(16);//You can listen for 16 at first
int nready;
make_pthread(4); //创建线程
while(1)
{
nready = epoll_wait(epollfd, &*events.begin(), static_cast<int>(events.size()), -1);
if (nready == -1)
{
if(errno == EINTR)
continue;
perror("epoll_wait");
}
if(nready == 0)
continue;
if ((size_t)nready == events.size())
{
events.resize(events.size() * 2);
}
for(int i=0; i < nready; ++i)
{
if (events[i].data.fd == listenfd)
{
clientlen = sizeof(clientaddr);
connfd = Accept4(listenfd, (struct sockaddr*)&clientaddr, &clientlen,
SOCK_NONBLOCK | SOCK_CLOEXEC);// fei zu se IO fu yong
std::cout << connfd << "is come!" << std::endl;
/*ET*/
pthread_mutex_lock(&lock);
epfd.data.fd = connfd;
epfd.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &epfd);
pthread_mutex_unlock(&lock);
}else if (events[i].events & EPOLLIN)
{
connfd = events[i].data.fd;
if (connfd < 0)
{
continue;
}
set_work(i, connfd); //通知线程池
}
/*ET*/
/*if (events[i].events & EPOLLOUT)
{
connfd = events[i].data.fd;
if (connfd < 0)
{
continue;
}
//std::cout << connfd << "is write!!!!!!!!!!!!!!!!!" << std::endl;
//write(connfd, "systim date .....", n);
set_work(i, connfd);
/*while(n > 0)//write ot over
{
nwrite = write(connfd, buf, n);
n -= nwrite;
}
}*/
}
}
destroy_pthread(); //销毁线程
return 0;
}
5.以上存在的问题
- 设计粗糙
- 代码混乱
- 对同步问题和生产者消费者问题的解决粗糙
- 线程池粗糙
- 无法接受数据密集测试
- 存在潜在错误
此系列文,还在更新中