在单个线程上处理来自多个线程的请求 - .NET Core

问题描述:

如果此问题已经有答案,但是如果有答案,我无法在此网站上找到答案。在单个线程上处理来自多个线程的请求 - .NET Core

首先 - 这个问题是特定于.NET核心(V1.1.0在写作时)

我有一个第三方组件,该组件将只处理向它提出的请求,如果他们在同一线程上发起。此程序集是RabbitMQ库和问题的详细信息,可能与can be found here相关或可能不相关。基本上 - 我有多个线程可能会调用这个程序集 - 但是如果请求来自不同的线程 - 则会引发异常。

所以 - 为了解决这个问题,我试图创建一个被阻塞的线程,因此不会过期 - 并且必须以某种方式在此线程上处理对此程序集的所有调用。

我的第一次尝试是创建一个事件并订阅被阻止的线程上的事件。那么任何其他线程都会开始调用这个事件,我认为可能会在正确的线程中找到这个事件,这样我就可以实现我在单线程上处理第三方程序集请求的愿望。

我现在(痛苦左右)了解到it is not possible in .NET core to begin-invoke an event in .Net core :(

例子演示了此问题:

public class Program 
{ 
    public static void Main(string[] args) 
    { 
     Program program = new Program(); 
     ManualResetEvent resetEvent = new ManualResetEvent(false); 
     program.StartNewThreadToBeCalled(resetEvent); 

     program.CallBobMultipleTimes(resetEvent); 

     Console.ReadLine(); 
    } 

    private void CallBobMultipleTimes(ManualResetEvent resetEvent) 
    { 
     resetEvent.WaitOne(); 
     for(int i=0 ; i<100 ; i++) 
      ThreadPool.QueueUserWorkItem(x=>CallBob(null, null)); //Can't BeginInvoke in .NET Core 
    } 

    private void StartNewThreadToBeCalled(ManualResetEvent resetEvent) 
    { 
     ThreadPool.QueueUserWorkItem(x=> 
     { 
      Bob bob = new Bob(); 

      CallBob += (obj, e)=> bob.InvokeMeOnOneThreadOnly(); 
      resetEvent.Set();     
      ManualResetEvent mre = new ManualResetEvent(false); 
      mre.WaitOne(); //Real implementation will block forever - this should be the only thread handles the processing of requests. 
     }); 
    } 

    public event EventHandler CallBob; 
} 

public class Bob 
{ 
    private List<int> ThreadIdentifiers = new List<int>(); 
    private static object SyncObject = new object(); 


    public void InvokeMeOnOneThreadOnly() 
    { 
     lock(SyncObject) 
     { 
      int currentThreadId = Thread.CurrentThread.ManagedThreadId; 
      if(ThreadIdentifiers.Any()) 
      { 
       if(!ThreadIdentifiers.Contains(currentThreadId)) 
        Console.WriteLine("Don't call me from multiple threads!"); 
      } 
      else 
       ThreadIdentifiers.Add(currentThreadId); 
     } 
    } 
} 

我已经通过创建围绕concurrentQueue该通知什么时候加一个包装了这个工作然后我在我的特殊第三方组装线程上处理这个事件,并从这个队列中挑选请求,直到它被用尽......不知道为什么这是在另一种方式不行的时候工作?!

有没有比我找到的更好的方法来处理来自多个不同线程在.NET核心中的单个线程上的多个请求?

+0

由于队列是标准的实现消费者生产模式不是很奇怪ÿ你有工作......没有办法知道为什么“其他方式”不起作用,因为没有对“另一种方式”的明确解释(“通过创建代表”不是一种)。如果你需要一个答案,你需要澄清你以“其他方式”尝试了什么,或者解释你想以什么方式改进队列解决方案 - 在当前状态下,帖子感觉过于宽泛。 –

+0

@Alexei当我发布时,我即将走出工作 - 所以没有时间获得更多更详细的帖子,我会在稍后获得空闲时间时尝试使用代码示例进行扩展。 – Jay

+0

@AlexeiLevenkov我已经更新了一个代码片段,它解释了我遇到的问题。忘记其他我已经尝试过的东西,现在问题说明了我的原始问题,一个回购,它归结了我关于原始问题的代码并解释了我所做的工作。我的问题真的是 - 做这项工作的正确方法是什么?我对并发队列做的事情感觉很糟糕。 – Jay

这很难说出你想从代码中做什么。但是,另一种方法是使用BlockingCollection<T>。后台线程可能值添加到集合和一个单独的线程可以坐下,并尝试获取值从集合中,例如:

while(continueProcessingCollection) 
{ 
    if(blockingCollection.TryTake(out value, TimeSpan.FromSeconds(myTimeout))) 
    { 
     // process value, decide to toggle continueProcessingCollection 
    } 
} 

后台线程可以添加新值阻塞集合:

blockingCollection.Add(newValue); 

而且,地方你需要创建blockingCollection变量:

var blockingCollection = new BlockingCollection<MyType>(); 
+0

我刚刚在MSDN上看过这个,所以我的理解(现在)是,如果你调用TryTake(),它会阻塞,直到有东西被添加到集合中?那么当有东西被添加时,它会解锁并且代码会继续运行? – Jay

+0

@jay如果集合中有东西,它将返回true;如果超时,则返回false。你需要知道停止循环的条件(超时,处理的事物的数量等等)。但是,只要你的超时(我用我的例子1秒)就会阻塞。 –

+0

然后这正是我需要的 - 比我的hacky包装并发队列好得多。谢谢! – Jay

delegateBeginInvoke使用threadpool调度callback这不会解决你的问题。对于多个生产者和一个消费者来说,这基本上是一个Producer–consumer问题。 .NET核心包括some structures实现 IProducerConsumerCollection<T>你可以用来解决这个问题。

不要与Control.BeginInvoke(winform)和Dispatcher.BeginInvoke(WPF)混淆,它会在GUI线程上对回调进行排队。

+0

我会看看这种模式,因为我没有意识到这有一个.net核心接口。谢谢,我会让你知道我是如何得到的 – Jay