Linux 平台下基于 select 的文本回显程序

关于阻塞与非阻塞

一般我们传统的TCP或者UDP套接字编程(如图1 2所示)是默认采取阻塞的方式的。

阻塞

Linux 平台下基于 select 的文本回显程序

阻塞编程
  • 发出一个不能立即完成的套接字调用时,其进程将被投入到睡眠,进程或线程就被阻塞,函数不能立即返回,等待相应的操作完成。
  • 输入操作函数调用时,如果没有网络数据到达,则 进程被投入到睡眠,直到有数据到达才被唤醒。
  • TCP协议可以是单字节也可以是一个完整的TCP分 段中的数据到达来唤醒进程。如果想要等到固定数 目的数据可读时唤醒,可以调用readn函数或指定 MSG_WAITALL标志。
  • 阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。

Linux 平台下基于 select 的文本回显程序

图1 TCP socket编程流程图

Linux 平台下基于 select 的文本回显程序

图2 UDP socket编程流程图

非阻塞

Linux 平台下基于 select 的文本回显程序

非阻塞编程
  • 进程或线程执行此函数时不必非要等待事件的发生,一旦执行肯定返回,以返回值的不同来反映函数的执行情况,如果事件发生则与阻塞方式相同,若事件没有发生则返回一个代码来告知事件未发生,而进程或线程继续执行,所以效率较高。
  • 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

其实,套接字函数也可以实现非阻塞。
有两种方式设置非阻塞模式:

  1. 套接字描述符属性设置函数fcntl。
  2. 设置套接字函数中标志参数MSG_DONTWAIT设置套接字函数中标志参数MSG_DONTWAIT。

Fcntl函数是将套接字描述符设置为非阻塞属 性,即对于该描述符的所有操作都是非阻塞 的。

套接字函数标志参数MSG_DONTWAIT只设置当 前的函数为非阻塞的方式。

I/O复用

接下来,重点是I/O复用函数select的介绍:

I/O复用的应用场景

  • 处理多个描述符
  • TCP服务器既要处理监听套接字,又要处理已连 接套接字
  • 服务器要处理多个服务或多个协议

关于select函数

select函数允许进程指示内核等待多个事件中的任何一个发生,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒它。

  • 格式

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)

  • 参数
  • nfds 指定待测的描述符的范围,它的值是集合中所有待测描述符中,数值最大的那个值,再加1。
  • readfds、writefds和exceptfds分别是可读、可写和异常的描述符集。
  • timeout表示select阻塞等待的最大时间。
  • timeval结构格式

struct timeval {
long tv_sec; //秒
long tv_usec; //微妙 };

  • 返回值:负值:select错误;正值:某些文件可读写或出错;0:等待超时,没有可读写或错误的文件。
  • fd_set集合处理宏
  • void FD_ZERO(fd_set *set);
    清除set集合中的内容
  • void FD_CLR(int fd, fd_set *set); 删除set集中的fd描述符;
  • void FD_SET(int fd, fd_set *set); 增加fd到描述符集set;
  • int FD_ISSET(int fd, fd_set *set); 判断set中满足条件的描述符是否是fd。

服务器代码

/* selectTCPServer.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/select.h>

// 服务器端口号
#define MYPORT 1234
// listen队列中等待的连接数
#define BACKLOG 5
// 缓冲区大小
#define BUF_SIZE 1024

int main(void)
{
    int i, n, maxfd;
    int nready;
    // select所需的文件描述符集合 
    fd_set rset, allset;
	// socket文件描述符
    int listenfd, connectfd;
    //FD_SETSIZE为select函数支持的最大描述符个数
    int client[BACKLOG];

    // 声明两个套接字sockaddr_in结构体变量,分别表示客户端和服务器
    struct sockaddr_in server_addr;
    struct sockaddr_in client_addr;

    // 地址信息结构体大小
    socklen_t sin_size;
    sin_size = sizeof(client_addr);
    
    // 交互信息缓冲区
    char buf[BUF_SIZE];
    // 服务器端标准输入
    char stdbuf[BUF_SIZE];

    // 创建初始套接字
    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("Create socket failed");
        exit(1);
    }
    
    // 初始化服务器端地址
	bzero(&server_addr, sizeof(server_addr));
	// 定义服务器端的协议IPv4
    server_addr.sin_family = AF_INET;
    // 初始化服务器端的套接字,并用htons和htonl将端口和地址转成网络字节序
    server_addr.sin_port = htons(MYPORT);
    // INADDR_ANY表示允许任何IP地址
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    //设置socket属性   
    int opt = SO_REUSEADDR;  
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    // 绑定监听套接字,出错则显示出错信息并退出
    if (bind(listenfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1)
    {
        perror("Bind error");
        exit(1);
    }
    
    // 监听客户端
    if (listen(listenfd, BACKLOG) < 0)
    {
        perror("listen() error");
        exit(1);
    }
    printf("listen on port %d\n", MYPORT);

    //初始化select  
    maxfd = listenfd;  
    // 初始化连接套接字集合
    memset(&client, -1, sizeof(client));
    // 清空集合
    FD_ZERO(&allset);
    //将监听socket加入select检测的描述符集合
    FD_SET(listenfd, &allset);
    // 将标准输入套接字加入select检测的描述符集合
    FD_SET(0, &allset);
    while (1)
    {
        rset = allset;
        //调用select   
        nready = select(maxfd + 1, &rset, NULL, NULL, NULL);
        if (nready < 0)
        {
            perror("Select ");
            exit(1);
        }
        // 标准输入监听
	    if (FD_ISSET(0, &rset))
        {
            // 给stdbuf分配空间接收输入
            memset(stdbuf, 0, BUF_SIZE);
            // 接受标准输入
            gets(stdbuf);
            // 若输入为exit则退出所有客户端服务器并关闭所有连接
            if (strcmp(stdbuf, "exit") == 0)
            {
                // 给客户端发送消息,并关闭所有客户端连接
                for (i = 0; client[i] != -1 && i < BACKLOG; i++)
                {
                    send(client[i], stdbuf, sizeof(stdbuf), 0);
                    close(client[i]);
                }
                // 关闭监听套接字
                if (close(listenfd) == -1)
                {
                    perror("Close listensocket failed ");
                    exit(1);
                }
                // 成功退出
                return EXIT_SUCCESS;
            }
        }
        // 动态添加监听的套接字并加入rset集合
        if (FD_ISSET(listenfd, &rset))
        {
            // 接受客户端的连接请求,返回连接套接字
            connectfd = accept(listenfd, (struct sockaddr *)&client_addr, &sin_size);
            if (connectfd < 0)
            {
                perror("accept");
                exit(1);
            }
            // 查找 client 数组中是否还有空余位置
            for (i = 0; i < BACKLOG; i++)
            {
                // 如果集合有空位,则将套接字放进并加入allset集合
                if (client[i] == -1)
                {
                    client[i] = connectfd;
                    FD_SET(connectfd, &allset);
                    // 更新maxfd
                    if (connectfd > maxfd)
                    {
                        maxfd = connectfd;
                    }
                    printf("Client[%d] join.\n", client[i]);
                    break;
                }
                // 查找不到,告诉客户端已经满了
                if (i == (BACKLOG-1))
                {
                    strcpy(buf, "queue has been full");
                    send(connectfd, buf, sizeof(buf), 0);
                    close(connectfd);
                }
            }
        }
	    // 创建多个客户端文件描述符
	    for (i = 0; i < BACKLOG; i++)
	    {
            //如果客户端描述符小于0,则没有客户端连接,检测下一个
            if (client[i] == -1)
            {
                continue;
            }
            // 有客户连接,检测是否有数据
            if (FD_ISSET(client[i], &rset))
            {
                printf("Receive from connect client %d.\n", client[i]);
                n = recv(client[i], buf, BUF_SIZE, 0);
                if (n == -1)
                {
                    perror("Fail to receive");
                    exit(1);
                }
                // 如果接收到客户端发送的exit
                if (strcmp(buf, "exit") == 0)
                {
                    // 关闭连接
                    close(client[i]);
                    // 清出allset集合
                    FD_CLR(client[i], &allset);
                    printf("Client %d exit\n", client[i]);
                    // 数组位变为未占用
                    client[i] = -1;
                }
                else
                {
                    // 处理文本尾部
                    buf[n] = '\0';
                    // 显示文本
                    printf("The message of client %d is: %s\n", client[i], buf);
                    // 发送信息给客户端
                    if (send(client[i], buf, sizeof(buf), 0) == -1)
                    {
                        perror("Fail to reply");
                        exit(1);
                    }
                }
            }
	    }
    }
}

客户机代码

/* selectTCPClient.c */  
#include <stdio.h>  
#include <string.h>  
#include <stdlib.h>
#include <sys/types.h>  
#include <sys/socket.h>  
#include <netinet/in.h>  
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/stat.h>  
#include <fcntl.h>
#include <errno.h>
#include <netdb.h>
  
#define BUF_SIZE 1024 
  
int main(int argc,char* argv[])
{ 
    //客户机只需要一个套接字文件描述符,用于和服务机通信  
    int clientSocket;  
    //描述服务器的socket  
    struct sockaddr_in serverAddr;
    // socklen_t addr_len = sizeof(serverAddr);
    char sendbuf[BUF_SIZE];  
    char recvbuf[BUF_SIZE];  
    int iDataNum;

    if (argc != 3)
    {
        fputs("Usage: ./Server serverIP serverPORT\n", stderr);
        exit(1);
    }

    if((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0)  
    {
        perror("socket");  
        return -1;  
    }
    printf("clientSocket:%d\n",clientSocket);
  
    serverAddr.sin_family = AF_INET;
    // 服务器端口号
    int SERVER_PORT;
    sscanf(argv[2], "%d", &SERVER_PORT);
    serverAddr.sin_port = htons(SERVER_PORT);   // SERVER_PORT 
    //指定服务器端的ip,本地测试:127.0.0.1  
    //inet_addr()函数,将点分十进制IP转换成网络字节序IP
    serverAddr.sin_addr.s_addr = inet_addr(argv[1]);
    // 建立套接字和给定服务器地址之间的连接
    int con = connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
  	printf("connect:%d",con);
    if( con < 0)  
    {
        perror("connect");
        return -1;  
    }  
  
    printf("connect with destination host...\n");  
  
    while(1)  
    {  
        printf("Input your world:>");  
        gets(sendbuf);
        printf("\n");  
  
        send(clientSocket, sendbuf, strlen(sendbuf), 0);  
        if(strcmp(sendbuf, "exit") == 0)
        {
            break;
        }  
        iDataNum = recv(clientSocket, recvbuf, BUF_SIZE, 0);  
        recvbuf[iDataNum] = '\0';  
        printf("received data of my world is: %s\n", recvbuf);  
    }  
    close(clientSocket);  
    return 0;  
} 

编译:
gcc -Wall selectTCPServer.c -o selectTCPServer
gcc -Wall selectTCPClient.c -o selectTCPClient

运行:
./selectTCPServer
./selectTCPClient 127.0.0.1 1234