MapGuide源码分析----MapGuide服务器源码分析
同样,在介绍MapGuide 服务器如何处理枚举资源操作之前,让我们首先来看看MapGuide 服务器用于处理服务请求和操作的类,图19‑9显示服务请求处理器类的类图,图19‑10显示了操作处理器类的类图。
图 19‑10 服务请求处理器类的类图
图 19‑11 操作处理器类的类图
MapGuide提供了资源服务、要素服务等多种服务,每种服务包含了大量操作。MapGuide为每种服务提供了一个服务请求处理器类,用于处理这些服务所提供的操作,例如MgResourceServiceHandler、MgFeatureServiceHanlder等。这些类都继承自IMgServiceHandler,并且实现了方法IMgServiceHandler::ProcessOperation(...)。MapGuide使用工厂类MgServiceHanlderFactory来创建一个服务请求处理器类的实例,给定一个服务ID,调用方法MgServiceHandlerFactory::GetHandler(...)可以创建对应的服务请求处理器类的实例。
对于服务中的每种操作,MapGuide也提供了相应的操作处理器类。从图19‑10可以看到,操作处理器类分为四个层次,最高一层是类IMgOperationHandler,它是所有操作处理类的基类,第二层是类MgServiceOperation,第三层是类MgXXXOperation,“XXX”代表服务名称,在这一层每种类型的服务都有一个对应的类,例如MgFeatureOperation、MgResourceOperation等,第四层为真正负责工作的操作处理器类,每种服务中的每个操作都有一个对应的操作处理器类,某种服务中操作处理器类都继承自同一个父类,例如MgEnumerateResource、MgSetResource都继承自MgResourceOperation。
接下来让我们看看MapGuide服务器如何处理枚举资源操作,这个操作流程的时序图如图 19‑11所示。
图 19‑12 MapGuide服务器处理枚举资源操作的时序图
1) 在线程池中找一个空闲的线程执行操作
MapGuide服务器使用了多线程和线程池的技术来提高操作处理和响应请求的性能。在MapGuide服务器启动时,会创建一个操作请求队列和一个用于处理操作请求的线程池。当一个操作请求进入操作队列,MapGuide服务器会在线程池中找一个空闲的线程执行操作。如果没有空闲的线程,那么这个操作请求会处于等待状态,直到线程池中有了空闲的线程。
首先,让我们查看一下服务器的入口函数的源代码,看看服务器是如何启动的。MapGuide服务器可以像普通的应用程序一样运行,还可以在Windows操作系统可以运行为Windows服务,在Linux操作系统运行为守护进程,下面的代码中只保留了运行为普通应用程序部分的代码。
typedef ACE_Singleton SERVER; int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) { int nResult = 0; ...... // 初始化ACE服务 ACE::init(); ...... // 根据命令行参数执行响应的操作 if((ACE_OS::strcasecmp(parameter, ACE_TEXT("?")) == 0) || (ACE_OS::strcasecmp(parameter, MG_WCHAR_TO_TCHAR(MgResources::ServerCmdHelp)) == 0)) { // 显示服务器命令行参数 ShowCommandlineHelp(); ...... } else if((ACE_OS::strcasecmp(parameter, MG_WCHAR_TO_TCHAR(MgResources::ServerCmdRun)) == 0) || (ACE_OS::strcasecmp(parameter, MG_WCHAR_TO_TCHAR(MgResources::ServerCmdInteractive)) == 0)) { ACE_OS::printf(MG_WCHAR_TO_CHAR(MgResources::ServerCmdRunInfo)); ...... // 以普通应用程序的方式运行服务器 nResult = SERVER::instance()->init(argc, argv); if(0 == nResult) { // 启动服务 nResult = SERVER::instance()->open(); // 终止服务 SERVER::instance()->fini(); } ...... } ...... // 终止ACE服务 ACE::fini(); return nResult; } |
在上面的代码中,我们可以看到大量以“ACE”为前缀的类,这些类是ACE自适配通信环境(Adaptive Communication Environment)工具包中的类。ACE是一个开源的工具包,它实现了许多用于并发通信软件的核心模式。ACE的目标用户是在UNIX和Win32平台上开发高性能通信服务和应用的开发者。ACE简化了使用进程间通信、事件多路分离、显式动态链接和并发的OO网络应用和服务的开发。ACE提供了一组丰富的可复用C++ Wrapper Facade(包装外观)和框架组件,可跨越多种平台完成通用的通信软件任务,其中包括:事件多路分离和事件处理器分派、信号处理、服务初始化、进程间通信、共享内存管理、消息路由、分布式服务动态(重)配置、并发执行和同步等。
类MgServer继承自AEC工具包中的类ACE_NT_Service或ACE_Service_Object,对应于服务器的主线程,图19‑12显示了服务器主线程类的类图。MgServer::svc()是主线程的入口,调用方法MgServer::open()会启动主线程并且执行方法MgServer::svc(),调用MgServer::fini()会终止主线程。MapGuide使用了Singleton模板ACE_Singleton将MgServer封装为一个单实例类,为了方便使用MapGuide使用typedef为ACE_Singleton定义了一个别名SERVER,调用方法SERVER::instance()可以创建类MgServer的一个实例。
图 19‑13 服务器主线程类的类图
MgServer::svc()是主线程的入口,该方法会创建一个操作请求队列和一个用于处理操作请求的线程池,它的代码如下所示。事实上,该方法还定义了其它类型的队列和线程池,为了便于理解我们省略掉了这些代码。
int MgServer::svc() { MgServerManager* pServerManager = MgServerManager::GetInstance(); // 创建线程管理器和操作线程 ACE_Thread_Manager threadManager; MgOperationThread clientThreads(threadManager, pServerManager->GetClientThreads()); pServerManager->SetClientMessageQueue(clientThreads.msg_queue_); MgClientAcceptor clientAcceptor(clientAddr, ACE_Reactor::instance(), clientThreads.msg_queue_); nResult = clientAcceptor.Initialize(); if(nResult == 0) { // 启动线程池 nResult = clientThreads.Activate(); if(nResult == 0) { // 通知操作线程停止执行 ACE_Message_Block* mb = new ACE_Message_Block(4); if(mb) { mb->msg_type(ACE_Message_Block::MB_STOP); clientThreads.putq(mb); } // 停止操作线程 clientThreads.close(); // 等待所有操作线程执行完成 threadManager.wait(); threadManager.close(); } } return nResult; } |
上面的代码中创建一个类MgOperationThread的实例,该对象创建了一个线程池,维护了一个操作请求队列。类MgOperationThread 继承自ACE工具包中的模板类AEC_Task,对应于操作线程,图19‑13显示了操作线程类的类图。ACE_Task封装了任务,每个任务都含有一或多个线程,以及一个底层消息队列。方法ACE_Task::svc()是线程的启动入口,调用方法ACE_Task::open()用于初始化任务,调用方法ACE_Task::close()用于终止任务,调用方法ACE_Task::activate()用于启动线程,调用方法ACE_Task::putq()放置消息到任务的消息队列中,调用方法ACE_Task::getq()从任务的消息队列中取出消息。
图 19‑14 操作线程类的类图
方法MgOperationThread::svc()是操作线程的入口,它的代码如下所示。
int MgOperationThread::svc() { INT32 nResult = 0; while (m_bActive) { ACE_Message_Block* messageBlock = NULL; // 从消息队列中取出消息 while (getq(messageBlock) == -1) { ...... } if(messageBlock->msg_type() == ACE_Message_Block::MB_STOP) { m_bActive = false; ACE_Message_Block* mb = new ACE_Message_Block(4); if(mb) { mb->msg_type(ACE_Message_Block::MB_STOP); putq(mb); } } else if(messageBlock->msg_type() == ACE_Message_Block::MB_DATA) { MgServerStreamData* pData = dynamic_cast(messageBlock->data_block()); IMgServiceHandler::MgProcessStatus stat = ProcessMessage(messageBlock); ...... } } return nResult; } IMgServiceHandler::MgProcessStatus MgOperationThread::ProcessMessage(ACE_Message_Block* pMB) { IMgServiceHandler::MgProcessStatus stat = IMgServiceHandler::mpsError; MgServerStreamData* pData = NULL; pData = (MgServerStreamData*) pMB->data_block(); ...... MgStreamParser::ParseStreamHeader(pData); MgStreamParser::ParseDataHeader(pData); MgPacketParser::MgPacketHeader pt = MgPacketParser::GetPacketHeader(pData); switch ( pt ) { case (MgPacketParser::mphOperation): stat = ProcessOperation( pData ); break; ...... } return stat; } |
从上面的代码可以看到,方法MgOperationThread::svc()会调用方法getq(…)循环访问消息队列,取出等待处理的消息。如果是数据类型的消息,那么调用方法MgOperationThread:: ProcessMessage(...)处理这个消息。方法MgOperationThread::ProcessMessage(...)会解析这个消息的头和数据,如果消息数据中包含了一个操作请求,那么调用方法MgOperationThread:: ProcessOperation(…)处理这个操作请求。
2) 处理服务请求
方法MgOperationThread::ProcessOperation(…) 首先会解析消息数据中的服务ID、操作ID、操作的版本号等信息,然后根据服务ID使用工厂类MgServiceHanlderFactory来创建一个服务请求处理器类的实例,最后调用这个服务请求处理器类的方法IMgServiceHandler:: ProcessOperation(...)处理这个服务请求。
本节的示例中客户端发送的是一个枚举资源的操作请求,所以工厂类会创建一个MgResourceServiceHandler的实例,调用方法MgResourceServiceHandler::ProcessOperation(...)处理这个服务请求。
IMgServiceHandler::MgProcessStatus MgOperationThread::ProcessOperation(MgServerStreamData* pData) { IMgServiceHandler::MgProcessStatus stat = IMgServiceHandler::mpsError; MgOperationPacket op; MgStreamHelper* pHelper = NULL; pHelper = pData->GetStreamHelper(); pHelper->GetUINT32(op.m_PacketHeader); pHelper->GetUINT32(op.m_PacketVersion); pHelper->GetUINT32(op.m_ServiceID); pHelper->GetUINT32(op.m_OperationID); pHelper->GetUINT32(op.m_OperationVersion); IMgServiceHandler* pServiceHandler = MgServiceHandlerFactory::Instance()->GetHandler(op.m_ServiceID, pData, op); delete pServiceHandler; pServiceHandler = NULL; stat = pServiceHandler->ProcessOperation(); return stat; } |
3) 处理操作请求
首先,方法MgResourceServiceHandler::ProcessOperation(...)会根据操作请求的ID,调用资源操作工厂类的方法MgResourceOperationFactory::GetOperation(…)创建相应的资源服务操作类MgOpEnumerateResources的实例。然后,调用MgOpEnumerateResources:: Initialize(...)初始化枚举资源操作处理器,调用方法MgServiceManager::RequestService(…)创建服务器资源服务MgServerResourceService的实例。最后,调用方法MgOpEnumerateResources::Execute()处理操作请求,该方法会调用MgServerResourceService::EnumerateResources(…)实际执行枚举资源的功能。
IMgServiceHandler::MgProcessStatus MgResourceServiceHandler::ProcessOperation() { IMgServiceHandler::MgProcessStatus status = IMgServiceHandler::mpsError; auto_ptr handler; MG_TRY() handler.reset(MgResourceOperationFactory::GetOperation( m_packet.m_OperationID, m_packet.m_OperationVersion)); assert(NULL != handler.get()); handler->Initialize(m_data, m_packet); handler->Execute(); status = IMgServiceHandler::mpsDone; MG_CATCH(L"MgResourceServiceHandler.ProcessOperation") if (mgException != NULL && NULL != handler.get()) { status = (handler.get()->HandleException(mgException) ? IMgServiceHandler::mpsDone : IMgServiceHandler::mpsError); } if (IMgServiceHandler::mpsDone != status) { MG_THROW(); } return status; } void MgOpEnumerateResources::Execute() { ...... BeginExecution(); ...... Validate(); Ptr byteReader = m_service->EnumerateResources(resource, depth, type, properties, fromDate, toDate, computeChildren); EndExecution(byteReader); ...... } |