NET下RabbitMQ实践[实战篇]

之前的文章中,介绍了如何将RabbitMQ以WCF方式进行发布。今天就介绍一下我们产品中如何使用RabbitMQ的!

在Discuz!NT企业版中,提供了对HTTP错误日志的记录功能,这一点对企业版非常重要,另外存储错误日志使用了MongoDB,理由很简单,MongoDB的添加操作飞快,即使数量过亿之后插入速度依旧不减。

在开始正文之前,先说明一下本文的代码分析顺序,即:程序入口==》RabbitMQ客户端===>RabbitMQ服务端。好了,闲话少说,开始正文!

首先是程序入口,也就是WCF+RabbitMQ客户端实现:

因为Discuz!NT使用了HttpModule方式来接管HTTP链接请求,而在.NET的HttpModule模板中,可以通过如下方法来接管程序运行时发生的ERROR,如下:

context.Error+=newEventHandler(Application_OnError);


而“记录错误日志"的功能入口就在这里:

NET下RabbitMQ实践[实战篇]NET下RabbitMQ实践[实战篇]
publicvoidApplication_OnError(Objectsender,EventArgse)
{
stringrequestUrl=DNTRequest.GetUrl();
HttpApplicationapplication
=(HttpApplication)sender;
HttpContextcontext
=application.Context;

#ifEntLib
if(RabbitMQConfigs.GetConfig()!=null&&RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)//当开启errlog错误日志记录功能时
{
RabbitMQClientHelper.GetHttpModuleErrLogClient().AsyncAddLog(
newHttpModuleErrLogData(LogLevel.High,context.Server.GetLastError().ToString()));//异步方式
//RabbitMQHelper.GetHttpModuleErrLogClient().AddLog(newHttpModuleErrLogData(LogLevel.High,"wrongmessageinfomation!"));//同步方式
return;
}
#endif
...
}


当然从代码可以看出,记录日志的工作基本是通过配置文件控制的,即“HttpModuleErrLog.Enable”。

而RabbitMQClientHelper是一个封装类,主要用于反射生成IHttpModuleErrlogClient接口实例,该实例就是“基于WCF发布的RabbitMQ”的客户端访问对象。

NET下RabbitMQ实践[实战篇]NET下RabbitMQ实践[实战篇]
///<summary>
///RabbitMQ
///</summary>
publicclassRabbitMQClientHelper
{
staticIHttpModuleErrlogClientihttpModuleErrLogClient;

privatestaticobjectlockHelper=newobject();

publicstaticIHttpModuleErrlogClientGetHttpModuleErrLogClient()
{
if(ihttpModuleErrLogClient==null)
{
lock(lockHelper)
{
if(ihttpModuleErrLogClient==null)
{
try
{
if(RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)
{
ihttpModuleErrLogClient
=(IHttpModuleErrlogClient)Activator.CreateInstance(Type.GetType(
"Discuz.EntLib.RabbitMQ.Client.HttpModuleErrLogClient,Discuz.EntLib.RabbitMQ.Client",false,true));
}
}
catch
{
thrownewException("请检查Discuz.EntLib.RabbitMQ.dll文件是否被放置到了bin目录下!");
}
}
}
}
returnihttpModuleErrLogClient;
}
}


可以看出它反射的是Discuz.EntLib.RabbitMQ.dll文件的HttpModuleErrLogClient对象(注:使用反射的原因主要是解决企业版代码与普遍版代码在项目引用上的相互依赖),下面就是其接口和具体要求实现:

NET下RabbitMQ实践[实战篇]NET下RabbitMQ实践[实战篇]
///<summary>
///IHttpModuleErrlogClient客户端接口类,用于反射实例化绑定
///</summary>
publicinterfaceIHttpModuleErrlogClient
{
voidAddLog(HttpModuleErrLogDatahttpModuleErrLogData);

voidAsyncAddLog(HttpModuleErrLogDatahttpModuleErrLogData);
}

publicclassHttpModuleErrLogClient:IHttpModuleErrlogClient
{
publicvoidAddLog(HttpModuleErrLogDatahttpModuleErrLogData)
{
try
{
//((RabbitMQBinding)binding).OneWayOnly=true;
ChannelFactory<IHttpModuleErrLogService>m_factory=newChannelFactory<IHttpModuleErrLogService>(GetBinding(),"soap.amqp:///HttpModuleErrLogService");
m_factory.Open();
IHttpModuleErrLogServicem_client
=m_factory.CreateChannel();
m_client.AddLog(httpModuleErrLogData);
((IClientChannel)m_client).Close();
m_factory.Close();
}
catch(System.Exceptione)
{
stringmsg=e.Message;
}
}

privatedelegatevoiddelegateAddLog(HttpModuleErrLogDatahttpModuleErrLogData);

publicvoidAsyncAddLog(HttpModuleErrLogDatahttpModuleErrLogData)
{
delegateAddLogAddLog_aysncallback
=newdelegateAddLog(AddLog);
AddLog_aysncallback.BeginInvoke(httpModuleErrLogData,
null,null);
}

publicBindingGetBinding()
{
returnnewRabbitMQBinding(RabbitMQConfigs.GetConfig().HttpModuleErrLog.RabbitMQAddress);
}
}


可以看出,AddLog方法与上一篇中的客户端内容基本上没什么太大差别,只不过它提供了同步和异步访问两种方式,这样做的目的主要是用户可根据生产环境来灵活配置。

下面就来看一下RabbitMQ的服务端实现,首先看一下其运行效果,如下图:
NET下RabbitMQ实践[实战篇]


接着看一下启动rabbitmq服务的代码:

NET下RabbitMQ实践[实战篇]NET下RabbitMQ实践[实战篇]
publicvoidStartService(System.ServiceModel.Channels.Bindingbinding)
{
m_host
=newServiceHost(typeof(HttpModuleErrLogService),newUri("soap.amqp:///"));
//((RabbitMQBinding)binding).OneWayOnly=true;
m_host.AddServiceEndpoint(typeof(IHttpModuleErrLogService),binding,"HttpModuleErrLogService");
m_host.Open();
m_serviceStarted
=true;
}


上面代码会添加IHttpModuleErrLogService接口实现类HttpModuleErrLogService 的Endpoint,并启动它,下面就是该接口声明:

NET下RabbitMQ实践[实战篇]NET下RabbitMQ实践[实战篇]
///<summary>
///IHttpModuleErrLogService接口类
///</summary>
[ServiceContract]
publicinterfaceIHttpModuleErrLogService
{
///<summary>
///添加httpModuleErrLogData日志信息
///</summary>
///<paramname="httpModuleErrLogData"></param>
[OperationContract]
voidAddLog(HttpModuleErrLogDatahttpModuleErrLogData);
}



代码很简单,就是定义了一个添加日志的方法:void AddLog(HttpModuleErrLogData httpModuleErrLogData)

下面就是接口的具体实现,首先是类声明及初始化代码:

NET下RabbitMQ实践[实战篇]NET下RabbitMQ实践[实战篇]
[ServiceBehavior(InstanceContextMode=InstanceContextMode.Single)]//Single-为所有客户端调用分配一个服务实例。
publicclassHttpModuleErrLogService:IHttpModuleErrLogService
{
///<summary>
///获取HttpModuleErrLogInfo配置文件对象实例
///</summary>
privatestaticHttpModuleErrLogInfohttpModuleErrorLogInfo=RabbitMQConfigs.GetConfig().HttpModuleErrLog;
///<summary>
///定时器对象
///</summary>
privatestaticSystem.Timers.Timer_timer;
///<summary>
///定时器的时间
///</summary>
privatestaticint_elapsed=0;

publicstaticvoidInitial(System.Windows.Forms.RichTextBoxmsgBox,intelapsed)
{
_msgBox
=msgBox;
_elapsed
=elapsed;

//初始定时器
if(_elapsed>0)
{
_timer
=newSystem.Timers.Timer(){Interval=elapsed*1000,Enabled=true,AutoReset=true};
_timer.Elapsed
+=newSystem.Timers.ElapsedEventHandler(Timer_Elapsed);
_timer.Start();
}
}

///<summary>
///时间到时执行出队操作
///</summary>
///<paramname="sender"></param>
///<paramname="e"></param>
privatestaticvoidTimer_Elapsed(objectsender,System.Timers.ElapsedEventArgse)
{
Dequeue();
}


可以看出,这里使用了静态定时器对象,来进行定时访问队列信息功能(“非同步出队”操作),这样设计的原因主要是为用户提供适合的配置方式,即如果不使用定时器(为0时),则系统会在日志入队后,就立即启动出队(“同步出队”)操作获取日志信息并插入到MongoDB数据库中。
下面介绍一下入队操作实现:

NET下RabbitMQ实践[实战篇]NET下RabbitMQ实践[实战篇]
///<summary>
///添加httpModuleErrLogData日志信息
///</summary>
///<paramname="httpModuleErrLogData"></param>
publicvoidAddLog(HttpModuleErrLogDatahttpModuleErrLogData)
{
Enqueue(httpModuleErrLogData);

if(_elapsed<=0)//如果使用定时器(为0时),则立即执行出队操作
Dequeue();
}

///<summary>
///交换机名称
///</summary>
privateconststringEXCHANGE="ex1";
///<summary>
///交换方法,更多内容参见:http://melin.javaeye.com/blog/691265
///</summary>
privateconststringEXCHANGE_TYPE="direct";
///<summary>
///路由key,更多内容参见:http://sunjun041640.blog.163.com/blog/static/256268322010328102029919/
///</summary>
privateconststringROUTING_KEY="m1";

///<summary>
///日志入队
///</summary>
///<paramname="httpModuleErrLogData"></param>
publicstaticvoidEnqueue(HttpModuleErrLogDatahttpModuleErrLogData)
{
Uriuri
=newUri(httpModuleErrorLogInfo.RabbitMQAddress);
ConnectionFactorycf
=newConnectionFactory()
{
UserName
=httpModuleErrorLogInfo.UserName,
Password
=httpModuleErrorLogInfo.PassWord,
VirtualHost
="dnt_mq",
RequestedHeartbeat
=0,
Endpoint
=newAmqpTcpEndpoint(uri)
};
using(IConnectionconn=cf.CreateConnection())
{
using(IModelch=conn.CreateModel())
{
if(EXCHANGE_TYPE!=null)
{
ch.ExchangeDeclare(EXCHANGE,EXCHANGE_TYPE);
//,true,true,false,false,true,null);
ch.QueueDeclare(httpModuleErrorLogInfo.QueueName,true);//true,true,true,false,false,null);
ch.QueueBind(httpModuleErrorLogInfo.QueueName,EXCHANGE,ROUTING_KEY,false,null);
}
IMapMessageBuilderb
=newMapMessageBuilder(ch);
IDictionarytarget
=b.Headers;
target[
"header"]="HttpErrLog";
IDictionarytargetBody
=b.Body;
targetBody[
"body"]=SerializationHelper.Serialize(httpModuleErrLogData);
((IBasicProperties)b.GetContentHeader()).DeliveryMode
=2;//persistMode
ch.BasicPublish(EXCHANGE,ROUTING_KEY,
(IBasicProperties)b.GetContentHeader(),
b.GetContentBody());
}
}
}


代码很简单,主要构造rabbitmq链接(ConnectionFactory)并初始化相应参数如用户名,密码,ROUTING_KEY等。

然后将传入的日志对象序列化成字符串对象,赋值给targetBody["body"],这样做主要是因为我没找到更好的方法来赋值(之前尝试直接绑定httpModuleErrLogData到targetBody["body"],但在出队操作中找不到合适方法将httpModuleErrLogData对象解析出来)。

下面就是出队操作:

NET下RabbitMQ实践[实战篇]NET下RabbitMQ实践[实战篇]
///<summary>
///日志出队
///</summary>
publicstaticvoidDequeue()
{
stringserverAddress=httpModuleErrorLogInfo.RabbitMQAddress.Replace("amqp://","").TrimEnd('/');//"10.0.4.85:5672";
ConnectionFactorycf=newConnectionFactory()
{
UserName
=httpModuleErrorLogInfo.UserName,
Password
=httpModuleErrorLogInfo.PassWord,
VirtualHost
="dnt_mq",
RequestedHeartbeat
=0,
Address
=serverAddress
};

using(IConnectionconn=cf.CreateConnection())
{
using(IModelch=conn.CreateModel())
{
while(true)
{
BasicGetResultres
=ch.BasicGet(httpModuleErrorLogInfo.QueueName,false);
if(res!=null)
{
try
{
stringobjstr=System.Text.UTF8Encoding.UTF8.GetString(res.Body).Replace("\0\0\0body\0\n","");//去掉头部信息
objectobj=SerializationHelper.DeSerialize(typeof(HttpModuleErrLogData),objstr);
HttpModuleErrLogDatahttpModuleErrLogData
=objasHttpModuleErrLogData;
if(httpModuleErrLogData!=null)
{
MongoDbHelper.Insert(
newMongo(httpModuleErrorLogInfo.MongoDB),"dnt_httpmoduleerrlog",LoadAttachment(httpModuleErrLogData));
_msgBox.BeginInvoke(
newShowMsg(SetMsgRichBox),"\r发生时间:"+httpModuleErrLogData.TimeStamp+"\r错误等级:"+httpModuleErrLogData.Level+"\r详细信息:"+httpModuleErrLogData.Message);
ch.BasicAck(res.DeliveryTag,
false);
}
}
catch{}
}
else
break;
}
}
}
}


出队操作也是先实例化链接到rabbitmq 的实例,并循环使用BasicGet方法来单条获取队列信息,并最终将res.Body的数据序列化成HttpModuleErrLogData对象,并最终插入到mongodb数据库中。同时将获取的队列消息显示出来:

_msgBox.BeginInvoke(newShowMsg(SetMsgRichBox),"\r发生时间:"+httpModuleErrLogData.TimeStamp+"\r错误等级:"+httpModuleErrLogData.Level+"\r详细信息:"+httpModuleErrLogData.Message);


这里使用异步方式显示出队的日志信息,其声明的delegate 方法“ShowMsg”如下:

NET下RabbitMQ实践[实战篇]NET下RabbitMQ实践[实战篇]
///<summary>
///声明委托
///</summary>
///<paramname="message"></param>
publicdelegatevoidShowMsg(stringmessage);
///<summary>
///绑定到上面delegate的方法
///</summary>
///<paramname="outPut"></param>
publicstaticvoidSetMsgRichBox(stringoutPut)
{
_msgBox.Text
+="\r==================================\r下列错误信息出队时间=>"+DateTime.Now+outPut+"\r";
}


同时使用LoadAttachment方法来实现HttpModuleErrLogData到mongodb的Document类型的转换:

NET下RabbitMQ实践[实战篇]NET下RabbitMQ实践[实战篇]
///<summary>
///将HttpModuleErrLogData转换成Document类型
///</summary>
///<paramname="httpModuleErrLogData"></param>
///<returns></returns>
publicstaticDocumentLoadAttachment(HttpModuleErrLogDatahttpModuleErrLogData)
{
Documentdoc
=newDocument();
doc[
"_id"]=httpModuleErrLogData.Oid;
doc[
"level"]=httpModuleErrLogData.Level;
doc[
"message"]=httpModuleErrLogData.Message;
doc[
"timestamp"]=httpModuleErrLogData.TimeStamp;
returndoc;
}



到这里,主要的功能介绍就差不多了。当然本文所阐述的只是一个原型,相信会随着对rabbitmq的理解深入而不断完善,感兴趣的朋友欢迎讨论交流,以纠正我认识上的偏差,呵呵。

原文链接:http://www.cnblogs.com/daizhj/archive/2010/10/25/1860442.html
Tags:discuz!nt,Rabbitmq,NET,mongodb
BLOG: http://daizhj.cnblogs.com/
作者:daizhj,代震军