ESFramework介绍之(28)―― Udp组件

ESFramework对Tcp和Udp协议都提供了完整的支持,在ESFramework介绍之(21)-- Tcp组件接口ITcp介绍ESFramework介绍之(23)―― AgileTcp 两篇文章中介绍了Tcp组件,相对于Tcp来说,Udp要简单许多,所以我在这里打算用一篇文章来介绍它。需要首先提出的是,ESFramework.Network命名空间下的所有直接类(即,不包括ESFramework.Network.Tcp和ESFramework.Network.Udp命名空间下的类)即可用于Tcp也可用于Udp。这中可复用性,将Tcp和Udp的区别仅仅隔离在最底层的通信组件中。

由于Udp组件相对简单,所以其实现策略与Tcp组件稍有不同,最主要的不同之处在于,Udp组件不需要分配器组件,而是直接将接收到消息转交给处理器工厂。大家应该还记得这个图:
ESFramework介绍之(28)―― Udp组件

当将其应用于Udp时,我们可以认为,消息分派器组件已经直接内嵌于Udp组件中了,所以上图对于Udp还是适用的。
下面我们首先看看Udp组件的接口定义IEsbUdp:
ESFramework介绍之(28)―― Udp组件
对应的代码如下:
publicinterfaceIEsbUdp:IEsbUdpSender
{
intPort{set;}
IContractHelperContractHelper{
set;}
INetMessageHookNetMessageHook{
set;}
IDataDealerFactory DataDealerFactory{
set;}

voidInitialize();
voidStart();
voidStop();

eventCbInvalidMsgInvalidMsgRecieved;
eventCbServiceCommittedServiceCommitted;
}

publicdelegatevoidCbServiceCommitted(stringuserID,NetMessagemsg);
publicdelegatevoidCbInvalidMsg(byte[]data,IPEndPointremoteIPE);

publicinterfaceIEsbUdpSender
{
voidHookAndSendNetMessage(IPEndPointremoteIpe,NetMessagemsg);
}


对于IEsbUdp接口有如下几点说明:
(1)同Tcp组件一样,Udp组件仍然可以使用Hook,由NetMessageHook属性体现。
(2)IEsbUdp有DataDealerFactory属性注入,而没有依赖与分派器组件,这个上面已经提到了。
(3)由于Udp不保证数据传输的正确、完整性,所以当接收到一个无效的消息时会触发InvalidMsgRecieved事件。而ServiceCommitted事件的含义与Tcp组件是完全一致的。
(4)IEsbUdp接口继承了IEsbUdpSender接口,IEsbUdpSender与Tcp组件中的IHookSender接口的目的是一致的。它用于保证在udp组件之外通过udp组件(有点绕口)发送消息时都能经过Hook链而不会有漏网之Message。

通过接口我们了解了Udp组件的职责后,我们就可以实现这个组件了,ESFramework中默认的IEsbUdp接口的实现是EsbUdp,它通过一个循环的接收线程来专门接收消息,这个循环中的流程如下:
(1)接收一个Udp消息。
(2)检查这个消息的消息头的完整/正确性,如果不正确、则触发InvalidMsgRecieved事件。否则,进入下一步。
(3)解析消息头,并从中获取消息主体长度,然后判断消息主体是否完整,如果不完整、则触发InvalidMsgRecieved事件。否则,进入下一步。
(4)调用Hook链CaptureRecievedMsg。
(5)通过异步的方式,将消息的具体处理工作放到后台线程池的某个线程中处理。
(6)回到第一步,继续接收下个消息。

之所以第五步将消息放到后台线程中处理,是为了避免阻塞接收线程,因为处理消息可能需要大量的cpu时间。我们继续看后台线程处理消息的流程:
(1)根据消息类型从处理器工厂获取对应的消息处理器。
(2)调用消息处理器处理消息
(3)调用Hook链CaptureBeforeSendMsg
(4)发送回复消息。
(5)触发ServiceCommitted事件。

上面两个就是Udp组件的核心流程了,由于Udp是基于非连接的,所以不需要在Udp组件中进行像Tcp那样复杂的连接管理,当然,Udp协议仍然可以进行“伪在线”用户管理,这通过另外一个组件IUserManager做到,这个组件将在后面的文章中介绍。
最后,给出EsbUdp的实现源码:

ESFramework介绍之(28)―― Udp组件ESFramework介绍之(28)―― Udp组件EsbUdp
<!--<br><br>Code highlighting produced by Actipro CodeHighlighter (freeware)<br>http://www.CodeHighlighter.com/<br><br>-->publicclassEsbUdp:IEsbUdp
{
privateUdpClientudpClient;
privateintport=10000;
privateIContractHelpercontractHelper;
privateINetMessageHooknetMessageHook=newEmptyNetMessageHook();
privateIDataDealerFactorydataDealerFactory;

privatevolatileboolgoToStop=true;
privateManualResetEventstopEvent=newManualResetEvent(false);
publiceventCbServiceCommittedServiceCommitted;

publicEsbUdp()
{
this.netMessageHook=newEmptyNetMessageHook();
}

publicvoidInitialize()
{
this.udpClient=newUdpClient(this.port);
}

#regionIEsbUdp成员
publiceventCbInvalidMsgInvalidMsgRecieved;

#regionproperty
publicintPort
{
set
{
this.port=value;
}
}

publicIContractHelperContractHelper
{
set
{
this.contractHelper=value;
}
}

publicIDataDealerFactoryDataDealerFactory
{
set
{
this.dataDealerFactory=value;
}
}

publicINetMessageHookNetMessageHook
{
set
{
if(value!=null)
{
this.netMessageHook=value;
}
else
{
this.netMessageHook=newEmptyNetMessageHook();
}
}
}
#endregion

#regionMethod
publicvoidStart()
{
this.goToStop=false;
this.stopEvent.Reset();
CbSimplecb
=newCbSimple(this.Worker);
cb.BeginInvoke(
null,null);
}

publicvoidStop()
{
this.goToStop=true;
//发送eof消息给自己,使Receive不再阻塞
byte[]eof=newbyte[4];
this.udpClient.Send(eof,4,newIPEndPoint(IPAddress.Parse("127.0.0.1"),this.port));

this.stopEvent.WaitOne();
}

publicvoidHookAndSendNetMessage(IPEndPointremoteIpe,NetMessagemsg)
{
byte[]data=this.netMessageHook.CaptureBeforeSendMsg(msg).ToStream();
this.udpClient.Send(data,data.Length,remoteIpe);
}

#regionWorker
privatevoidWorker()
{
while(!this.goToStop)
{
IPEndPointremoteIPE
=null;
byte[]data=this.udpClient.Receive(refremoteIPE);
if(data.Length<this.contractHelper.MessageHeaderLength)
{
this.ActivateInvalidMsg(data,remoteIPE);
continue;
}

IMessageHeaderheader
=this.contractHelper.ParseMessageHeader(data,0);
if(((this.contractHelper.MessageHeaderLength+header.MessageBodyLength)>data.Length)||(!this.contractHelper.ValidateMessageToken(header)))
{
this.ActivateInvalidMsg(data,remoteIPE);
continue;
}

byte[]body=null;
if(header.MessageBodyLength>0)
{
body
=newbyte[header.MessageBodyLength];
for(inti=0;i<body.Length;i++)
{
body[i]
=data[this.contractHelper.MessageHeaderLength+i];
}
}

NetMessagemsg
=newNetMessage(header,body);
msg.Tag
=remoteIPE;
NetMessagehookedMsg
=this.netMessageHook.CaptureRecievedMsg(msg);

if(this.goToStop)
{
break;
}

CbMessageDealingcb
=newCbMessageDealing(this.MessageDealing);
cb.BeginInvoke(hookedMsg,
null,null);
}

this.stopEvent.Set();
}

//异步处理消息
privatevoidMessageDealing(NetMessagemsg)
{
IDataDealerdealer
=this.dataDealerFactory.CreateDealer(msg.Header.ServiceKey,msg.Header.TypeKey);
NetMessageresMsg
=dealer.DealRequestMessage(msg);

if(resMsg!=null)
{
NetMessagehookedResMsg
=this.netMessageHook.CaptureBeforeSendMsg(resMsg);
byte[]bRes=hookedResMsg.ToStream();
this.udpClient.Send(bRes,bRes.Length,(IPEndPoint)msg.Tag);
if(this.ServiceCommitted!=null)
{
this.ServiceCommitted(resMsg.Header.UserID,hookedResMsg);
}
}
}

privatevoidActivateInvalidMsg(byte[]data,IPEndPointremoteIPE)
{
if(this.InvalidMsgRecieved!=null)
{
this.InvalidMsgRecieved(data,remoteIPE);
}
}
#endregion
#endregion

#endregion

}

publicdelegatevoidCbMessageDealing(NetMessagemsg);

关于ESFramework对Udp的支持,还有几个重要的主题需要介绍,除了上面提到的IUserManager组件,还有IUdpServerAgent组件(还记得ESFramework介绍之(7)-- 服务端代理IServerAgent),另外还有非常重要的一块--基于Udp的NAPT穿透。如果要了解这些内容,请继续关注本系列的文章。


上一篇:ESFramework介绍之(27)-- 支持OverdueMessage

转到:ESFramework 可复用的通信框架(序)