Parallel.ForEach使用自定义TaskScheduler来防止OutOfMemoryException

问题描述:

我正在通过Parallel.ForEach处理大小不一的大小不等的PDF(简单的2MB到几百MB的高DPI扫描),并且偶尔会遇到OutOfMemoryException - 可以理解的是由于该进程为32位,并且由Parallel.ForEach产生的线程占用了未知数量的内存消耗工作。Parallel.ForEach使用自定义TaskScheduler来防止OutOfMemoryException

限制MaxDegreeOfParallelism做的工作,但对于倍的吞吐量时,有一个大的(10K +)批量小的PDF一起工作是不够的,因为可能会有更多的线程工作,由于该线程的内存占用小。这是一个CPU繁重的过程,使用Parallel.ForEach轻松达到100%的CPU,然后碰到偶尔出现的一组大型PDF并且发生OutOfMemoryException。运行性能分析器支持这一点。

从我的理解来看,为我的Parallel.ForEach设置分区并不会提高我的性能。

这导致我使用自定义TaskScheduler传递给我的Parallel.ForEach与MemoryFailPoint检查。搜索周围似乎有关于创建自定义对象的信息稀缺。

寻找Specialized Task Schedulers in .NET 4 Parallel Extensions ExtrasA custom TaskScheduler in C#之间,以及各种答案在这里#2,我已经创建了自己的TaskScheduler和有我QueueTask方法,例如:

protected override void QueueTask(Task task) 
{ 
    lock (tasks) tasks.AddLast(task); 
    try 
    { 
     using (MemoryFailPoint memFailPoint = new MemoryFailPoint(600)) 
     { 
      if (runningOrQueuedCount < maxDegreeOfParallelism) 
      { 
       runningOrQueuedCount++; 
       RunTasks(); 
      } 
     } 
    } 
    catch (InsufficientMemoryException e) 
    {  
     // somehow return thread to pool?   
     Console.WriteLine("InsufficientMemoryException"); 
    } 
} 

虽然try/catch语句是这里有点贵我的目标当600MB的可能的最大大小PDF(+额外的内存开销)会抛出一个OutOfMemoryException异常时会被捕获。当我捕获InsufficientMemoryException时,这个解决方案似乎终止了尝试执行该工作的线程。有了足够大的PDF文件,我的代码最终会成为一个单独的线程Parallel.ForEach。

#2发现Parallel.ForEach和OutOfMemoryExceptions其他问题似乎不适合与线程的动态内存使用和经常最大吞吐量的我的使用情况下,只需利用MaxDegreeOfParallelism为静态的解决方案,例如:

所以有变工内存大小最大吞吐量,或者:

  • 如何通过MemoryFailPoint检查拒绝一个线程返回线程池?
  • 如何在有空闲内存的情况下安全地产生新线程以再次启动工作?

编辑: 在磁盘上的PDF大小可能不是线性表示在存储器规模大小,由于光栅化和光栅化图像的操作部件,其是依赖于PDF内容。

使用LimitedConcurrencyLevelTaskSchedulerSamples for Parallel Programming with the .NET Framework我能够做一个小调整,以获得看起来我想要的东西。下面是修改后的LimitedConcurrencyLevelTaskScheduler类的NotifyThreadPoolOfPendingWork方法:

private void NotifyThreadPoolOfPendingWork() 
{ 
    ThreadPool.UnsafeQueueUserWorkItem(_ => 
    { 
     // Note that the current thread is now processing work items. 
     // This is necessary to enable inlining of tasks into this thread. 
     _currentThreadIsProcessingItems = true; 
     try 
     { 
      // Process all available items in the queue. 
      while (true) 
      { 
       Task item; 
       lock (_tasks) 
       { 
        // When there are no more items to be processed, 
        // note that we're done processing, and get out. 
        if (_tasks.Count == 0) 
        { 
         --_delegatesQueuedOrRunning; 
         break; 
        } 

        // Get the next item from the queue 
        item = _tasks.First.Value; 
        _tasks.RemoveFirst(); 
       } 

       // Execute the task we pulled out of the queue 
       //base.TryExecuteTask(item); 

       try 
       { 
        using (MemoryFailPoint memFailPoint = new MemoryFailPoint(650)) 
        { 
         base.TryExecuteTask(item); 
        } 
       } 
       catch (InsufficientMemoryException e) 
       { 
        Thread.Sleep(500); 

        lock (_tasks) 
        { 
         _tasks.AddLast(item); 
        } 
       } 

      } 
     } 
     // We're done processing items on the current thread 
     finally { _currentThreadIsProcessingItems = false; } 
    }, null); 
} 

我们将看看赶上,但在相反的。我们将我们将要工作的任务添加回任务列表(_tasks),该任务触发事件以获取可用线程来获取该工作。但我们先睡当前的线程,以便它不直接拿起工作,回到检查失败的MemoryFailPoint