实现一个简单的线程池

问题描述:

目前我在需要一种简单高效的线程池实现的。我在这里和谷歌搜索了很多有趣的链接,但到目前为止我找到的东西似乎都很合适。我在网络上发现的大多数实现都太复杂或者缺乏一些我需要的关键特性。实现一个简单的线程池

此外,我不希望使用的代码,我不明白,所以我决定编写它自己(有时重新发明*帮助我向前推动自己在知识和经验方面)。我当然明白线程池背后的基本思想,但是一些实现细节对我来说还是有点不清楚。这可能是因为我需要的线程池有点特别。让我描述一下。我有一个任务在特定的(大)缓冲区上完成了数十万次。我测量过,如果我为这个任务使用线程,性能会好很多 - 缓冲区被分割成子缓冲区,每个线程都在子缓冲区上执行任务并返回结果。然后所有线程的所有结果加在一起,给我最终的解决方案。

然而,因为这样做很多时候我失去了,因为这么多的线程创建的宝贵的时间(因为自带的线程创建的开销)。所以我想有一个线程池来执行这个任务,而不是每次创建一组新的线程。

更清楚,这是我到目前为止有:

  • 斯普利特缓冲成相同大小
  • 对于每个子缓冲器的N个子缓冲区,创建一个线程并运行它在副缓冲
  • 等待所有线程完成(WaitForMultipleObjects的),加在一起的结果和destory螺纹
  • 重复

我想要实现的是这样的:

  • 拆分缓冲成相同大小
  • 分配每个子缓冲器从线程池的螺纹(其具有正好N个线程)
  • 的N个子缓冲器
  • 一旦线程结束,让它睡觉,直到当所有的线程完成(睡觉),另一个任务是准备
  • 加在一起,他们通过唤醒线程产生
  • 重复的结果,并为它们分配新的任务

正如你可以看到,这是一个有点特殊的线程池,因为我需要等待线程完成。基本上我想摆脱创建线程的所有时间的开销,因为程序经过成千上万次迭代的,因此它可以创建&销毁线程的milions在其生命周期。好消息是,我不需要在线程之间进行任何同步,他们都获得了自己的数据和存储空间。但是,我必须等到所有线程完成后才能有最终解决方案,因为下一个任务取决于前一个任务的结果。

我的主要问题是与线程的管理:

  • 如何让我的线程“休眠”,并唤醒他们一次新的任务是准备好了?
  • 我该如何等待所有线程完成?

我会感谢任何帮助。如果我不够清楚,也可以随时提问。谢谢!

+1

我没有看到它有什么特别之处,阻塞线程完成是完全正常的。由OS实现的线程池将工作得很好。使用QueueUserWorkItem(),CreateEvent和SetEvent进行同步。 – 2012-03-06 11:22:31

+0

@HansPassant:嗨,谢谢你的建议。你可能会把它作为一个答案与更多的细节?我不熟悉你提到的功能。 – PeterK 2012-03-06 11:37:38

+0

您发现WaitForMultipleObjects()的地方相同,请使用MSDN Library。 – 2012-03-06 11:46:16

对于我的首选方法用于与线程进行通信是经由条件变量。因为您可以在变化时定义所需的条件和信号。在你的情况下,你可以将与一个队列结合使用,子缓冲区通过该队列,所以每个线程在队列为空时等待。所述结果然后可以在另一个队列其中管理队列等待,直到所有线程已经发布的结果到队列放(这个队列中的参考作为请求连同子缓冲器传递)。

如何让我的线程“休眠”,并唤醒他们一旦有新的任务是 准备好了吗?

您使用互斥或​​信号,根据您的情况(在某些情况下,你可能需要使用一个条件变量或自动/手动resetevent),使线程彼此等待或唤醒时的东西发生。

我如何等待所有的线程完成?

你必须使用join()每个线程在等待它完成。所以如果你有一个线程集合,你可能想要在每个仍在运行的线程上调用join。

作为一个单独的音符:线程池就已经存在,你可以只使用之类的东西,而不是Boost.Threadpool重新发明*的。

+0

请问为什么你更喜欢互斥和信号来调节变量?我想我已经看到了某个地方,但我不确定这是否是一种好的做法。 – 2012-03-06 10:03:13

+0

@ J.N。我从来没有见过或读过任何互斥体或信号量不好的想法。 – 2012-03-06 10:09:51

+1

当作为互斥体或信号量使用时,我也不是。另一方面,互斥体和信号量并不意味着阻止线程等待其他人完成任务,而是意味着共享资源。 – 2012-03-06 10:13:01

你看过其他线程池的实现吗?例如http://threadpool.sourceforge.net/。你想要完成的不是全新的。一种使线程等待新任务的方法是在另一个任务准备好时阻塞互斥体并解除该互斥体。您也可以让线程通知使用某种来自线程的通知返回给父级。

在我行我已经使用线程池被工作/线程严重,并已使用ØMQ跨线程的通信,这使得线程从ØMQ一个read()请求时,它已经准备好新的工作阻止。

随着一点点研究,并随着时间和精力一点点,你应该能够找出如何可以建立或利用现有的框架/工具来建立你所需要的。然后当你有一些你遇到问题的代码时,你可以回到SO。

“正如你所看到的,这是一个有点特殊的线程池,因为我需要等待线程完成。” - 不完全的。您希望处理作业中最后一项任务的线程提供作业完成通知。完成通知是一个theadPool的正常功能,否则,始发线程将无法处理一组完整的结果。池通常同时处理多个任务/任务层次结构,因此完成通知方法应该是线程无关的 - 不需要join()或任何类型的东西。此外,没有WaitForMultipleObject() - 使用难以管理且仅限于64个对象的同步对象数组。

线程池通常有一个线程池在生产者 - 消费者队列中等待任务。这些任务通常从一些提供线程池服务的'Ctask'类继承。完成深化和通知的机制就是其中之一。

生产者 - 消费者队列本质上是由一个互斥体,并在队列计数任务的信号量和线程等待的保护,多址“正常”队列类。池线程每个通过该队列,他们永远循环下去,等待队列信号,从锁定队列弹出的任务,然后和callling的run()中的任务的方法获得。

每个任务都会有加载数据成员线程池,因为它被提交到池中。这允许任务在需要时提交更多任务。每个任务的

完成通常是通过调用事件方法是任务中的一员,由始发线程加载之前的任务提交给池某通知。

任务也应该有一个子任务的原子倒计时整数和事件等对其他任务的完成。

怎么可能这项工作在你的榜样?你可以有一个'主要'任务提交数组处理任务并等待他们全部完成。

teh池中应该有更多的线程比核心。我建议两倍。

该数组需要拆分,以便为每个部分使用单独的任务。有多少任务 - 足够使可用内核全部用完,但不会产生太多以至于生成过多的上下文切换。对于任何合理大小的阵列,可以说64个任务是合理的分割 - 超过可用处理器的典型数量。此外,这些任务不应该按顺序拆分以避免虚假分享。

因此,这种“主要”阵列处理任务。使用数组引用加载它,并将其完成事件设置为指向某种发信号事件的方法。将任务提交给池,等待事件。

任务被加载到一个线程。它的run()方法使用两个循环并创建32个数组处理任务,每个任务都有自己的起始索引和长度到数组中,但具有非连续的起始索引。该任务使用自己的继承的submit()方法将32个新任务中的每一个加载到池中。以及对线程实际上排队的任务执行,这个提交(也)原子递增完成计数整数,并将任务私人完成事件完成事件排队的任务之前。私有完成事件原子 - 递减完成计数并且如果为零则表示事件。在提交所有32个数组处理事件后,主要任务将等待私人完成事件。

因此,32个数组处理任务在线程上运行。随着每个完成,正在运行它的线程将调用其完成事件,该事件会减少主任务中的完成计数整数。最后,最后一个数组处理任务完成并且完成计数整数递减为零,以便通知运行主任务的线程正在等待的事件。主任务调用它自己的完成事件,因此发信号通知主任务发起者正在等待的事件。

主要任务发起者在完全处理阵列时运行。

..或者,你可以使用一个已经工作,别人的建议线程池类。