如何在ASP.NET Web应用程序中的IHttpHandler上实现队列?
问题描述:
我有一个ASP.NET网络应用程序,它有一个IHttpHandler
来处理POST
请求(基本上,它会解析JSON,然后将处理结果发送回客户端)。
我想在我的应用程序中实现一个队列系统,这样当有多个POST请求同时到达时,它们不会被并行处理,而是一个接一个地处理。
问题:
- 实现这一点的最佳方式是什么?
- 我应该在应用程序的哪个位置声明 Queue 对象?
- 每次向队列添加项目时,如何附加特定的功能(即执行处理的函数)?
我无法在Page_Load()
事件中声明队列,因为当网页打开时不会发送请求。我需要队列始终等待传入的POST请求。如果我在IHttpHandler中声明队列,每当POST请求到来时,队列就会重置。
谢谢!
答
实现此目的的一种方法是改用IHttpAsyncHandler和一个单例服务类。工作线程会等待新项目被添加到由ConcurrentQueue支持的BlockingCollection中,从而实现对传入请求的串行处理。服务类中的工作线程需要在处理第一个请求之前启动:可以在全局的Application_Start例程中调用启动方法,或者通过向AssemblyInfo.cs文件添加PreApplicationStartMethod特性来调用它。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Web;
namespace webgateway
{
    /// <summary>
    /// Asynchronous HTTP handler that enqueues each incoming request onto the
    /// singleton <c>Service</c> queue so requests are processed one at a time
    /// by a single background worker instead of in parallel.
    /// </summary>
    public class RequestHandler : IHttpAsyncHandler, IDisposable
    {
        /// <summary>
        /// Wraps the request in a <c>RequestContext</c> (which implements
        /// <see cref="IAsyncResult"/>) and hands it to the service queue.
        /// The worker thread completes the call later via CompleteCall().
        /// </summary>
        public IAsyncResult BeginProcessRequest(HttpContext context, AsyncCallback cb, object extraData)
        {
            RequestContext requestContext = new RequestContext(cb, context, extraData);
            Service.Instance.QueueRequest(requestContext);
            return requestContext;
        }

        // Start could be called from Global.asax, or by adding this attribute to AssemblyInfo.cs:
        // [assembly: PreApplicationStartMethod(typeof(webgateway.RequestHandler), "Start")]
        public static void Start()
        {
            Service.Instance.Start();
        }

        /// <summary>
        /// Invoked by ASP.NET after the worker fires the completion callback.
        /// All response writing already happened on the worker thread, so there
        /// is nothing left to finalize here.
        /// </summary>
        public void EndProcessRequest(IAsyncResult result)
        {
        }

        // Safe to reuse across requests: the handler holds no per-request state.
        public bool IsReusable { get { return true; } }

        /// <summary>
        /// Synchronous entry point. ASP.NET never calls this for a handler
        /// registered as IHttpAsyncHandler; the original empty body would have
        /// silently swallowed a misconfiguration, so throw instead.
        /// </summary>
        public void ProcessRequest(HttpContext context)
        {
            throw new InvalidOperationException(
                "Use the asynchronous BeginProcessRequest/EndProcessRequest path.");
        }

        public void Dispose() { }
    }
}
/// <summary>
/// Singleton service that serializes request processing: producers enqueue
/// RequestContext items and a single background worker drains them one by one.
/// </summary>
public sealed class Service
{
    // Eagerly-initialized singleton; the CLR guarantees thread-safe static init.
    private static readonly Service _Instance = new Service();
    public static Service Instance
    {
        get { return _Instance; }
    }
    private Service()
    {
    }
    // volatile so the worker loop promptly observes Stop() from another thread
    // (a plain bool read could legally be cached inside the loop).
    private static volatile bool _running = false;
    // FIFO ordering comes from ConcurrentQueue; BlockingCollection adds the
    // blocking TryTake-with-timeout semantics on top of it.
    private BlockingCollection<RequestContext> blockingQueue = new BlockingCollection<RequestContext>(new ConcurrentQueue<RequestContext>());

    /// <summary>Starts the single background worker that drains the queue serially.</summary>
    public void Start()
    {
        _running = true;
        ThreadPool.QueueUserWorkItem(worker, null);
    }

    // Worker loop: handles one queued request at a time (serial processing).
    private void worker(object state)
    {
        // Hoisted out of the loop: creating a new time-seeded Random per
        // iteration is an anti-pattern (instances created in quick succession
        // can produce identical sequences).
        Random random = new Random();
        RequestContext requestContext;
        while (_running)
        {
            // Block up to 10 s for a new item so the loop can re-check _running.
            if (blockingQueue.TryTake(out requestContext, 10000))
            {
                try
                {
                    // You could delegate the work to another function, class, library or process here...
                    // Simulate a random delay
                    Thread.Sleep(random.Next(1000, 5000));
                    // Make sure the client is connected before sending the response
                    if (requestContext.HttpContext.Response.IsClientConnected)
                    {
                        requestContext.HttpContext.Response.BufferOutput = false;
                        requestContext.HttpContext.Response.ContentType = "text/plain";
                        requestContext.HttpContext.Response.Write(requestContext.HttpContext.Request["echo"]);
                        requestContext.HttpContext.Response.Flush();
                    }
                }
                finally
                {
                    // BUG FIX: the original only completed the call when the
                    // client was still connected, leaving disconnected requests
                    // pending forever. The async call must ALWAYS be completed
                    // so ASP.NET can release the request.
                    requestContext.CompleteCall();
                }
            }
        }
    }

    /// <summary>Signals the worker loop to exit after its current TryTake window.</summary>
    public void Stop()
    {
        _running = false;
    }

    /// <summary>Enqueues a request for serial processing by the worker thread.</summary>
    public void QueueRequest(RequestContext requestContext)
    {
        if (!blockingQueue.TryAdd(requestContext))
        {
            // TryAdd on an unbounded BlockingCollection only fails once the
            // collection is marked complete-for-adding; log/surface that here.
        }
    }
}
// Represents one queued HTTP request. Implements IAsyncResult so that
// BeginProcessRequest can return it directly to ASP.NET, and the background
// worker can later finish the request by calling CompleteCall().
public class RequestContext : IAsyncResult
{
// Lazily created in AsyncWaitHandle; creation and signaling are guarded by _lock
// so a waiter never misses a completion that races with handle creation.
private ManualResetEvent _event;
private object _lock = new Object();
private AsyncCallback _callback;
private HttpContext _httpContext;
private bool _completed;
private bool _completedSynchronously;
private object _state;
// cb: ASP.NET's completion callback (invoking it triggers EndProcessRequest).
// hc: the request/response pair the worker will write to.
// state: opaque extraData passed through from BeginProcessRequest.
public RequestContext(AsyncCallback cb, HttpContext hc, object state)
{
_callback = cb;
_httpContext = hc;
// Completion always happens later on the worker thread, never inline.
_completedSynchronously = false;
_completed = false;
_state = state;
}
public HttpContext HttpContext
{
get { return _httpContext; }
}
// Called by the worker when processing is done: marks the result complete,
// wakes any blocked waiter, then notifies ASP.NET via the callback.
// The callback is deliberately invoked outside the lock to avoid holding it
// during user/framework code.
public void CompleteCall()
{
lock (_lock)
{
_completed = true;
if (_event != null)
{
_event.Set();
}
}
_callback?.Invoke(this);
}
public bool IsCompleted
{
get { return _completed; }
}
// Always false: see constructor — work is queued, never finished inline.
public bool CompletedSynchronously
{
get { return _completedSynchronously; }
}
public object AsyncState
{
get { return _state; }
}
// Lazily allocates the wait handle on first access. Initializing it with
// IsCompleted (inside the lock) ensures a handle created after completion
// starts signaled, so late waiters do not block forever.
public WaitHandle AsyncWaitHandle
{
get
{
lock (_lock)
{
if (_event == null)
{
_event = new ManualResetEvent(IsCompleted);
}
return _event;
}
}
}
}