SuperSocket笔记
https://note.youdao.com/share/?id=eec22facef3d915b5efb258a715ad882&type=note#/
SuperSocket是一款优秀的SOCKET框架,功能强大,以下是我的学习笔记。
一、官网
主页: http://www.supersocket.net
官网文档: http://docs.supersocket.net
二、源码浅析
Socket通讯不管怎么封装,其核心必然是Listen、Accept、Receive、Send等关键,因此从最底层开始往上抽丝剥茧,解读SuperSocket的逻辑。
项目SuperSocket.SocketEngine.Net45 实现对Socket链接、收发数据的封装。
项目SuperSocket.SocketBase.Net45实现对AppServer AppSession等业务逻辑的抽象封装。
整个SuperSocket是基于.net SocketAsyncEventArgs实现的,相关介绍请参考:http://www.cnblogs.com/smark/p/3573107.html
http://blog.****.net/zhangjunjian127/article/details/7067797
1、SocketServerBase:一个SocketServer可以监听多个端口
public SocketServerBase(IAppServer appServer, ListenerInfo[] listeners)
{....} //ListenerInfo[]表示多个端口
public virtual bool Start()
{
......
//创建多个监听
for (var i = 0; i < ListenerInfos.Length; i++)
{
var listener = CreateListener(ListenerInfos[i]); //创建一个Socket监听
listener.Error += new ErrorHandler(OnListenerError);
listener.Stopped += new EventHandler(OnListenerStopped);
listener.NewClientAccepted += new NewClientAcceptHandler(OnNewClientAccepted); //OnNewClientAccepted是抽象方法,在AsyncSocketServer中实现;
if (listener.Start(AppServer.Config))
{
Listeners.Add(listener);
........
}
}
......
}
//子类TcpSocketServerBase实现抽象方法创建监听
protected override ISocketListener CreateListener(ListenerInfo listenerInfo)
{
return new TcpAsyncSocketListener(listenerInfo);
}
2、SocketListenerBase,抽象类,实现接口ISocketListener,定义基础的start stop error等事件;TcpAsyncSocketListener ,SocketListenerBase子类,封装了socket监听:
public override bool Start(IServerConfig config)
{
m_ListenSocket = new Socket(this.Info.EndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); //创建socket;
......
ProcessAccept(acceptEventArg);//处理accept
}
void ProcessAccept(SocketAsyncEventArgs e)
{
......
if (socket != null)
OnNewClientAccepted(socket, null);//触发OnNewClientAccepted,通过这个事件,又回到了AsyncSocketServer.OnNewClientAccepted
........
}
3. AsyncSocketServer , TcpSocketServerBase的子类,是获取新连接,监听收数据的关键
public override bool Start()
{
m_BufferManager = new BufferManager(bufferSize * AppServer.Config.MaxConnectionNumber, bufferSize); //预生成了一块缓冲区,根据ServerConfig的最大链接数预生成
//预创建MaxConnectionNumber个SocketAsyncEventArgsProxy
var socketArgsProxyList = new List<SocketAsyncEventArgsProxy>(AppServer.Config.MaxConnectionNumber);
for (int i = 0; i < AppServer.Config.MaxConnectionNumber; i++)
{
//Pre-allocate a set of reusable SocketAsyncEventArgs
socketEventArg = new SocketAsyncEventArgs();
m_BufferManager.SetBuffer(socketEventArg);
socketArgsProxyList.Add(new SocketAsyncEventArgsProxy(socketEventArg));
}
//创建了读写时的队列,基于ConcurrentStack,是.net 并行框架的一个库,是无锁的,提高效率;参考资料:http://www.cnblogs.com/zw369/p/3990908.html
m_ReadWritePool = new ConcurrentStack<SocketAsyncEventArgsProxy>(socketArgsProxyList);
......
}
//接着分析NewClientAccepted事件
protected override void OnNewClientAccepted(ISocketListener listener, Socket client, object state)
{
........
ProcessNewClient(client, listener.Info.Security);
}
//处理新的Seesion接入(新的客户端Socket接入)
private IAppSession ProcessNewClient(Socket client, SslProtocols security)
{
if (!m_ReadWritePool.TryPop(out socketEventArgsProxy)).....
//AsyncSocketSession封装了处理某一个client Socket数据收发的类
socketSession = new AsyncSocketSession(client, socketEventArgsProxy);
//创建了一个AppSession对象,CreateSession在TcpSocketServerBase中实现;主要是对clientSocket进行设置缓冲区大小,超时时间等;
AppSession代表一个和客户端的逻辑连接,官方说明:http://docs.supersocket.net/v1-6/zh-CN/Implement-your-AppServer-and-AppSession
通过AppSession,将AsyncSocketServer 与AppServer关联起来了;
最终是调用了AppServerBase.CreateAppSession,并执行了ExecuteConnectionFilters,ConnectionFilter的目的是对客户端的请求进行验证,更多ConnectionFilter参考官网说明:http://docs.supersocket.net/v1-6/zh-CN/Connection-Filter;
返回的AppSession是什么类型?后面继续讲
var session = CreateSession(client, socketSession);
......
//AsyncStreamSocketSession继承了INegotiateSocketSession,用于处理SSL加密;
var negotiateSession = socketSession as INegotiateSocketSession;
if (negotiateSession == null) //一般情况是AsyncSocketSession
{
//AppServer注册了session的Closed事件并绑到了SessionClosed事件上,同时触发了NewSessionConnected事件;
if (RegisterSession(session))
{
AppServer.AsyncRun(() => socketSession.Start());//异步启动接收数据;这里是最重点的部分;
}
return session;
}
negotiateSession.NegotiateCompleted += OnSocketSessionNegotiateCompleted;
negotiateSession.Negotiate();
}
从上面部分代码的解读,可以清楚了解到SuperSocket是如何监听一个端口的,如何响应一个客户端Socket的链接请求的(NewSessionConnected),以及如何响应客户端Socket的关闭事件的(SessionClosed)。
4、接下来解读如何处理收数据、解析数据
AsyncSocketSession:
//开始接收数据
public override void Start()
{
StartReceive(SocketAsyncProxy.SocketEventArgs);
if (!m_IsReset)
StartSession();
}
private void StartReceive(SocketAsyncEventArgs e, int offsetDelta)
{......
// the connection is closing or closed
if (!OnReceiveStarted())
return;
willRaiseEvent = Client.ReceiveAsync(e);
......
ProcessReceive(e)
}
public void ProcessReceive(SocketAsyncEventArgs e)
{
......
//由AppSession进行进一步的数据处理;
offsetDelta = this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);
......
//read the next block of data sent from the client
StartReceive(e, offsetDelta);
}
AppSession :
int IAppSession.ProcessRequest(byte[] readBuffer, int offset, int length, bool toBeCopied)
{
int rest, offsetDelta;
while (true)
{
//解析数据,FilterRequest的解读在下一段;
var requestInfo = FilterRequest(readBuffer, offset, length, toBeCopied, out rest, out offsetDelta);
if (requestInfo != null)
{
try
{
//执行指令,将解析出来的数据进一步进行业务逻辑处理;解读在后面...
AppServer.ExecuteCommand(this, requestInfo);
}
........
}
if (rest <= 0)
{
return offsetDelta;
}
//Still have data has not been processed
offset = offset + length - rest;
length = rest;
}//循环进行数据解析,直到缓冲区的数据全部解析完,目的是为了防止有粘包...
}
////解析数据
TRequestInfo FilterRequest(byte[] readBuffer, int offset, int length, bool toBeCopied, out int rest, out int offsetDelta)
{
//触发AppServer的RawDataReceived事件;从这里可以看出来,RawDataReceived是原始的数据包,如果返回false则不会继续往下执行。
if (!AppServer.OnRawDataReceived(this, readBuffer, offset, length))
{
rest = 0;
offsetDelta = 0;
return null;
}
var currentRequestLength = m_ReceiveFilter.LeftBufferSize;
//调用ReceiveFilter处理接收到的数据,ReceiveFilter的官网说明:http://docs.supersocket.net/v1-6/zh-CN/Implement-Your-Own-Communication-Protocol-with-IRequestInfo,-IReceiveFilter-and-etc
官方内置了一些常用的Filter,http://docs.supersocket.net/v1-6/zh-CN/The-Built-in-Common-Format-Protocol-Implementation-Templates
利用自定义的Filter可以实现自己业务数据的解析;
var requestInfo = m_ReceiveFilter.Filter(readBuffer, offset, length, toBeCopied, out rest);
if (m_ReceiveFilter.State == FilterState.Error)
{
rest = 0;
offsetDelta = 0;
Close(CloseReason.ProtocolError);
return null;
}
var offsetAdapter = m_ReceiveFilter as IOffsetAdapter;
offsetDelta = offsetAdapter != null ? offsetAdapter.OffsetDelta : 0;
.......
return requestInfo;
}
//执行指令。AppServerBase类中。
protected virtual void ExecuteCommand(TAppSession session, TRequestInfo requestInfo)
{
if (m_RequestHandler == null)
{
//从命令加载器获取对应命令,参考官网说明:http://docs.supersocket.net/v1-6/zh-CN/Command-and-Command-Loader
var commandProxy = GetCommandByName(requestInfo.Key);
if (commandProxy != null)
{
var command = commandProxy.Command;
var commandFilters = commandProxy.Filters;
session.CurrentCommand = requestInfo.Key;
var cancelled = false;
if (commandFilters == null)
{
//执行命令
command.ExecuteCommand(session, requestInfo);
}
else
{
//如果有命令过滤器,参考官网说明:http://docs.supersocket.net/v1-6/zh-CN/Command-Filter,命令过滤器的作用是可以在执行前,执行后进行进一步的业务处理,用Attribute实现的,类似一种AOP的作用;
var commandContext = new CommandExecutingContext();
commandContext.Initialize(session, requestInfo, command);
for (var i = 0; i < commandFilters.Length; i++)
{
var filter = commandFilters[i];
//命令执行前
filter.OnCommandExecuting(commandContext);
if (commandContext.Cancel)
{
cancelled = true;
if(Logger.IsInfoEnabled)
Logger.Info(session, string.Format("The executing of the command {0} was cancelled by the command filter {1}.", command.Name, filter.GetType().ToString()));
break;
}
}
if (!cancelled)
{
try
{
//执行命令
command.ExecuteCommand(session, requestInfo);
}
catch (Exception exc)
{
commandContext.Exception = exc;
}
for (var i = 0; i < commandFilters.Length; i++)
{
var filter = commandFilters[i];
//命令执行后
filter.OnCommandExecuted(commandContext);
}
if (commandContext.Exception != null && !commandContext.ExceptionHandled)
{
try
{
session.InternalHandleExcetion(commandContext.Exception);
}
catch
{
}
}
}
}
if(!cancelled)
{
session.PrevCommand = requestInfo.Key;
if (Config.LogCommand && Logger.IsInfoEnabled)
Logger.Info(session, string.Format("Command - {0}", requestInfo.Key));
}
}
else
{
session.InternalHandleUnknownRequest(requestInfo);
}
session.LastActiveTime = DateTime.Now;
}
else
{
session.CurrentCommand = requestInfo.Key;
try
{
//触发NewRequestReceived事件,由事件响应处理指令;
m_RequestHandler(session, requestInfo);
}
catch (Exception e)
{
session.InternalHandleExcetion(e);
}
......
}
.......
}
5 发数据
官网资料:http://docs.supersocket.net/v1-6/zh-CN/Push-Data-to-Clients-from-Server-Initiatively
发数据逻辑相对要简单多了,通过AppSession发送数据,AppSession通过内部的SocketSession发送数据。
三、小结
通过上述代码解读后,对SuperSocket的结构有了一个清晰的概念,对照官网这张结构图,会更直观:
但是从代码中可以看出来,在Command执行期间,如果存在大运算或CPU密集型计算,仍然有可能造成阻塞,因为指令执行这一部分并没有用到线程池,所以在处理业务逻辑时要着重注意!