并发和顺序执行的性能测试,以计算圆周率为例
以计算PI圆周率为例,可以有如下处理方式:
1.使用Linq的Sum()函数(包括串行LINQ和并行PLINQ两种写法)。
2.使用最简单的顺序for循环
3.使用Parallel并行类
4.使用Parallel并行类 以及 Partitioner线程安全分区类
同时比较lock和Interlocked在线程同步中的性能
测试源程序:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace EstimatePerformanceDemo
{
class Program
{
    /// <summary>
    /// Number of sub-intervals used by the midpoint-rule integration of
    /// 4 / (1 + x^2) over [0, 1], whose exact value is pi.
    /// </summary>
    const int NumberOfSteps = 500_000_000;

    /// <summary>
    /// Runs every pi estimator once, printing one table row per estimator
    /// (name, elapsed time, estimated value), then waits for Enter so the
    /// console window stays open.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        Console.WriteLine("Function | Elapsed Time | Estimated Pi");
        Console.WriteLine("-----------------------------------------------------------------");
        Time(SerialLinqPi, nameof(SerialLinqPi));
        Time(ParallelLinqPi, nameof(ParallelLinqPi));
        Time(SerialPi, nameof(SerialPi));
        Time(ParallelPi, nameof(ParallelPi));
        Time(ParallelPiUseInterlocked, nameof(ParallelPiUseInterlocked));
        Time(ParallelPartitionerPi, nameof(ParallelPartitionerPi));
        Time(ParallelPartitionerPiUseInterlocked, nameof(ParallelPartitionerPiUseInterlocked));
        Console.ReadLine();
    }

    /// <summary>
    /// Times a single invocation of an estimator and prints the elapsed
    /// time together with the value it returned.
    /// </summary>
    /// <param name="estimatePi">Delegate that computes an estimate of pi.</param>
    /// <param name="function">Name of the estimator, shown in the first column.</param>
    static void Time(Func<double> estimatePi, string function)
    {
        var sw = Stopwatch.StartNew();
        var pi = estimatePi();
        // Stop before formatting so console output cost is never part of the measurement.
        sw.Stop();
        Console.WriteLine($"{function.PadRight(36)} | {sw.Elapsed} | {pi}");
    }

    /// <summary>
    /// Atomically adds <paramref name="value"/> to <paramref name="target"/>
    /// using a compare-and-swap retry loop.
    /// </summary>
    /// <param name="target">Shared accumulator that several threads may update.</param>
    /// <param name="value">Amount to add.</param>
    static void AtomicAdd(ref double target, double value)
    {
        double observed;
        double updated;
        long previousBits;
        do
        {
            observed = Volatile.Read(ref target);
            updated = observed + value;
            // CompareExchange returns the value that was in target before the call;
            // the swap happened only if that value is the one we based 'updated' on.
            previousBits = BitConverter.DoubleToInt64Bits(
                Interlocked.CompareExchange(ref target, updated, observed));
        }
        // Compare bit patterns rather than doubles so a NaN accumulator cannot spin forever.
        while (previousBits != BitConverter.DoubleToInt64Bits(observed));
    }

    /// <summary>
    /// Estimates pi with a sequential LINQ query summed by <c>Sum()</c>.
    /// </summary>
    /// <returns>The midpoint-rule estimate of pi.</returns>
    static double SerialLinqPi()
    {
        double step = 1.0 / NumberOfSteps;
        return (from i in Enumerable.Range(0, NumberOfSteps)
                let x = (i + 0.5) * step
                select 4.0 / (1.0 + x * x)).Sum() * step;
    }

    /// <summary>
    /// Estimates pi with a PLINQ query (<see cref="ParallelEnumerable.Range"/>)
    /// summed by <c>Sum()</c>.
    /// </summary>
    /// <returns>The midpoint-rule estimate of pi.</returns>
    static double ParallelLinqPi()
    {
        double step = 1.0 / NumberOfSteps;
        return (from i in ParallelEnumerable.Range(0, NumberOfSteps)
                let x = (i + 0.5) * step
                select 4.0 / (1.0 + x * x)).Sum() * step;
    }

    /// <summary>
    /// Estimates pi with a plain sequential <c>for</c> loop.
    /// </summary>
    /// <returns>The midpoint-rule estimate of pi.</returns>
    static double SerialPi()
    {
        double sum = 0.0;
        double step = 1.0 / NumberOfSteps;
        for (int i = 0; i < NumberOfSteps; i++)
        {
            double x = (i + 0.5) * step;
            sum += 4.0 / (1.0 + x * x);
        }
        return step * sum;
    }

    /// <summary>
    /// Estimates pi with <c>Parallel.For</c>, accumulating a per-task partial
    /// sum and merging each partial sum into the total under a <c>lock</c>.
    /// </summary>
    /// <returns>The midpoint-rule estimate of pi.</returns>
    static double ParallelPi()
    {
        double sum = 0.0;
        double step = 1.0 / NumberOfSteps;
        object monitor = new object();
        Parallel.For(0, NumberOfSteps, () => 0.0, (i, state, local) =>
        {
            double x = (i + 0.5) * step;
            return local + 4.0 / (1.0 + x * x);
        }, local =>
        {
            lock (monitor)
            {
                sum += local;
            }
        });
        return step * sum;
    }

    /// <summary>
    /// Estimates pi with <c>Parallel.For</c>, merging each per-task partial
    /// sum into the total with <see cref="Interlocked"/> instead of a lock.
    /// </summary>
    /// <returns>The midpoint-rule estimate of pi.</returns>
    static double ParallelPiUseInterlocked()
    {
        double sum = 0.0;
        double step = 1.0 / NumberOfSteps;
        Parallel.For(0, NumberOfSteps, () => 0.0, (i, state, local) =>
        {
            double x = (i + 0.5) * step;
            return local + 4.0 / (1.0 + x * x);
        },
        // Lock-free merge: no busy-wait flag that other tasks have to spin on.
        local => AtomicAdd(ref sum, local));
        return step * sum;
    }

    /// <summary>
    /// Estimates pi with <c>Parallel.ForEach</c> over index ranges produced by
    /// <c>Partitioner.Create</c>, merging partial sums under a <c>lock</c>.
    /// </summary>
    /// <returns>The midpoint-rule estimate of pi.</returns>
    static double ParallelPartitionerPi()
    {
        double sum = 0.0;
        double step = 1.0 / NumberOfSteps;
        object monitor = new object();
        Parallel.ForEach(Partitioner.Create(0, NumberOfSteps), () => 0.0, (range, state, local) =>
        {
            // Each range is a half-open [Item1, Item2) slice of the index space.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                double x = (i + 0.5) * step;
                local += 4.0 / (1.0 + x * x);
            }
            return local;
        }, local =>
        {
            lock (monitor)
            {
                sum += local;
            }
        });
        return step * sum;
    }

    /// <summary>
    /// Estimates pi with <c>Parallel.ForEach</c> over index ranges produced by
    /// <c>Partitioner.Create</c>, merging partial sums with
    /// <see cref="Interlocked"/> instead of a lock.
    /// </summary>
    /// <returns>The midpoint-rule estimate of pi.</returns>
    static double ParallelPartitionerPiUseInterlocked()
    {
        double sum = 0.0;
        double step = 1.0 / NumberOfSteps;
        Parallel.ForEach(Partitioner.Create(0, NumberOfSteps), () => 0.0, (range, state, local) =>
        {
            // Each range is a half-open [Item1, Item2) slice of the index space.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                double x = (i + 0.5) * step;
                local += 4.0 / (1.0 + x * x);
            }
            return local;
        },
        // A compare-and-swap add is used rather than an increment/decrement counter as a
        // lock flag: a waiter that keeps incrementing pushes the counter past 1, so the
        // holder's single decrement can never make the flag acquirable again.
        local => AtomicAdd(ref sum, local));
        return step * sum;
    }
}
}
程序运行结果:
测试后发现:使用Parallel和Partitioner以及Interlocked 在超大循环中性能最佳