并发和顺序执行的性能测试,以计算圆周率为例

以计算PI圆周率为例,可以有如下处理方式:

1.对Linq的Sum()函数。

2.使用最简单的顺序for循环

3.使用Parallel并行类

4.使用Parallel并行类 以及 Partitioner线程安全分区类

同时比较lock和Interlocked在线程同步中的性能

测试源程序:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace EstimatePerformanceDemo
{
    class Program
    {
        const int NumberOfSteps = 500_000_000;
        static void Main(string[] args)
        {
            Console.WriteLine("Function               | Elapsed Time     | Estimated Pi");
            Console.WriteLine("-----------------------------------------------------------------");

            Time(SerialLinqPi, nameof(SerialLinqPi));
            Time(ParallelLinqPi, nameof(ParallelLinqPi));
            Time(SerialPi, nameof(SerialPi));
            Time(ParallelPi, nameof(ParallelPi));
            Time(ParallelPiUseInterlocked, nameof(ParallelPiUseInterlocked));
            Time(ParallelPartitionerPi, nameof(ParallelPartitionerPi));
            Time(ParallelPartitionerPiUseInterlocked, nameof(ParallelPartitionerPiUseInterlocked));
            Console.ReadLine();
        }

        /// <summary>
        /// 对函数的执行进行计时,并输出所用时间和函数的结果。
        /// </summary>
        /// <param name="estimatePi">计算圆周率的委托函数</param>
        /// <param name="function">方法名</param>
        static void Time(Func<double> estimatePi, string function)
        {
            var sw = Stopwatch.StartNew();
            var pi = estimatePi();
            Console.WriteLine($"{function.PadRight(36)} | {sw.Elapsed} | {pi}");
        }

        /// <summary>
        /// Estimates the value of PI using a LINQ-based implementation.
        /// </summary>
        /// <returns></returns>
        static double SerialLinqPi()
        {
            double step = 1.0 / (double)NumberOfSteps;
            return (from i in Enumerable.Range(0, NumberOfSteps)
                    let x = (i + 0.5) * step
                    select 4.0 / (1.0 + x * x)).Sum() * step;
        }

        /// <summary>
        /// 使用基于LINQ的实现估计PI的值。
        /// </summary>
        /// <returns></returns>
        static double ParallelLinqPi()
        {
            double step = 1.0 / (double)NumberOfSteps;
            return (from i in ParallelEnumerable.Range(0, NumberOfSteps)
                    let x = (i + 0.5) * step
                    select 4.0 / (1.0 + x * x)).Sum() * step;
        }

        /// <summary>
        /// 使用【顺序for循环】估计PI的值。
        /// </summary>
        /// <returns></returns>
        static double SerialPi()
        {
            double sum = 0.0;
            double step = 1.0 / (double)NumberOfSteps;
            for (int i = 0; i < NumberOfSteps; i++)
            {
                double x = (i + 0.5) * step;
                sum += 4.0 / (1.0 + x * x);
            }
            return step * sum;
        }

        /// <summary>
        /// 使用并发【Parallel.For】估计PI的值。默认使用【lock】
        /// </summary>
        /// <returns></returns>
        static double ParallelPi()
        {
            double sum = 0.0;
            double step = 1.0 / (double)NumberOfSteps;
            object monitor = new object();
            Parallel.For(0, NumberOfSteps, () => 0.0, (i, state, local) =>
            {
                double x = (i + 0.5) * step;
                return local + 4.0 / (1.0 + x * x);
            }, local => { lock (monitor) sum += local; });
            return step * sum;
        }

        /// <summary>
        /// 使用并发【Parallel.For】估计PI的值。【使用Interlocked 比 lock性能有所优化】
        /// </summary>
        /// <returns></returns>
        static double ParallelPiUseInterlocked()
        {
            double sum = 0.0;
            double step = 1.0 / (double)NumberOfSteps;
            int monitor = 0;
            Parallel.For(0, NumberOfSteps, () => 0.0, (i, state, local) =>
            {
                double x = (i + 0.5) * step;
                return local + 4.0 / (1.0 + x * x);
            }, local => 
            {
                while (Interlocked.Exchange(ref monitor, 1) != 0)
                { }
                sum += local;
                Interlocked.Exchange(ref monitor, 0);
            });
            return step * sum;
        }

        /// <summary>
        /// 使用并发【Parallel.ForEach】估计PI的值。【使用分区区间】
        /// </summary>
        /// <returns></returns>
        static double ParallelPartitionerPi()
        {
            double sum = 0.0;
            double step = 1.0 / (double)NumberOfSteps;
            object monitor = new object();
            Parallel.ForEach(Partitioner.Create(0, NumberOfSteps), () => 0.0, (range, state, local) =>
            {
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    double x = (i + 0.5) * step;
                    local += 4.0 / (1.0 + x * x);
                }
                return local;
            }, local => { lock (monitor) sum += local; });
            return step * sum;
        }

        /// <summary>
        /// 使用并发【Parallel.ForEach】估计PI的值。【使用分区区间】【使用Interlocked】
        /// </summary>
        /// <returns></returns>
        static double ParallelPartitionerPiUseInterlocked()
        {
            double sum = 0.0;
            double step = 1.0 / (double)NumberOfSteps;
            int monitor = 0;
            Parallel.ForEach(Partitioner.Create(0, NumberOfSteps), () => 0.0, (range, state, local) =>
            {
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    double x = (i + 0.5) * step;
                    local += 4.0 / (1.0 + x * x);
                }
                return local;
            }, local => 
            {
                while (Interlocked.Increment(ref monitor) != 1)
                { }
                sum += local;
                Interlocked.Decrement(ref monitor);
            });
            return step * sum;
        }
    }
}
 

程序运行结果:

测试后发现:使用Parallel和Partitioner以及Interlocked 在超大循环中性能最佳

并发和顺序执行的性能测试,以计算圆周率为例