Thrift实践(C++)
文章简介
运用Thrift的TNonblockingServer编写C++实践案例。
Thrift服务端编程
从Thrift-0.12.0版本的C++源代码来看服务端编程主要有:多线程模型,一个新的客户端连接创建一个线程处理;多线程线程池模型,将新的客户端连接放入任务队列中由线程池读取处理;事件驱动异步模型,注册监听事件和可读事件,将客户端的数据放入任务队列中由线程池进行处理。事件驱动异步模型更适合运用在互联网大量用户的场景中,也就是TNonblockingServer。
Thrift中TNonblockingServer的设计和实现
图1-1 TNonblockingServer的实现
如图1-1所示,当一个新的客户端连接accept时,会new一个TConnection对象,通过轮询选择算法发送给IOThread线程池,IOThread接收到客户端发来的请求数据时,将它打包成Task放入任务队列中,再由任务处理线程池Thread从任务队列pop出来调用服务端实现的业务逻辑函数处理。下面以用户注册的业务场景为例,实现一个具体的TNonblockingServer案例。
Thrift接口文件编写
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
//文件名称user.thrift namespace cpp thrift.user
enum EUserType { QQ = 1, WECHAT = 2 }
struct UserRegisterReq { 1:required string sUserName; 2:required string sMd5Pwd; 3:string sPhoneNumber; 4:optional i32 iUserAge; }
struct UserRegisterResp { 1:required i32 iUserId; 2:required bool bSucc; 3:optional i32 iRetCode; 4:optional string sErrMsg; }
service UserService{ UserRegisterResp UserRegister(1:EUserType eType, 2:UserRegisterReq oReq) } |
运行thrift -r --gen cpp user.thrift,生成服务端和客户端RPC接口文件。其中required是必传字段,optional是可选字段。
服务端代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
//文件名称nonsocketserver.cpp #include <stdlib.h>
#include <iostream>
#include <thrift/protocol/TBinaryProtocol.h> #include <thrift/transport/TNonblockingServerSocket.h> #include <thrift/server/TNonblockingServer.h> #include <thrift/concurrency/PlatformThreadFactory.h> #include <thrift/concurrency/ThreadManager.h>
#include "gen-cpp/UserService.h"
namespace thrift { namespace user { class UserServiceHandler : virtual public UserServiceIf { public: UserServiceHandler() {} virtual void UserRegister(UserRegisterResp& _return, const EUserType::type eType, const UserRegisterReq& oReq) { std::cout << "etype=" << eType << " oReq=" << oReq << " oResp=" << _return << std::endl; } }; }//user }//thrift
int main(int argc, char* argv[]) { using namespace ::apache::thrift; using namespace ::apache::thrift::concurrency; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server;
using namespace ::thrift::user;
int iIOThreadNum = 0, iTaskThreadNum = 0; if (3 == argc) { iIOThreadNum = ::strtol(argv[1], NULL, 10); iTaskThreadNum = ::strtol(argv[2], NULL, 10); }
int port = 9090; //业务逻辑 stdcxx::shared_ptr<UserServiceHandler> testHandler(new UserServiceHandler()); //业务接口逻辑 stdcxx::shared_ptr<TProcessor> testProcessor(new UserServiceProcessor(testHandler)); //报文协议序列化 protocol stdcxx::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactoryT<TBufferBase>());
//监听端口 transport stdcxx::shared_ptr<TNonblockingServerTransport> nbSocket(new transport::TNonblockingServerSocket(port)); //创建非阻塞I/O服务 server stdcxx::shared_ptr<TNonblockingServer> nonblockingServer(new TNonblockingServer(testProcessor, protocolFactory, nbSocket)); nonblockingServer->setNumIOThreads(iIOThreadNum); //多线程——连接处理
//创建多线程任务处理 if (iTaskThreadNum > 0) { stdcxx::shared_ptr<ThreadManager> pThreadManager = ThreadManager::newSimpleThreadManager(iTaskThreadNum);//多线程——任务处理 pThreadManager->threadFactory(stdcxx::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory)); pThreadManager->start(); nonblockingServer->setThreadManager(pThreadManager); }
//服务启动 nonblockingServer->serve(); //服务停止 nonblockingServer->stop();
return 0; } |
客户端代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
//文件名称client.cpp #include <iostream>
#include <thrift/protocol/TBinaryProtocol.h> #include <thrift/transport/TSocket.h> #include <thrift/transport/TBufferTransports.h>
#include "gen-cpp/UserService.h"
int main(int argc, char **argv) { using namespace ::apache::thrift; using namespace ::thrift::user;
//I/O transport stdcxx::shared_ptr<transport::TSocket> socket(new transport::TSocket("127.0.0.1", 9090)); //数据读取方式 transport stdcxx::shared_ptr<transport::TTransport> transport(new transport::TFramedTransport(socket)); //报文协议序列化 protocol stdcxx::shared_ptr<protocol::TProtocol> protocol(new protocol::TBinaryProtocol(transport));
UserServiceClient client(protocol); transport->open();
EUserType::type eType = EUserType::QQ; UserRegisterReq oReq; UserRegisterResp oResp; client.UserRegister(oResp, eType, oReq);
transport->close();
std::cout << "eType=" << eType << " oReq=" << oReq << " oReps=" << oResp << std::endl;
return 0; } |
编译和运行
//编译服务端程序和客户端程序 g++ -c -o nonsocketserver.o nonsocketserver.cpp g++ -c -o UserService.o gen-cpp/UserService.cpp g++ -c -o user_types.o gen-cpp/user_types.cpp g++ -g -Wall -o server nonsocketserver.o UserService.o user_types.o -lthrift -lthriftnb g++ -c -o client.o client.cpp g++ -g -Wall -o client client.o UserService.o user_types.o -lthrift -lthriftnb
//运行服务端 ./server Thrift: Mon Jan 28 00:01:38 2019 TNonblockingServer: Serving with 1 io threads. Thrift: Mon Jan 28 00:01:38 2019 TNonblockingServer: using libevent 2.1.8-stable method epoll Thrift: Mon Jan 28 00:01:38 2019 TNonblocking: IO thread #0 registered for listen. Thrift: Mon Jan 28 00:01:38 2019 TNonblocking: IO thread #0 registered for notify. Thrift: Mon Jan 28 00:01:38 2019 TNonblockingServer: IO thread #0 entering loop... etype=QQ oReq=UserRegisterReq(sUserName=, sMd5Pwd=, sPhoneNumber=, iUserAge=<null>) oResp=UserRegisterResp(iUserId=0, bSucc=0, iRetCode=<null>, sErrMsg=<null>)
//运行客户端 ./client eType=QQ oReq=UserRegisterReq(sUserName=, sMd5Pwd=, sPhoneNumber=, iUserAge=<null>) oReps=UserRegisterResp(iUserId=0, bSucc=0, iRetCode=<null>, sErrMsg=<null>) |
参考文献
[1] Mark Slee, Aditya Agarwal and Marc Kwiatkowski. Thrift: Scalable Cross-Language Services Implementation. 2007
[2] Apache Thrift官方网站, http://thrift.apache.org
[3] Apache Thrift源代码, https://github.com/apache/thrift
[4] Lex & Yacc 官方网站, http://dinosaur.compilertools.net