一个模拟的负载均衡系统的实现
一、标题
一个模拟的负载均衡系统的实现
二、题目
为了构建可伸缩的,高可用的网络服务,很多大型网站都采用了负载均衡技术。
利用负载均衡技术,可以将多台廉价的、低性能的服务器,组合成一台性能强劲的,高可用的虚拟服务器。
负载均衡的常见实现方式大致如下:
将网络服务的地址(如公网IP地址、tcp套接字等)部署在负载均衡器上,而不是真实的服务器上;
将负载均衡器作为网络服务的总入口,接收用户的所有访问请求;
负载均衡器接收到用户的访问请求后,将访问请求按照一定的策略分发给某一台真实的服务器进行处理;
真实服务器,对访问请求进行处理后,将处理结果发送给负载均衡器;
负载均衡器接收到真实服务器的处理结果,将他发送给用户。
下图展示了一个负载均衡系统的组网结构,图中包含了1台负载均衡器,3台真实的服务器。公网IP配置在负载均衡器上,负载均衡器与真实服务器之间,则通过私网地址进行通讯。
图1 负载均衡组网结构
有关负载均衡的技术原理及更多详细信息,请上网查阅相关资料或阅读相关书籍。
本题的任务,是在PC机上实现一个模拟的负载均衡系统。
他包含如下3个可执行程序:
服务端 (server.exe)――辅助程序,通过UDP端口,提供时间查询服务。
负载均衡器(LB.exe) ――核心程序,用于实现负载均衡功能。
客户端 (client.exe)――辅助程序,通过UDP端口,访问时间查询服务。
3个程序的协作关系如下图所示,其中客户端与服务端程序,需要起多个进程。
图 2 系统协作
上图中,每一个方框表示一个进程。每一个进程拥有一个唯一的id(注意,这是由用户配置的id,并非操作系统为进程分配的pid),进程之间一律通过UDP协议进行通信。系统运行起来之后,客户端通过UDP协议向负载均衡器发送“时间请求”消息,负载均衡器通过UDP协议将消息分发给某个服务端进行处理。服务端返回“时间应答”消息给负载均衡器,负载均衡器将“时间应答”消息返回给客户端。
为了简化实现,本模拟系统中,所有消息,都采用如下结构体进行封装。通过msg_type字段的值,来区分不同类型的消息。
typedef struct
{
/* 消息的发送进程是谁,就填谁的id */
unsigned src_id;
/* 消息的接收进程是谁,就填谁的id */
unsigned dst_id;
/* 发送“时间请求”消息时填写,
回复“时间应答”消息时,其值要与请求消息保持一致。 */
unsigned usr_id;
/* 消息类型:0, 时间请求;1, 时间答应;2, 心跳请求;3, 心跳应答 */
unsigned msg_type;
/* 服务端回复“时间应答”消息时,
在data中填入当前时间的字符串,形式如“2013-06-20 13:56:28”即可 */
char data[32];
} t_msg;
三、功能需求
a)服务端程序
服务端程序,需要起多个进程,每个进程拥有一个唯一的id,绑定到一个唯一的UDP端口上。每个服务端进程通过自己绑定的UDP端口接收“时间请求”消息,如果消息中的dst_id等于自己的id,就向对端发送“时间应答”消息。否则,就丢弃此消息。
每个服务端进程的id、udp端口号,可以通过命令行参数传入,可以通过配置文件配置,也可以在进程运行时指定。三种方式,只要支持任意一种就行了。
服务端程序,需要具备一个调试开关。在运行过程中,可以打开/关闭调试开关。当调试开关打开后,服务端进程需要将自己接收/发送的每一个消息,都实时显示给用户看。
服务端程序,需要具备统计功能。在运行过程中,可以随时可看,每个服务端进程接收了多少条消息(正确的多少条,错误的多少条),应答了多少条消息。
b)负载均衡程序
负载均衡程序,只要启动一个进程即可。此进程拥有一个唯一的id,绑定到两个不同的UDP端口上。一个UDP端口(下文称为client_udp_port)用于收发客户端的消息,一个UDP端口(下文称为server_udp_port)用于收发服务端的消息。
负载均衡进程的id是多少,绑定的两个udp端口号是多少,支持多少个服务端,每个服务端的id、udp端口各是多少,均通过配置文件进行配置的。负载均衡进程启动时读入这些信息,运行过程中,不会改变。
对于客户端有多少个,每个客户端的id是多少,UDP端口号是多少,负载均衡进程对这些信息是一无所知的,也是无法预测的。
负载均衡进程通过client_udp_port接收到客户端的“时间请求”消息后,如果消息中的dst_id不等于自己的id,就丢弃此消息。否则,就按照轮转算法选出一个服务端,将时间请求消息中的dst_id改成此服务端的id后,将消息通过server_udp_port分发给该服务端处理。
负载均衡进程通过server_udp_port接收到客户端的“时间应答”消息后,将消息中的src_id改成自己的id,然后将消息通过client_udp_port发送给消息中的dst_id所指示的客户端。
负载均衡程序,需要具备一个调试开关。在运行过程中,可以打开/关闭调试开关。当调试开关打开后,程序需要将自己接收/发送的每一个消息,都实时显示给用户看。
负载均衡程序,需要具备统计功能。在运行过程中,可以随时可看,本进程从客户端接收了多少条消息(正确的多少条,错误的多少条),向客户端发送了多少条消息,从服务端接收了多少条消息(正确的多少条,错误的多少条),向服务端发送了多少条消息。
负载均衡程序,还需要具备日志功能。在运行过程中,如果出现异常事件(如UDP接收、发送失败等),需要记录日志,供后续分析查看。日志中,尽当尽可能包含详细的信息,如异常事件发生的时间、事件描述、事件原因等。
c)客户端程序
客户端程序,需要起多个进程,每个进程拥有一个id,一个usr_id,绑定到一个默认分配的UDP端口上。每个客户端进程启动后,通过自己绑定的UDP端口向负载均衡器发送n条“时间请求”消息,并接收相应的时间应答消息。时间请求消息中的src_id填写自己的id,usr_id填写自己的usr_id,dst_id填写负载均衡器进程的id。
每个客户端进程的id、usr_id,发送的消息条数n,可以通过命令行参数传入,可以通过配置文件配置,也可以在进程启动时指定。三种方式,只要支持任意一种就行了。
注意,不同客户端进程的id可以相同、但usr_id不可以相同。
客户端进程,在运行过程中,需要将自己接收/发送的每一个消息,都实时显示给用户看。客户端进程完成自己的任务后,显示一下相关的统计信息,即可退出。统计信息包括本进程发送了多少条消息,接收了多少条消息(正确的多少条,错误的多少条)。
四、代码实现
服务器端
#include <iostream>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <signal.h>
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>
using namespace std;
#define IPSTR "127.0.0.1"
//定义一个通用的数据结构
struct msg
{
int src_id; //发送者的id
int des_id; //接受者的id
int usr_id;
//消息的类型,
int msg_type;
//数据部分
char data[32];
};
class CServer
{
public:
//创建对象的时候必须给定id 和端口号
CServer(int i,int p):id(i),port(p){}
//将服务器的id 和 port 写入配置文件
void InitCnf();
//初始化套接字,创建套接字并绑定端口
int InitSock();
//保存日志文件
void CreatLog(msg &data);
//实时显示用户的请求信息与服务器的发送信息
void display(msg &data);
//处理socket端的程序
void dealSock(int sockfd, bool &flag);
//接受负载均衡器发来的数据
bool request(int sockfd, msg &data, struct sockaddr_in &caddr);
//当接受到正确的信息后,服务器向负载均衡器回复数据包
void response(int sockfd, msg &data, struct sockaddr_in &addr);
//调试器开关 通过监控键盘终端的文件描述符,输入数据为debug 时开关打开
void debug(bool &flag);
//拼接回复的信息
void creatData(msg &data);
//负责总的调用过程
void mainfun();
private:
int id; //记录服务器的id
int port; //记录服务器的端口
};
void CServer::dealSock(int sockfd, bool &flag)
{
msg data;
struct sockaddr_in addr;
if(request(sockfd, data, addr))
{
if(flag)
{
display(data);
//接受数据放入日志文件
CreatLog(data);
creatData(data);
//回复数据加入日志文件
CreatLog(data);
display(data);
}
else
{
CreatLog(data);
creatData(data);
//回复数据加入日志文件
CreatLog(data);
}
response(sockfd, data, addr);
}
}
//在这之前首先初始化配置文件,创建一个对象
void CServer::mainfun()
{
fd_set fdset;
FD_ZERO( &fdset );
int fd = 0;
int sockfd = InitSock();
FD_SET(fd, &fdset);
FD_SET(sockfd, &fdset);
//默认不开启调试器
bool flag = false;
while(1)
{
int n = select(sockfd+1, &fdset, NULL, NULL, NULL);
assert(n != -1);
if(FD_ISSET(fd, &fdset))
{
debug(flag);
}
if(FD_ISSET(sockfd, &fdset))
{
//处理sock的数据
dealSock(sockfd, flag);
}
FD_ZERO( &fdset );
FD_SET(fd, &fdset);
FD_SET(sockfd, &fdset);
}
}
//通过终端控制调试器
void CServer::debug(bool &flag)
{
char buff[128] = {0};
int n = read(0, buff, 127);
if(strncmp(buff, "debug", 5) == 0)
{
//进入调试状态
cout << "===============进入debug模式============" << endl;
flag = true;
}
else if(strncmp(buff, "off", 3) == 0)
{
//关闭调试状态
cout << "==============退出debug模式============="<< endl;
flag = false;
}
}
//拼接回复信息
void CServer::creatData(msg &data)
{
data.src_id = data.des_id ^ data.src_id;
data.des_id = data.des_id ^ data.src_id;
data.src_id = data.des_id ^ data.src_id;
data.msg_type += 1; //设置为回复类型
strcpy(data.data,"ok");
}
void CServer::InitCnf()
{
FILE *fp = fopen("server.cnf","a");
char buff[128] = {0};
sprintf(buff,"%d,%d \n",id,port);
//将服务器的id 和 port 写入配置文件
fwrite(buff, strlen(buff), 1, fp);
fclose(fp);
}
//初始化套接字, 返回一个创建好的套接字
int CServer::InitSock()
{
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
assert( sockfd != -1 );
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(IPSTR);
//绑定端口
int res = bind(sockfd, (struct sockaddr*) &addr, sizeof(addr));
assert(res != -1);
//返回创建的套接字
return sockfd;
}
//保存到本地的log.log中
void CServer::CreatLog(msg &data)
{
//保存日志 传入一个msg 格式的数据,保存到本地文档
FILE *fp = fopen("log.log","a");
assert( fp != NULL );
char buff[128] = {0};
sprintf(buff,"发送方id:%d,接受方id:%d,客户端usr_id:%d,请求方式:%d,数据:%s\n",
data.src_id, data.des_id, data.usr_id, data.msg_type,data.data);
fwrite(buff, strlen(buff), 1, fp);
fclose(fp);
}
void CServer::display(msg &data)
{
cout << "发送方id:" << data.src_id << "接收方id:" << data.des_id << "数据包类型:" << data.msg_type;
cout << "客户端usr_id:" << data.usr_id << "数据:" << data.data << endl;
cout << endl;
}
//接受负载均衡器发来的消息,只接受发给本地id 的数据包
//接受加过滤的功能
//如果是发送给自己的数据包,则接受,返回true;
//否则返回false 丢弃数据包
bool CServer::request(int sockfd, msg &data, struct sockaddr_in &caddr)
{
socklen_t len = sizeof(caddr);
//接受数据
recvfrom(sockfd, &data, sizeof(data), 0, (struct sockaddr*) &caddr, &len);
if(data.des_id == id)
{
return true;
}
return false;
}
//服务器向客户端回复数据
void CServer::response(int sockfd, msg &data, struct sockaddr_in &addr)
{
sendto(sockfd, &data, sizeof(data), 0, (struct sockaddr*) &addr, sizeof(addr));
}
int main(int argc, char *argv[])
{
//接受输入的id 和 port
int id,port;
if(argc < 3)
{
cout << "参数错误" << endl;
return 1;
}
id = strtol(argv[1],NULL,10);
port = strtol(argv[2],NULL,10);
//创建一个服务器对象
CServer ser(id,port);
ser.InitCnf();
ser.mainfun();
return 0;
}
客户端
#include <iostream>
#include <sys/socket.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/select.h>
using namespace std;
#define PORT 8888
#define IPSTR "127.0.0.1"
#define ID 1 //负载均衡器的id
#define REQUEST 0 //数据包的类型
//定义结构体
typedef struct
{
int src_id;
int des_id;
int usr_id;
int msg_type;
char data[32];
}msg;
class CClient
{
public:
//构建对象必须指定id 和 usr_id
CClient(int i,int u_id):id(i),usr_id(u_id){}
//初始化一个套接字
int initSock(struct sockaddr_in &addr);
//发送数据
void sendMsg(int sockfd, struct sockaddr_in &addr, msg &data);
//接受数据
bool recvMsg(int sockfd, struct sockaddr_in &addr, msg &data);
//显示数据
void display(msg &data);
//显示统计结果
void displayRes();
//构造发送的数据
void creatData(msg &data);
//主控函数
void mainfun(int n);
private:
int id;
int usr_id;
struct Count
{
Count(int r = 0, int s = 0, int e = 0):recv(r),send(s),error(e){}
int recv; //接受的个数
int send; //发送的数据包的个数
int error; //错误的个数
};
Count cou;
};
//主控函数
void CClient::mainfun(int n)
{
struct sockaddr_in addr;
int sockfd = initSock(addr);
fd_set fdset;
FD_ZERO( &fdset );
FD_SET(sockfd, &fdset);
msg data;
for(int i = 0; i < n; i++)
{
creatData(data);
display(data);
sendMsg(sockfd, addr, data);
//设置2秒的超时时间
struct timeval tv = {2,0};
int n = select(sockfd+1, &fdset, NULL, NULL, &tv);
if( n > 0 )
{
recvMsg(sockfd, addr, data);
display(data);
}
FD_ZERO( &fdset );
FD_SET(sockfd, &fdset);
}
}
void CClient::creatData(msg &data)
{
data.src_id = id;
//负载均衡器使用的服务器
data.des_id = ID;
data.usr_id = usr_id;
data.msg_type = REQUEST;
strcpy(data.data,"time");
}
//返回文件描述符 并设置addr
int CClient::initSock(struct sockaddr_in &addr)
{
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
assert(sockfd != -1);
//连接负载均衡服务器
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
addr.sin_addr.s_addr = inet_addr(IPSTR);
return sockfd;
}
//显示数据
void CClient::display(msg &data)
{
char buff[128] = {0};
sprintf(buff, "发送方id:%d, 接受方id:%d, 客户端usr_id:%d, 数据包类型: %d, 数据:%s"
, data.src_id, data.des_id, data.usr_id, data.msg_type, data.data);
cout << buff << endl;
}
//发送消息
void CClient::sendMsg(int sockfd, struct sockaddr_in &addr, msg &data)
{
sendto(sockfd, &data, sizeof(data), 0, (struct sockaddr*) &addr, sizeof(addr));
cou.send++;
}
//接收消息出判断是否是自己的数据
bool CClient::recvMsg(int sockfd, struct sockaddr_in &addr, msg &data)
{
socklen_t len = sizeof(addr);
recvfrom(sockfd, &data, sizeof(data), 0, (struct sockaddr*) &addr, &len);
cou.recv++;
if(data.des_id == id)
{
return true;
}
return false;
}
int main(int argc, char *argv[])
{
if(argc < 4)
{
cout << "参数错误" << endl;
return -1;
}
int id = strtol(argv[1],NULL,10);
int usr_id = strtol(argv[2], NULL, 10);
int n = strtol(argv[3], NULL, 10);
CClient cli(id,usr_id);
cli.mainfun(n);
return 0;
}
负载均衡器
#include <iostream>
#include <sys/socket.h>
#include <queue>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <string.h>
#include <assert.h>
#include <unordered_map>
#include <stdio.h>
#include <stdlib.h>
using namespace std;
#define IPSTR "127.0.0.1"
typedef struct
{
int src_id;
int des_id;
int usr_id;
int msg_type;
char data[32];
}msg;
class CBlance
{
public:
//读取配置文件,获取id 和 端口
void readConfig();
//通过配置文件获取服务端的信息
void InitSerInfo();
//处理来自服务器的数据
void dealServerMsg(int sockSer, int sockCli, unordered_multimap<int, pair<int, struct sockaddr_in> > &cli_table);
//处理来自客户端的数据
void dealClientMsg(int sockSer, int sockCli, unordered_multimap<int, pair<int, struct sockaddr_in> > &cli_table);
//服务器端关闭套接字
void delSerAddId(int &id,struct sockaddr_in &addr);
//绑定端口,获取sock 套接字
int InitSock(int &port);
//将内容写入日志文件
void writeLog(msg &data);
//接收消息recvMsg
bool recvMsg(int sockfd, struct sockaddr_in &addr, msg &data);
//发送消息sendMsg
bool sendMsg(int sockfd, struct sockaddr_in &addr, msg &data);
//轮循函数,动态分配服务器
void balanceLoop(int &id, struct sockaddr_in &addr);
//主调函数
void mainfun();
private:
int id;
int serPort;
int cliPort;
queue<struct sockaddr_in> server_addr; //存放服务器的地址
queue<int> server_id; //存放服务器的id
//bind 两个ip 一个与客户端通信,一个与服务器通信
struct Count
{
Count(int r = 0, int s = 0, int e = 0):
recv(r),send(s),error(e)
{}
int recv; //接收数据包的个数
int send; //发送数据包的个数
int error; //错误数据包的个数
};
Count c;
};
//主调函数
void CBlance::mainfun()
{
readConfig();
InitSerInfo();
int sockSer = InitSock(serPort);
int sockCli = InitSock(cliPort);
int max = sockSer > sockCli ? sockSer:sockCli;
fd_set fdset;
FD_ZERO( &fdset );
FD_SET(sockSer, &fdset);
FD_SET(sockCli, &fdset);
// id usr_id addr
unordered_multimap<int, pair<int,struct sockaddr_in> > cli_table; //客户端的请求表,可重复
while( 1 )
{
int n = select(max+1, &fdset, NULL, NULL, NULL);
if(n > 0)
{
if(FD_ISSET(sockSer, &fdset))
{
//接收到来自服务器的数据
dealServerMsg(sockSer, sockCli, cli_table);
}
if(FD_ISSET(sockCli, &fdset))
{
//接收到来自客户端的数据
dealClientMsg(sockSer, sockCli, cli_table);
}
}
FD_ZERO(&fdset);
FD_SET(sockSer, &fdset);
FD_SET(sockCli, &fdset);
}
}
//处理来自服务器的数据
void CBlance::dealServerMsg(int sockSer, int sockCli, unordered_multimap<int, pair<int,struct sockaddr_in> > &cli_table)
{
struct sockaddr_in addr;
msg data;
if(recvMsg(sockSer, addr, data))
{
//将接收到的消息写入日志文件
writeLog(data);
if(data.des_id == id)
{
auto it = cli_table.find(data.usr_id);
data.src_id = id;
data.des_id = it->second.first; //目的id
if(it != cli_table.end())
{
sendMsg(sockCli, it->second.second, data);
writeLog(data);
//删除一条记录
cli_table.erase(it);
}
}
//else 对错误数据的处理
}
}
//处理来自客户端的数据
void CBlance::dealClientMsg(int sockSer, int sockCli, unordered_multimap<int , pair<int, struct sockaddr_in> > &cli_table)
{
struct sockaddr_in addr;
msg data;
if(recvMsg(sockCli, addr, data))
{
writeLog(data);
if(data.des_id == id)
{
//将客户端的消息放入cli_table中
cli_table.insert({data.usr_id, {data.src_id, addr} });
int ser_id;
balanceLoop(ser_id,addr);
data.src_id = id;
data.des_id = ser_id;
//向服务器转发消息
sendMsg(sockSer, addr, data);
//将发送的数据写入日志文件
writeLog(data);
}
//else 如果直接丢弃
}
}
//服务器端关闭套接字
void CBlance::delSerAddId(int &id,struct sockaddr_in &addr)
{
//删除服务器端关闭的套接字
int size = server_addr.size();
for(int i = 0; i < size; i++)
{
if(id != server_id.front())
{
server_id.push(server_id.front());
server_addr.push(server_addr.front());
server_id.pop();
server_addr.pop();
}
else
{
server_id.pop();
server_addr.pop();
break;
}
}
}
//轮循函数,动态分配服务器
void CBlance::balanceLoop(int &id, struct sockaddr_in &addr)
{
addr = server_addr.front();
id = server_id.front();
//将分配的服务器放入队尾
server_addr.pop();
server_id.pop();
server_addr.push(addr);
server_id.push(id);
}
//读入服务器的配置信息
void CBlance::InitSerInfo()
{
FILE *fp = fopen("server.cnf","r");
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(IPSTR);
char buff[128] = {0};
while( fgets(buff, 127, fp) > 0 )
{
int id,port;
sscanf(buff, "%d,%d", &id, &port);
addr.sin_port = htons(port);
//将服务端的地址和id 放入队列
server_addr.push(addr);
server_id.push(id);
}
fclose(fp);
}
//读取配置文件中的id 和 serPort cliPort
void CBlance::readConfig()
{
FILE *fp = fopen("balance.cnf", "r");
assert(fp != NULL);
char buff[128] = {0};
if(fgets(buff, 127 ,fp) != NULL)
{
sscanf(buff, "%d,%d,%d", &id, &serPort, &cliPort);
}
fclose(fp);
}
//返回创建的套接字
int CBlance::InitSock(int &port)
{
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
assert(sockfd != -1);
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(IPSTR);
int res = bind(sockfd, (struct sockaddr*) &addr, sizeof(addr));
assert(res != -1);
return sockfd;
}
//将数据写入配置文件中
void CBlance::writeLog(msg &data)
{
FILE *fp = fopen("balance.log", "a");
assert(fp != NULL);
char buff[128] = {0};
sprintf(buff, "发送方id:%d,接收方id:%d,客户端usr_id:%d,数据包类型:%d,数据内容:%s\n"
,data.src_id, data.des_id, data.usr_id, data.msg_type, data.data);
fwrite(buff, strlen(buff), 1, fp);
fclose(fp);
}
//接收消息recvMsg
bool CBlance::recvMsg(int sockfd, struct sockaddr_in &addr, msg &data)
{
socklen_t len = sizeof(addr);
int n = recvfrom(sockfd, &data, sizeof(data), 0, (struct sockaddr*)&addr, &len);
//对面关闭链接
if(n == 0) { return false; }
//读取出错
else if(n < 0) { return false; }
//正常读取
else { return true; }
}
//发送消息sendMsg
bool CBlance::sendMsg(int sockfd, struct sockaddr_in &addr, msg &data)
{
int n = sendto(sockfd, &data, sizeof(data), 0, (struct sockaddr*) &addr, sizeof(addr));
//发送失败
if(n == -1) { return false; }
//正常发送
else { return true; }
}
int main()
{
CBlance cb;
cb.mainfun();
return 0;
}