Linux进程间通信——使用消息队列

下面是自己编写的测试代码,需要特别注意的是3个地方:

      (1)消息队列要发送和接收的结构体是有具体要求的,不是任何数据类型都可以,

       struct msgbuf {
               long mtype;       /* message type, must be > 0 */
               char mtext[1];    /* message data */
           };

   int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

       ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

 结构体的第一个字节必须是  long型,发送时mtype必须大于0。后面字节类型可以自定义。

msgsnd:向msgid指定的消息队列末尾追加一个由msgp指向的消息,消息内容大小为msgsz。通常消息通过一个结构体进行描述,一般形式如下:

    

 

这个结构体可分为2个部分,mtype用以标识消息类型,mtext这个部分就是消息的内容,可以是你想要的描述消息内容的任何形式如数组,结构体等等。msgsnd,msgrcv里面所指定的消息大小msgsz指的是消息结构体内容mtext部分的长度,不包括mtype!消息类型必须大于0,至于为什么,看到msgrcv函数时就可以知道。

 msgrcv:从msgid指定的消息队列里面取出由msgtyp指定类型的消息存放于msgp指向的空间。取出的消息数据大小由msgsz指定。成功时,返回拷贝到mtext中实际的字节数。

  msgtyp有3种情况,用以控制取出消息的方式:

    等于0:取出队列中的第一个消息,这样可以以先进先出的方式取消息(因为msgsnd都是把消息添加到消息队列的最后面)。

    大于0:取出mtype于msgtyp相同的消息。

    小于0:取出消息队列中mtype值小于等于msgtyp绝对值的所有消息中mtype值最小的那个消息。(假设mtype设定为消息的优先级,这种方式可以用于控制消息队列取消息的优先级)

  从上面的描述可以看出这就是为什么消息结构体中mtype值为什么一定要大于0的原因

  (2)#define TEST_MSG_KEY       0x2FF   

   关于KEY,

         linux输入命令:ipcs -q   

         打印出使用消息队列进行进程间通信的信息,打印一下看到下面的key值已经被占用,就不能用下面这些key

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages    
0x40000001 131072     root       666        16           2           
0x000003ff 32769      root       666        8            1           
0x000002fe 65538      root       666        0            0           
0x000002fd 98307      root       666        0            0 

 (3)  msgrcv时如果msgtyp置0,则只读取一次消息    等于0:取出队列中的第一个消息,这样可以以先进先出的方式取消息



A.CPP
#define TEST_SHM_KEY       0x30000001
#define TEST_SEM_KEY       0x30000001
#define TEST_MSG_KEY       0x2FF
typedef struct
{
int num;
int count;
}ShmData;


char  *shmaddr;
char  new_create=0;
int   shm_id=-1;
ShmData *pData;
int   sem_id=-1;
int   sig_id=-1;


int main(int argc,char *argv[] )


{
int i=0;


int ret=-1;
int size_tmp;
size_tmp=sizeof(ShmData)*64;


int count=0;



/* 消息结构体 */
typedef struct _MSG 
{
long   mType;
int    taskId; /* 消息内容 */
}MsgForm;   
 
MsgForm  Msg;
Msg.mType=01;
Msg.taskId=22;

sig_id=ipc_init_msgq(TEST_MSG_KEY);
if(sig_id<0)
{
printf("测试用例15:**********************消息队列创建失败**********\n");
}
else
{
printf("测试用例15:******消息队列创建成功*****sig_id=%d*****\n",sig_id);
}

{
ret=ipc_msgsnd(sig_id,&Msg,sizeof(Msg));
if(ret<0)
{
printf("测试用例16:**********************消息队列发送失败**********\n");
}
else
{
printf("测试用例16:**********************消息队列发送成功:mType=%dtaskId=%d\n",Msg.mType,Msg.taskId);
}
sleep(40);
}

B.CPP
#define TEST_MSG_KEY       0x2FF
int main(int argc,char *argv[] )


{
int i=0;

int ret=-1;
int size_tmp;
  
/* 消息结构体 */
typedef struct _MSG 
{
long mType;
int taskId; /* 消息内容 */
}MsgForm;  
MsgForm  Msg;


  //while(1)
{
  sig_id=ipc_init_msgq(TEST_MSG_KEY);
if(sig_id<0)
{
printf("测试用例15:**********************消息队列创建失败**********\n");
}
else
{
printf("测试用例15:**********************消息队列创建成功sig_id=%d**********\n",sig_id);
}
while(1)
{
ret=ipc_msgrcv(sig_id,&Msg,sizeof(Msg));
if(ret<0)
{
printf("测试用例17:**********************消息队列接收失败*******ret=%d\n",ret);
}
else
{
printf("测试用例17:**********************消息队列接收成功=taskId=%dn",Msg.taskId);
}
sleep(1);
}



int ipc_msgsnd(int id,char *msg,int msglen)
{
    if(id<0)
    {
        return -1;
    }
    return msgsnd(id, msg, msglen, IPC_NOWAIT);
//if (msgsnd(msgQid, &msg, sizeof(MsgForm), IPC_NOWAIT) < 0)
}

int ipc_msgrcv(int id,char *msg,int msglen)
{
    if(id < 0)
    {
        return -1;
    }
    return msgrcv(id, msg, msglen, 0, IPC_NOWAIT); 
//msgrcv(g_taskd_info.taskMsgQid, &msg, sizeof(MsgForm), 0, IPC_NOWAIT) 
}



下面来说说如何用不用消息队列来进行进程间的通信,消息队列与命名管道有很多相似之处。有关命名管道的更多内容可以参阅我的另一篇文章:Linux进程间通信——使用命名管道

一、什么是消息队列
消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。  每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。我们可以通过发送消息来避免命名管道的同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。

Linux用宏MSGMAX和MSGMNB来限制一条消息的最大长度和一个队列的最大长度。

二、在Linux中使用消息队列
Linux提供了一系列消息队列的函数接口来让我们方便地使用它来实现进程间的通信。它的用法与其他两个System V PIC机制,即信号量和共享内存相似。

1、msgget函数
该函数用来创建和访问一个消息队列。它的原型为:
[cpp] view plain copy
  1. int msgget(key_t, key, int msgflg);  
与其他的IPC机制一样,程序必须提供一个键来命名某个特定的消息队列。msgflg是一个权限标志,表示消息队列的访问权限,它与文件的访问权限一样。msgflg可以与IPC_CREAT做或操作,表示当key所命名的消息队列不存在时创建一个消息队列,如果key所命名的消息队列存在时,IPC_CREAT标志会被忽略,而只返回一个标识符。

它返回一个以key命名的消息队列的标识符(非零整数),失败时返回-1.

2、msgsnd函数
该函数用来把消息添加到消息队列中。它的原型为:
[cpp] view plain copy
  1. int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);  
msgid是由msgget函数返回的消息队列标识符。

msg_ptr是一个指向准备发送消息的指针,但是消息的数据结构却有一定的要求,指针msg_ptr所指向的消息结构一定要是以一个长整型成员变量开始的结构体,接收函数将用这个成员来确定消息的类型。所以消息结构要定义成这样:
[cpp] view plain copy
  1. struct my_message{  
  2.     long int message_type;  
  3.     /* The data you wish to transfer*/  
  4. };  
msg_sz是msg_ptr指向的消息的长度,注意是消息的长度,而不是整个结构体的长度,也就是说msg_sz是不包括长整型消息类型成员变量的长度。

msgflg用于控制当前消息队列满或队列消息到达系统范围的限制时将要发生的事情。

如果调用成功,消息数据的一分副本将被放到消息队列中,并返回0,失败时返回-1.

3、msgrcv函数
该函数用来从一个消息队列获取消息,它的原型为
[cpp] view plain copy
  1. int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg);  
msgid, msg_ptr, msg_st的作用也函数msgsnd函数的一样。

msgtype可以实现一种简单的接收优先级。如果msgtype为0,就获取队列中的第一个消息。如果它的值大于零,将获取具有相同消息类型的第一个信息。如果它小于零,就获取类型等于或小于msgtype的绝对值的第一个消息。

msgflg用于控制当队列中没有相应类型的消息可以接收时将发生的事情。

调用成功时,该函数返回放到接收缓存区中的字节数,消息被复制到由msg_ptr指向的用户分配的缓存区中,然后删除消息队列中的对应消息。失败时返回-1.

4、msgctl函数
该函数用来控制消息队列,它与共享内存的shmctl函数相似,它的原型为:
[cpp] view plain copy
  1. int msgctl(int msgid, int command, struct msgid_ds *buf);  
command是将要采取的动作,它可以取3个值,
    IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值,即用消息队列的当前关联值覆盖msgid_ds的值。
    IPC_SET:如果进程有足够的权限,就把消息列队的当前关联值设置为msgid_ds结构中给出的值
    IPC_RMID:删除消息队列

buf是指向msgid_ds结构的指针,它指向消息队列模式和访问权限的结构。msgid_ds结构至少包括以下成员:
[cpp] view plain copy
  1. struct msgid_ds  
  2. {  
  3.     uid_t shm_perm.uid;  
  4.     uid_t shm_perm.gid;  
  5.     mode_t shm_perm.mode;  
  6. };  
成功时返回0,失败时返回-1.

三、使用消息队列进行进程间通信
马不停蹄,介绍完消息队列的定义和可使用的接口之后,我们来看看它是怎么让进程进行通信的。由于可以让不相关的进程进行行通信,所以我们在这里将会编写两个程序,msgreceive和msgsned来表示接收和发送信息。根据正常的情况,我们允许两个程序都可以创建消息,但只有接收者在接收完最后一个消息之后,它才把它删除。

接收信息的程序源文件为msgreceive.c的源代码为:
[cpp] view plain copy
  1. #include <unistd.h>  
  2. #include <stdlib.h>  
  3. #include <stdio.h>  
  4. #include <string.h>  
  5. #include <errno.h>  
  6. #include <sys/msg.h>  
  7.   
  8. struct msg_st  
  9. {  
  10.     long int msg_type;  
  11.     char text[BUFSIZ];  
  12. };  
  13.   
  14. int main()  
  15. {  
  16.     int running = 1;  
  17.     int msgid = -1;  
  18.     struct msg_st data;  
  19.     long int msgtype = 0; //注意1  
  20.   
  21.     //建立消息队列  
  22.     msgid = msgget((key_t)1234, 0666 | IPC_CREAT);  
  23.     if(msgid == -1)  
  24.     {  
  25.         fprintf(stderr, "msgget failed with error: %d\n", errno);  
  26.         exit(EXIT_FAILURE);  
  27.     }  
  28.     //从队列中获取消息,直到遇到end消息为止  
  29.     while(running)  
  30.     {  
  31.         if(msgrcv(msgid, (void*)&data, BUFSIZ, msgtype, 0) == -1)  
  32.         {  
  33.             fprintf(stderr, "msgrcv failed with errno: %d\n", errno);  
  34.             exit(EXIT_FAILURE);  
  35.         }  
  36.         printf("You wrote: %s\n",data.text);  
  37.         //遇到end结束  
  38.         if(strncmp(data.text, "end", 3) == 0)  
  39.             running = 0;  
  40.     }  
  41.     //删除消息队列  
  42.     if(msgctl(msgid, IPC_RMID, 0) == -1)  
  43.     {  
  44.         fprintf(stderr, "msgctl(IPC_RMID) failed\n");  
  45.         exit(EXIT_FAILURE);  
  46.     }  
  47.     exit(EXIT_SUCCESS);  
  48. }  
发送信息的程序的源文件msgsend.c的源代码为:
[cpp] view plain copy
  1. #include <unistd.h>  
  2. #include <stdlib.h>  
  3. #include <stdio.h>  
  4. #include <string.h>  
  5. #include <sys/msg.h>  
  6. #include <errno.h>  
  7.   
  8. #define MAX_TEXT 512  
  9. struct msg_st  
  10. {  
  11.     long int msg_type;  
  12.     char text[MAX_TEXT];  
  13. };  
  14.   
  15. int main()  
  16. {  
  17.     int running = 1;  
  18.     struct msg_st data;  
  19.     char buffer[BUFSIZ];  
  20.     int msgid = -1;  
  21.   
  22.     //建立消息队列  
  23.     msgid = msgget((key_t)1234, 0666 | IPC_CREAT);  
  24.     if(msgid == -1)  
  25.     {  
  26.         fprintf(stderr, "msgget failed with error: %d\n", errno);  
  27.         exit(EXIT_FAILURE);  
  28.     }  
  29.   
  30.     //向消息队列中写消息,直到写入end  
  31.     while(running)  
  32.     {  
  33.         //输入数据  
  34.         printf("Enter some text: ");  
  35.         fgets(buffer, BUFSIZ, stdin);  
  36.         data.msg_type = 1;    //注意2  
  37.         strcpy(data.text, buffer);  
  38.         //向队列发送数据  
  39.         if(msgsnd(msgid, (void*)&data, MAX_TEXT, 0) == -1)  
  40.         {  
  41.             fprintf(stderr, "msgsnd failed\n");  
  42.             exit(EXIT_FAILURE);  
  43.         }  
  44.         //输入end结束输入  
  45.         if(strncmp(buffer, "end", 3) == 0)  
  46.             running = 0;  
  47.         sleep(1);  
  48.     }  
  49.     exit(EXIT_SUCCESS);  
  50. }  
运行结果如下:

Linux进程间通信——使用消息队列

四、例子分析——消息类型

这里主要说明一下消息类型是怎么一回事,注意msgreceive.c文件main函数中定义的变量msgtype(注释为注意1),它作为msgrcv函数的接收信息类型参数的值,其值为0,表示获取队列中第一个可用的消息。再来看看msgsend.c文件中while循环中的语句data.msg_type = 1(注释为注意2),它用来设置发送的信息的信息类型,即其发送的信息的类型为1。所以程序msgreceive能够接收到程序msgsend发送的信息。

如果把注意1,即msgreceive.c文件main函数中的语句由long int msgtype = 0;改变为long int msgtype = 2;会发生什么情况,msgreceive将不能接收到程序msgsend发送的信息。因为在调用msgrcv函数时,如果msgtype(第四个参数)大于零,则将只获取具有相同消息类型的第一个消息,修改后获取的消息类型为2,而msgsend发送的消息类型为1,所以不能被msgreceive程序接收。重新编译msgreceive.c文件并再次执行,其结果如下:

Linux进程间通信——使用消息队列

我们可以看到,msgreceive并没有接收到信息和输出,而且当msgsend输入end结束后,msgreceive也没有结束,通过jobs命令我们可以看到它还在后台运行着。

五、消息队列与命名管道的比较

消息队列跟命名管道有不少的相同之处,通过与命名管道一样,消息队列进行通信的进程可以是不相关的进程,同时它们都是通过发送和接收的方式来传递数据的。在命名管道中,发送数据用write,接收数据用read,则在消息队列中,发送数据用msgsnd,接收数据用msgrcv。而且它们对每个数据都有一个最大长度的限制。

与命名管道相比,消息队列的优势在于,1、消息队列也可以独立于发送和接收进程而存在,从而消除了在同步命名管道的打开和关闭时可能产生的困难。2、同时通过发送消息还可以避免命名管道的同步和阻塞问题,不需要由进程自己来提供同步方法。3、接收程序可以通过消息类型有选择地接收数据,而不是像命名管道中那样,只能默认地接收。