Thrift实践(C++)

文章简介

运用Thrift的TNonblockingServer编写C++实践案例。

 

Thrift服务端编程

从Thrift-0.12.0版本的C++源代码来看服务端编程主要有:多线程模型,一个新的客户端连接创建一个线程处理;多线程线程池模型,将新的客户端连接放入任务队列中由线程池读取处理;事件驱动异步模型,注册监听事件和可读事件,将客户端的数据放入任务队列中由线程池进行处理。事件驱动异步模型更适合运用在互联网大量用户的场景中,也就是TNonblockingServer。

 

Thrift中TNonblockingServer的设计和实现

Thrift实践(C++)

 

 

 

 

 

 

 

 

 

 

图1-1 TNonblockingServer的实现

如图1-1所示,当一个新的客户端连接accept时,会new一个TConnection对象,通过轮询选择算法发送给IOThread线程池,IOThread接收到客户端发来的请求数据时,将它打包成Task放入任务队列中,再由任务处理线程池Thread从任务队列pop出来调用服务端实现的业务逻辑函数处理。下面以用户注册的业务场景为例,实现一个具体的TNonblockingServer案例。

 

Thrift接口文件编写

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

//文件名称user.thrift

namespace cpp thrift.user

 

enum EUserType

{

QQ = 1,

WECHAT = 2

}

 

struct UserRegisterReq

{

1:required string sUserName;

2:required string sMd5Pwd;

3:string sPhoneNumber;

4:optional i32 iUserAge;

}

 

struct UserRegisterResp

{

1:required i32 iUserId;

2:required bool bSucc;

3:optional i32 iRetCode;

4:optional string sErrMsg;

}

 

service UserService{

UserRegisterResp UserRegister(1:EUserType eType, 2:UserRegisterReq oReq)

}

运行thrift -r --gen cpp user.thrift,生成服务端和客户端RPC接口文件。其中required是必传字段,optional是可选字段。

 

服务端代码实现

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

//文件名称nonsocketserver.cpp

#include <stdlib.h>

 

#include <iostream>

 

#include <thrift/protocol/TBinaryProtocol.h>

#include <thrift/transport/TNonblockingServerSocket.h>

#include <thrift/server/TNonblockingServer.h>

#include <thrift/concurrency/PlatformThreadFactory.h>

#include <thrift/concurrency/ThreadManager.h>

 

#include "gen-cpp/UserService.h"

 

namespace thrift

{

namespace user

{

class UserServiceHandler : virtual public UserServiceIf {

public:

UserServiceHandler() {}

virtual void UserRegister(UserRegisterResp& _return, const EUserType::type eType, const UserRegisterReq& oReq) {

std::cout << "etype=" << eType << " oReq=" << oReq << " oResp=" << _return << std::endl;

}

};

}//user

}//thrift

 

int main(int argc, char* argv[])

{

using namespace ::apache::thrift;

using namespace ::apache::thrift::concurrency;

using namespace ::apache::thrift::protocol;

using namespace ::apache::thrift::transport;

using namespace ::apache::thrift::server;

 

using namespace ::thrift::user;

 

int iIOThreadNum = 0, iTaskThreadNum = 0;

if (3 == argc) {

iIOThreadNum = ::strtol(argv[1], NULL, 10);

iTaskThreadNum = ::strtol(argv[2], NULL, 10);

}

 

int port = 9090;

//业务逻辑

stdcxx::shared_ptr<UserServiceHandler> testHandler(new UserServiceHandler());

//业务接口逻辑

stdcxx::shared_ptr<TProcessor> testProcessor(new UserServiceProcessor(testHandler));

//报文协议序列化 protocol

stdcxx::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactoryT<TBufferBase>());

 

//监听端口 transport

stdcxx::shared_ptr<TNonblockingServerTransport> nbSocket(new transport::TNonblockingServerSocket(port));

//创建非阻塞I/O服务 server

stdcxx::shared_ptr<TNonblockingServer> nonblockingServer(new TNonblockingServer(testProcessor, protocolFactory, nbSocket));

nonblockingServer->setNumIOThreads(iIOThreadNum); //多线程——连接处理

 

//建多线程任务处理

if (iTaskThreadNum > 0) {

stdcxx::shared_ptr<ThreadManager> pThreadManager = ThreadManager::newSimpleThreadManager(iTaskThreadNum);//多线程——任务处理

pThreadManager->threadFactory(stdcxx::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory));

pThreadManager->start();

nonblockingServer->setThreadManager(pThreadManager);

}

 

//服务启动

nonblockingServer->serve();

//服务停止

nonblockingServer->stop();

 

return 0;

}

 

客户端代码实现

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

//文件名称client.cpp

#include <iostream>

 

#include <thrift/protocol/TBinaryProtocol.h>

#include <thrift/transport/TSocket.h>

#include <thrift/transport/TBufferTransports.h>

 

#include "gen-cpp/UserService.h"

 

int main(int argc, char **argv)

{

using namespace ::apache::thrift;

using namespace ::thrift::user;

 

//I/O transport

stdcxx::shared_ptr<transport::TSocket> socket(new transport::TSocket("127.0.0.1", 9090));

//数据读取方式 transport

stdcxx::shared_ptr<transport::TTransport> transport(new transport::TFramedTransport(socket));

//报文协议序列化 protocol

stdcxx::shared_ptr<protocol::TProtocol> protocol(new protocol::TBinaryProtocol(transport));

 

UserServiceClient client(protocol);

transport->open();

 

EUserType::type eType = EUserType::QQ;

UserRegisterReq oReq;

UserRegisterResp oResp;

client.UserRegister(oResp, eType, oReq);

 

transport->close();

 

std::cout << "eType=" << eType << " oReq=" << oReq << " oReps=" << oResp << std::endl;

 

return 0;

}

 

编译和运行

//编译服务端程序和客户端程序

g++ -c -o nonsocketserver.o nonsocketserver.cpp

g++ -c -o UserService.o gen-cpp/UserService.cpp

g++ -c -o user_types.o gen-cpp/user_types.cpp

g++ -g -Wall -o server nonsocketserver.o UserService.o user_types.o -lthrift -lthriftnb

g++ -c -o client.o client.cpp

g++ -g -Wall -o client client.o UserService.o user_types.o -lthrift -lthriftnb

 

//运行服务端

./server

Thrift: Mon Jan 28 00:01:38 2019 TNonblockingServer: Serving with 1 io threads.

Thrift: Mon Jan 28 00:01:38 2019 TNonblockingServer: using libevent 2.1.8-stable method epoll

Thrift: Mon Jan 28 00:01:38 2019 TNonblocking: IO thread #0 registered for listen.

Thrift: Mon Jan 28 00:01:38 2019 TNonblocking: IO thread #0 registered for notify.

Thrift: Mon Jan 28 00:01:38 2019 TNonblockingServer: IO thread #0 entering loop...

etype=QQ oReq=UserRegisterReq(sUserName=, sMd5Pwd=, sPhoneNumber=, iUserAge=<null>) oResp=UserRegisterResp(iUserId=0, bSucc=0, iRetCode=<null>, sErrMsg=<null>)

 

//运行客户端

./client

eType=QQ oReq=UserRegisterReq(sUserName=, sMd5Pwd=, sPhoneNumber=, iUserAge=<null>) oReps=UserRegisterResp(iUserId=0, bSucc=0, iRetCode=<null>, sErrMsg=<null>)

 

参考文献

[1] Mark Slee, Aditya Agarwal and Marc Kwiatkowski. Thrift: Scalable Cross-Language Services Implementation. 2007

[2] Apache Thrift官方网站, http://thrift.apache.org

[3] Apache Thrift源代码, https://github.com/apache/thrift

[4] Lex & Yacc 官方网站, http://dinosaur.compilertools.net

Thrift实践(C++)