线程池和内存池
文章目录
一、线程池
1、线程池的概念
服务器程序利用线程技术响应客户请求已经司空见惯,但是线程的使用是有待优化和处理的。单线程执行并不是一个高效的方式,这个时候可能就要考虑高并发,多线程等方式。线程池也是线程优化的一种方式。
在面向对象的过程中,对象的创建和销毁是非常占资源的,每创建一个对象都要获取内存资源以及其他一些资源。这就产生了“池化技术”。
【线程池如何提高服务器程序的性能?】
- T1 = 创建线程;
- T2 = 执行线程 包括访问共享数据、线程同步等;
- T3 = 销毁线程;
- T = T1 + T2 + T3。
单线程的情况下,系统花大量的时间再T1、T3阶段。我们要采取最优的措施,减少T1和T3的系统开销。线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目。
2、线程池的组成部分
(1) 线程管理器:用于创建并管理线程池。
(2)工作线程:线程池中实际执行任务的线程。在初始化线程时会预先创建好固定数目的线程在池中,这些初始化的线程一般处于空闲状态。
(3)任务接口:每个任务必须实现的接口。当线程池的任务队列中有可执行任务时,被空间的工作线程调去执行(线程的闲与忙的状态是通过互斥量实现的),把任务抽象出来形成一个接口,可以做到线程池与具体的任务无关。
(4)任务队列:用来存放没有处理的任务。提供一种缓冲机制。实现这种结构有很多方法,常用的有队列和链表结构。
3、线程池的流程
1. 调用pthread_create()创建若干线程,置入线程池。
2. 调用pthread_cond_wait()等待任务队列不为空的条件上;任务到达时,从线程池取空闲线程处理任务队列中的任务;调用pthread_cond_signal()通知可以有新任务添加进来。
3. 调用pthread_cond_wait()等待任务队列不为满的条件上;向任务队列中添加任务;调用pthread_cond_signal()通知线程池中的线程去处理任务。
4. 调用pthread_detach()销毁线程池中的线程。
// 伪代码
int main(void)
{
threadpool_t *thp = pthreadpool_create();
for (int i = 0; i < max_pool; ++i)
{
num[i] = i;
threadpool_add(thp, process, (void *)&num[i]);
}
threadpool_destroy(thp);
return 0;
}
// 任务函数
void *process(void *arg) { }
// 创建线程池并做一些初始化的工作
threadpool_t *pthreadpool_create()
{
init();
for (i = 0; i < max_thr_num; ++i)
{
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
}
}
// 线程池中的每个线程或者终止或者处理任务
void *threadpool_thread()
{
while (true)
{
// 如果任务队列为空,则调用wait阻塞在等待条件上,当wait被调用时说明有任务,则退出while
while ( (pool->queue_size == 0))
{
pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
}
/* 取任务 */
// 通知可以有新的任务添加进来
pthread_cond_signal(&(pool->queue_not_full));
/* 执行任务 */
}
}
// 向任务队列中添任务并通知线程池去处理任务
int threadpool_add()
{
// 如果任务队列为满,则调用wait阻塞在等待条件上,当wait被调用时说明队列不为空,则退出while
while ( (pool->queue_size == pool->queue_max_size) && (!pool->shutdown) )
{
pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
}
// 添加完任务后,任务队列不为空,通知线程池中的一个线程去处理任务
pthread_cond_signal(&(pool->queue_not_empty));
}
// 销毁线程池
void threadpool_destroy() { }
4、线程池的Demo
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#define true 1
#define false 0
typedef struct
{
void *(*function)(void *); // 函数指针,回调函数
void *arg; // 上面函数的参数
}threadpool_task_t; // 各子线程任务结构体
struct threadpool_t
{
pthread_mutex_t lock; // 线程池的锁(锁住本结构体)
pthread_mutex_t thread_counter; // 记录忙状态线程个数的锁
pthread_cond_t queue_not_full; // 任务队列满时,线程池中的线程阻塞,等待此条件变量
pthread_cond_t queue_not_empty; // 任务队列不为空时,通知等待任务的线程
pthread_t *threads; // 线程池(存放的是每个线程的id)
pthread_t adjust_tid; // 管理者线程(管理线程池)
threadpool_task_t *task_queue; // 任务队列(数组首地址)
int min_thr_num; // 线程池最小线程数
int max_thr_num; // 线程池最大线程数
int busy_thr_num; // 忙状态线程数
int live_thr_num; // 当前存活的线程数
int wait_exit_thr_num; // 要销毁的线程数
int queue_front; // 任务队列的队头
int queue_rear; // 任务队列的队尾
int queue_size; // 任务队列的实际任务数
int queue_max_size; // 任务队列可容纳任务数的上限
int shutdown; // 标志位,线程池使用状态,true或者false
};
void threadpool_free(threadpool_t *pool);
void *threadpool_thread(void *threadpool);
void *adjust_thread(void *threadpool);
// 创建线程池并做一些初始化的工作
threadpool_t *pthreadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
int i;
threadpool_t *pool = NULL;
do
{
if ( (pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL )
{
printf("malloc threadpool fail\n");
break;
}
// 初始化一些基本参数
pool->min_thr_num = min_thr_num;
pool->max_thr_num = max_thr_num;
pool->busy_thr_num = 0;
pool->live_thr_num = min_thr_num;
pool->wait_exit_thr_num = 0;
pool->queue_size = 0;
pool->queue_max_size = queue_max_size;
pool->queue_front = 0;
pool->queue_rear = 0;
pool->shutdown = false;
// 初始化线程池,开辟数组空间
pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * max_thr_num);
if (pool->threads == NULL)
{
printf("malloc threads fail\n");
break;
}
memset(pool->threads, 0, sizeof(pthread_t) * max_thr_num);
// 初始化任务队列
pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_max_size);
if (pool->task_queue == NULL)
{
printf("malloc task_queue fail\n");
break;
}
// 初始化锁和条件变量
if (pthread_mutex_init(&(pool->lock), NULL) != 0
|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
{
printf("init lock or cond fail\n");
break;
}
// 初始化线程池和管理者线程
for (i = 0; i < max_thr_num; ++i)
{
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
printf("start thread %lu ...\n", pool->threads[i]);
}
pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);
return pool;
}while(0);
// 前面代码调用失败时,释放pool的空间
threadpool_free(pool);
}
// 线程池中的每个线程或者终止或者处理任务
void *threadpool_thread(void *threadpool)
{
threadpool_t *pool = (threadpool_t *)threadpool;
threadpool_task_t task;
while (true)
{
pthread_mutex_lock(&(pool->lock));
// 如果任务队列为空,则调用wait阻塞在等待条件上,当wait被调用时说明有任务,则退出while
while ( (pool->queue_size == 0) && (!pool->shutdown) )
{
printf("thread %lu is waitting\n", pthread_self());
// 等待工作队列不为空的条件
pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
// 任务队列为空了,已经没有任务可以执行了,让所有线程自动终止
if (pool->wait_exit_thr_num > 0)
{
pool->wait_exit_thr_num--;
if (pool->live_thr_num > pool->min_thr_num)
{
printf("thread %lu is exitting\n", pthread_self());
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
}
}
}
if (pool->shutdown)
{
pthread_mutex_unlock(&(pool->lock));
printf("thread %lu is exitting\n", pthread_self());
pthread_detach(pthread_self());
pthread_exit(NULL);
}
// 从任务队列中取任务(队头)
task.function = pool->task_queue[pool->queue_front].function;
task.arg = pool->task_queue[pool->queue_front].arg;
// 队头元素向后移动
pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;
pool->queue_front++;
// 通知可以有新的任务添加进来
pthread_cond_signal(&(pool->queue_not_full));
pthread_mutex_unlock(&(pool->lock));
// 正在处理任务,忙状态线程数加1
printf("thread %lu is working\n", pthread_self());
pthread_mutex_lock(&(pool->thread_counter));
pool->busy_thr_num++;
pthread_mutex_unlock(&(pool->thread_counter));
// 执行回调函数去处理任务
(*(task.function))(task.arg);
// 任务处理完毕,忙状态线程数减1
printf("thread %lu is end working\n", pthread_self());
pthread_mutex_lock(&(pool->thread_counter));
pool->busy_thr_num--;
pthread_mutex_unlock(&(pool->thread_counter));
}
pthread_exit(NULL);
}
// 管理者线程
void *adjust_thread(void *threadpool)
{
int i;
threadpool_t *pool = (threadpool_t *)threadpool;
while (!pool->shutdown)
{
// 每隔10秒对当前线程池进行管理
sleep(10);
// 通过加锁和解锁访问数据
pthread_mutex_lock(&(pool->lock));
int queue_size = pool->queue_size;
int live_thr_num = pool->live_thr_num;
pthread_mutex_unlock(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
int busy_thr_num = pool->busy_thr_num;
pthread_mutex_unlock(&(pool->thread_counter));
// 线程池扩容和瘦身
}
}
// 向任务队列中添任务并通知线程池去处理任务
int threadpool_add(threadpool_t *pool, void *(*function)(void *arg), void *arg)
{
pthread_mutex_lock(&(pool->lock));
while ( (pool->queue_size == pool->queue_max_size) && (!pool->shutdown) )
{
pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
}
// 如果线程池需要关闭了, 通知线程池中的线程自动终止
if (pool->shutdown)
{
pthread_cond_signal(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
return 0;
}
if (pool->task_queue[pool->queue_rear].arg != NULL)
{
pool->task_queue[pool->queue_rear].arg == NULL;
}
pool->task_queue[pool->queue_rear].function = function;
pool->task_queue[pool->queue_rear].arg = arg;
pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;
pool->queue_rear++;
// 添加完任务后,任务队列不为空,通知线程池中的一个线程去处理任务
pthread_cond_signal(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
}
// 释放所有的资源
void threadpool_free(threadpool_t *pool)
{
if (pool == NULL)
{
return;
}
if (pool->task_queue)
{
free(pool->task_queue);
}
// 释放和线程池相关的资源
if (pool->threads)
{
free(pool->threads);
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
pthread_mutex_destroy(&(pool->thread_counter));
pthread_cond_destroy(&(pool->queue_not_empty));
pthread_cond_destroy(&(pool->queue_not_full));
}
free(pool);
return;
}
// 销毁线程池(通知线程池中的线程自我终止)
void threadpool_destroy(threadpool_t *pool)
{
int i;
if (pool == NULL)
{
return;
}
pool->shutdown = true;
// 销毁管理者线程
pthread_detach(pool->adjust_tid);
for (i = 0; i < pool->live_thr_num; ++i)
{
// 通知所有空闲线程
pthread_cond_signal(&(pool->queue_not_empty));
}
for (i = 0; i < pool->live_thr_num; ++i)
{
// 回收空闲线程
pthread_detach(pool->threads[i]);
}
threadpool_free(pool);
return;
}
// 实际需要处理的任务
void *process(void *arg)
{
sleep(1);
printf("hello\n");
return NULL;
}
int main(void)
{
// 创建线程池,最小3个线程,最大100个线程,任务队列最大容量为100
threadpool_t *thp = pthreadpool_create(3, 100, 100);
printf("thread pool init\n");
int num[20], i;
for (i = 0; i < 20; ++i)
{
num[i] = i;
printf("add task %d\n", i);
// 向线程池中添加任务
// int threadpool_add(threadpool_t *pool, void *(*function)(void *arg), void *arg)
threadpool_add(thp, process, (void *)&num[i]);
}
sleep(5);
threadpool_destroy(thp);
return 0;
}
5、线程池的应用
【线程池适用于】:
-
需要大量的线程来完成任务,且完成任务的时间比较短。web服务器完成网页请求这样的任务,使用线程池技术是非常合适的。 因为单个任务小,而任务数量巨大,一个热门网站的点击次数会很多。
-
对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
-
接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。
【线程池不适用于】:
-
如果需要使一个任务具有特定优先级。
-
如果具有可能会长时间运行(并因此阻塞其他任务)的任务。
-
如果需要将线程放置到单线程单元中(线程池中的线程均处于多线程单元中)。
-
如果需要永久标识来标识和控制线程,比如想使用专用线程来终止该线程,将其挂起或按名称发现它。
二、线程池的惊群效应
1、惊群效应的概念
惊群效应就是多进程(多线程)在同时阻塞等待同一个事件的时候(休眠状态),如果等待的这个事件发生,那么他就会唤醒等待的所有进程(或者线程),但是最终却只可能有一个进程(线程)获得这个时间的“控制权”,对该事件进行处理,而其他进程(线程)获取“控制权”失败,只能重新进入休眠状态,这种现象和性能浪费就叫做惊群。
为了更好的理解何为惊群,举一个很简单的例子,当你往一群鸽子中间扔一粒谷子,所有的各自都被惊动前来抢夺这粒食物,但是最终注定只可能有一个鸽子满意的抢到食物,没有抢到的鸽子只好回去继续睡觉,等待下一粒谷子的到来。这里鸽子表示进程(线程),那粒谷子就是等待处理的事件。
2、惊群效应存在的问题
-
系统对用户进程/线程频繁的做无效的调度、上下文切换,系统系能大打折扣。
-
为了确保只有一个线程得到资源,用户必须对资源操作进行加锁保护,进一步加大了系统开销。
最常见的例子就是对于socket描述符的accept操作,当多个用户进程/线程监听在同一个端口上时,由于实际只可能accept一次,因此就会产生惊群现象,当然前面已经说过了,这个问题是一个古老的问题,但目前的内核版本已经修复了这个问题,一个链接过来,内核只会唤醒一个子进程出来accept,这样就不用担心惊群效应了。
3、线程池的惊群效应
一个基本的线程池框架是基于生产者和消费者模型的。生产者往队列里面添加任务,而消费者从队列中取任务并进行执行。一般来说,消费时间比较长,一般有许多个消费者。当许多个消费者同时在等待任务队列的时候,也就发生了“惊群效应”。
pthread_cond_signal函数的作用是发送一个信号给另外一个正在处于阻塞等待状态的线程,使其脱离阻塞状态,继续执行.如果没有线程处在阻塞等待状态,pthread_cond_signal也会成功返回。
但使用pthread_cond_signal不会有“惊群现象”产生,它最多只给一个线程发信号。假如有多个线程正在阻塞等待着这个条件变量的话,那么是根据各等待线程优先级的高低确定哪个线程接收到信号开始继续执行。如果各线程优先级相同,则根据等待时间的长短来确定哪个线程获得信号。但无论如何一个pthread_cond_signal调用最多发信一次。所以线程池并不会产生“惊群效应”。同时,这种方式使用多进程共享资源,等待管道或者其他资源等,提供cpu利用率。
4、怎么判断发生了惊群
我们根据strace的返回信息可以确定:
-
系统只会让一个进程真正的接受这个连接,而剩余的进程会获得一个EAGAIN信号。
-
通过返回结果和进程执行的系统调用判断。
5、如何解决惊群效应
Linux内核的3.9版本带来了SO_REUSEPORT特性,该特性支持多个进程或者线程绑定到同一端口,提高服务器程序的性能,允许多个套接字bind()以及listen()同一个TCP或UDP端口,并且在内核层面实现负载均衡。
在未开启SO_REUSEPORT的时候,由一个监听socket将新接收的连接请求交给各个工作者处理,看图示:
在使用SO_REUSEPORT后,多个进程可以同时监听同一个IP:端口,然后由内核决定将新链接发送给哪个进程,显然会降低每个工人接收新链接时锁竞争。
【SO_REUSEPORT解决了什么问题】:
-
允许多个套接字bind()/listen()同一个tcp/udp端口。每一个线程拥有自己的服务器套接字,在服务器套接字上没有锁的竞争。
-
内核层面实现负载均衡。
-
安全层面,监听同一个端口的套接字只能位于同一个用户下面。
-
处理新建连接时,查找listener的时候,能够支持在监听相同IP和端口的多个sock之间均衡选择。
三、内存池
1、内存池的概念
内存池(Memory Pool)是一种内存分配方式。通常我们习惯直接使用new、malloc等API申请内存,这样做的缺点在于所申请内存块的大小不定,当频繁使用时会造成大量的内存碎片并进而降低性能。
内存池则是在真正使用内存之前,预先申请分配一定数量、大小相等(一般情况下)的内存块留作备用。当有新的内存需求时,就从内存池中分出一部分内存块,若内存块不够再继续申请新的内存。这样做的一个显著优点是,使得内存分配效率得到提升。
2、内存池的流程和设计
1. 先申请一块连续的内存空间,该段内存空间能够容纳一定数量的对象。
2. 每个对象连同一个指向下一个对象的指针一起构成一个内存节点(Memory Node)。各个空闲的内存节点通过指针形成一个链表,链表的每一个内存节点都是一块可供分配的内存空间。
3. 某个内存节点一旦分配出去,从空闲内存节点链表中去除。
4. 一旦释放了某个内存节点的空间,又将该节点重新加入空闲内存节点链表。
5. 如果一个内存块的所有内存节点分配完毕,若程序继续申请新的对象空间,则会再次申请一个内存块来容纳新的对象。新申请的内存块会加入内存块链表中。
如上图所示,申请的内存块存放三个可供分配的空闲节点。空闲节点由空闲节点链表管理,如果分配出去,将其从空闲节点链表删除,如果释放,将其重新插入到链表的头部。如果内存块中的空闲节点不够用,则重新申请内存块,申请的内存块由内存块链表来管理。
3、内存池的Demo
#include <iostream>
using namespace std;
// 类模板
template<int ObjectSize, int NumofObjects = 20>
class MemPool
{
private:
//空闲节点结构体
struct FreeNode
{
FreeNode* pNext;
char data[ObjectSize]; // 类的大小(占用的内存)
};
//内存块结构体
struct MemBlock
{
MemBlock* pNext;
FreeNode data[NumofObjects];
};
FreeNode* freeNodeHeader;
MemBlock* memBlockHeader;
public:
MemPool()
{
freeNodeHeader = NULL;
memBlockHeader = NULL;
}
~MemPool()
{
MemBlock* ptr;
while (memBlockHeader)
{
ptr = memBlockHeader->pNext;
delete memBlockHeader;
memBlockHeader = ptr;
}
}
void* malloc();
void free(void*);
};
//分配空闲的节点
template<int ObjectSize, int NumofObjects>
void* MemPool<ObjectSize, NumofObjects>::malloc()
{
// 无空闲节点,申请新内存块(每一次申请的大小都是NumofObjects)
if (freeNodeHeader == NULL)
{
MemBlock* newBlock = new MemBlock;
newBlock->pNext = NULL;
// 设置内存块的第一个节点为空闲节点链表的首节点
freeNodeHeader = &newBlock->data[0];
// 将内存块的其它节点串起来(将内存块挂载到链表上,newBlock->data[i]是一个FreeNode)
for (int i = 1; i < NumofObjects; ++i)
{
newBlock->data[i - 1].pNext = &newBlock->data[i];
}
newBlock->data[NumofObjects - 1].pNext = NULL;
// 如果是首次申请内存块
if (memBlockHeader == NULL)
{
memBlockHeader = newBlock;
}
else
{
// 将新内存块加入到内存块链表
newBlock->pNext = memBlockHeader;
memBlockHeader = newBlock;
}
}
// 如果有空闲链表节点,返回空闲链表节点的第一个节点
void* freeNode = freeNodeHeader;
// 移动空闲链表节点
freeNodeHeader = freeNodeHeader->pNext;
return freeNode;
}
//释放已经分配的节点
template<int ObjectSize, int NumofObjects>
void MemPool<ObjectSize, NumofObjects>::free(void* p)
{
FreeNode* pNode = (FreeNode*)p;
pNode->pNext = freeNodeHeader; // 将释放的节点插入空闲节点头部
freeNodeHeader = pNode;
}
class ActualClass
{
static int count;
int No;
public:
ActualClass()
{
No = count;
count++;
}
void print()
{
cout << this << ": ";
cout << "the " << No << "th object" << endl;
}
void* operator new(size_t size);
void operator delete(void* p);
};
// 定义内存池对象(定义内存池的元素类型和内存池大小)
MemPool<sizeof(ActualClass), 2> mp;
// 重载new和delete操作符
void* ActualClass::operator new(size_t size)
{
return mp.malloc();
}
void ActualClass::operator delete(void* p)
{
mp.free(p);
}
int ActualClass::count = 0;
int main()
{
ActualClass* p1 = new ActualClass;
p1->print();
ActualClass* p2 = new ActualClass;
p2->print();
delete p1;
p1 = new ActualClass;
p1->print();
ActualClass* p3 = new ActualClass;
p3->print();
delete p1;
delete p2;
delete p3;
system("pause");
return 0;
}
4、内存池的特点
-
针对特殊情况,例如需要频繁分配释放固定大小的内存对象时,不需要复杂的分配算法和多线程保护。也不需要维护内存空闲表的额外开销,从而获得较高的性能。
-
由于开辟一定数量的连续内存空间作为内存池块,因而一定程度上提高了程序局部性,提升了程序性能。
-
比较容易控制页边界对齐和内存字节对齐,没有内存碎片的问题。
-
当需要分配管理的内存在100M一下的时候,采用内存池会节省大量的时间,否则会耗费更多的时间。
-
内存池可以防止更多的内存碎片的产生。
-
更方便于管理内存。
四、线程池和内存池的相关问题
1、线程池大小应该设置为多少?
最佳线程数目 = ( (线程等待时间 + 线程CPU时间) / 线程CPU时间 ) * CPU数目
比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:
最佳线程数目 = (线程等待时间 / 线程CPU时间 + 1)* CPU数目
【结论】:
线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
2、线程池中如果有一个线程出现异常怎么办?
首先,必须使用互斥锁将你的操作在锁保护范围内;其次,就是使用try…catch进行异常捕获,一旦捕获异常就执行回滚操作 ;最后,只要保证同一时刻只有一个线程执行相应的操作并且执行完成后再释放锁,就能保证操作的数据一致性。
参考:https://www.cnblogs.com/cheng07045406/p/3273466.html
https://blog.****.net/K346K346/article/details/49538975
https://blog.****.net/tuantuanls/article/details/41205739
https://blog.****.net/lyztyycode/article/details/78648798
https://blog.****.net/sayhello_world/article/details/72829329
https://blog.****.net/rank_d/article/details/52253868
https://www.cnblogs.com/alwayswangzi/p/7138154.html
https://www.cnblogs.com/yusenwu/p/4685340.html
https://blog.****.net/u011519624/article/details/69263460
https://blog.****.net/fullstack/article/details/23712813