IPC- 消息队列 详解
Linux下进程间通信方式:
管道( pipe ):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。
有名管道 (named pipe) : 有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
信号量( semophore ) : 信号量是一个计数器,可以用来控制多个进程对共享资源的访问。它常作为一种锁机制,防止某进程正在访问共享资源时,其他进程也访问该资源。因此,主要作为进程间以及同一进程内不同线程之间的同步手段。
消息队列( message queue ) : 消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
信号 ( sinal ) : 信号是一种比较复杂的通信方式,用于通知接收进程某个事件已经发生。
共享内存( shared memory ) :共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号两,配合使用,来实现进程间的同步和通信。
套接字( socket ) : 套解口也是一种进程间通信机制,与其他通信机制不同的是,它可用于不同及其间的进程通信。
System V 与 POSIX的区别
System V IPC存在时间比较老,许多系统都支持,但是接口复杂,并且可能各平台上实现略有区别(如ftok的实现及限制)。
POSIX是新标准,现在多数UNIX也已实现,我觉得如果只是开发的话,那么还是POSIX好,因为语法简单,并且各平台上实现都一样。
一、什么是消息队列
消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。 每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。我们可以通过发送消息来避免命名管道的同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。
Linux用宏MSGMAX和MSGMNB来限制一条消息的最大长度和一个队列的最大长度。
1、每种进程通信方式实现方式和功能不一样,带来适用的场景也有所不同
消息队列是链表队列,它通过内核提供一个struct msqid_ds *msgque[MSGMNI]向量维护内核的一个消息队列列表,因此linux系统支持的最大消息队列数由msgque数组大小来决定,每一个msqid_ds表示一个消息队列,并通过msqid_ds.msg_first、msg_last维护一个先进先出的msg链表队列,当发送一个消息到该消息队列时,把发送的消息构造成一个msg结构对象,并添加到msqid_ds.msg_first、msg_last维护的链表队列,同样,接收消息的时候也是从msg链表队列尾部查找到一个msg_type匹配的msg节点,从链表队列中删除该msg节点,并修改msqid_ds结构对象的数据。
2、消息队列的数据结构
struct msqid_ds *msgque[MSGMNI]向量:
msgque[MSGMNI]是一个msqid_ds结构的指针数组,每个msqid_ds结构指针代表一个系统消息队列,msgque[MSGMNI]的大小为MSGMNI=128,也就是说系统最多有MSGMNI=128个消息队列。
3、消息队列Key的获取:
在程序中若要使用消息队列,必须要能知道消息队列key,因为应用进程无法直接访问内核消息队列中的数据结构,因此需要一个消息队列的标识,让应用进程知道当前操作的是哪个消息队列,同时也要保证每个消息队列key值的唯一性
http://blog.csdn.net/stonecao/article/details/10364287
二、消息队列与命名管道的比较
消息队列跟命名管道有不少的相同之处,通过与命名管道一样,消息队列进行通信的进程可以是不相关的进程,同时它们都是通过发送和接收的方式来传递数据的。在命名管道中,发送数据用write,接收数据用read,则在消息队列中,发送数据用msgsnd,接收数据用msgrcv。而且它们对每个数据都有一个最大长度的限制。
与命名管道相比,消息队列的优势在于:
1、消息队列也可以独立于发送和接收进程而存在,从而消除了在同步命名管道的打开和关闭时可能产生的困难。
2、同时通过发送消息还可以避免命名管道的同步和阻塞问题,不需要由进程自己来提供同步方法。
3、接收程序可以通过消息类型有选择地接收数据,而不是像命名管道中那样,只能默认地接收。
SYSTEM V 消息队列
1、创建或者使用消息队列:msgget函数
该函数用来创建和访问一个消息队列。它的原型为:
int msgget(key_t, key, int msgflg);
与其他的IPC机制一样,程序必须提供一个键来命名某个特定的消息队列。msgflg是一个权限标志,表示消息队列的访问权限,它与文件的访问权限一样。msgflg可以与IPC_CREAT做或操作,表示当key所命名的消息队列不存在时创建一个消息队列,如果key所命名的消息队列存在时,IPC_CREAT标志会被忽略,而只返回一个标识符。
在程序中若要使用消息队列,必须要能知道消息队列key,因为应用进程无法直接访问内核消息队列中的数据结构,因此需要一个消息队列的标识,让应用进程知道当前操作的是哪个消息队列,同时也要保证每个消息队列key值的唯一性。
申请一块内存,创建一个新的消息队列(数据结构msqid_ds),将其初始化后加入到msgque向量表中的某个空位置处,返回标示符。或者在msgque向量表中找键值为key的消息队列。
2、将消息添加到消息队列中
int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);
msgid是由msgget函数返回的消息队列标识符。
msg_ptr是一个指向准备发送消息的指针,但是消息的数据结构却有一定的要求,指针msg_ptr所指向的消息结构一定要是以一个长整型成员变量开始的结构体,接收函数将用这个成员来确定消息的类型。所以消息结构要定义成这样:
struct my_message{
long int message_type;
/* The data you wish to transfer*/
};
msg_sz是msg_ptr指向的消息的长度,注意是消息的长度,而不是整个结构体的长度,也就是说msg_sz是不包括长整型消息类型成员变量的长度,也就是在使用的时候需要sizeof(struct my_message)- sizeof(long)
msgflg用于控制当前消息队列满或队列消息到达系统范围的限制时将要发生的事情。
如果调用成功,消息数据的一分副本将被放到消息队列中,并返回0,失败时返回-1.
3、从一个消息队列中获取消息
int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg);
msgid, msg_ptr, msg_st的作用也函数msgsnd函数的一样。
msgtype可以实现一种简单的接收优先级。如果msgtype为0,就获取队列中的第一个消息。如果它的值大于零,将获取具有相同消息类型的第一个信息。如果它小于零,就获取类型等于或小于msgtype的绝对值的第一个消息。
msgflg用于控制当队列中没有相应类型的消息可以接收时将发生的事情。
调用成功时,该函数返回放到接收缓存区中的字节数,消息被复制到由msg_ptr指向的用户分配的缓存区中,然后删除消息队列中的对应消息。失败时返回-1.
4、消息队列控制函数
int msgctl(int msgid, int command, struct msgid_ds *buf);
command是将要采取的动作,它可以取3个值
IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值,即用消息队列的当前关联值覆盖msgid_ds的值。
IPC_SET:如果进程有足够的权限,就把消息列队的当前关联值设置为msgid_ds结构中给出的值
IPC_RMID:删除消息队列
buf是指向msgid_ds结构的指针,它指向消息队列模式和访问权限的结构。msgid_ds结构至少包括以下成员:
struct msgid_ds
{
uid_t shm_perm.uid;
uid_t shm_perm.gid;
mode_t shm_perm.mode;
};
成功时返回0,失败时返回-1.
代码如下
/*************************************************************************
> File Name: recv.cpp
> Author:
> Mail:
> Created Time: 2020年02月12日 星期三 09时29分12秒
************************************************************************/
# include<iostream>
# include <signal.h>
# include <sys/wait.h>
# include <stdio.h>
# include <unistd.h>
# include <string.h>
# include <stdlib.h>
# include <sys/types.h>
# include <sys/ipc.h>
# include <sys/msg.h>
# include <errno.h>
# define MSGKEY 1024
using namespace std; typedef void Sigfunc(int);
struct msgstru{
long msgtype;
char msgtext[2048];
};
void sig_chld(int signo)
{
pid_t pid;
int stat;
if((pid=waitpid(-1,&stat,WNOHANG))>0)
cout<<pid<<" child terminated\n"<<endl;
return ;
}
Sigfunc*
signal_bind(int signo,Sigfunc* func)
{
struct sigaction act,oact;
act.sa_handler=func;
sigemptyset(&act.sa_mask);
act.sa_flags=0;
act.sa_flags|=SA_RESTART;
if(sigaction(signo,&act,&oact)<=0)
return SIG_ERR;
return oact.sa_handler;
}
void childproc()
{
struct msgstru msgs;
int msgid;
char str[512];
msgid=msgget(MSGKEY,IPC_EXCL);
if(msgid<0){
printf("msq not existed ! errno = %d [%s]/n",errno,strerror(errno));
exit(-1);
}
while(1)
{
int ret_val=msgrcv(msgid,&msgs,sizeof(msgstru)-sizeof(long),getpid(),0);
//这里接受的消息类型是和进程id号一样的才会接受
if(ret_val<0&&errno==EIDRM)
{
cout<<getpid()<<" msq deleted."<<endl;
printf("errno = %d [%s]/n",errno,strerror(errno));
exit(-1);
}
printf("text = [%s] pid = [%d]\n",msgs.msgtext,getpid());
}
}
int
main()
{
int i,cpid;
signal_bind(SIGCHLD,sig_chld);//绑定信号 用于父进程回收子进程
for( i = 0 ;i < 5;i++ )
{
cpid=fork();
if(cpid< 0 )
printf("fork failed\n");
else if(cpid == 0)
childproc();
else
cout<<"process "<<cpid<<" created."<<endl;
}
while(1);
}
/*************************************************************************
> File Name: send.cpp
> Author:
> Mail:
> Created Time: 2020年02月12日 星期三 09时20分41秒
************************************************************************/
# include<iostream>
# include <stdio.h>
# include <stdlib.h>
# include <string.h>
# include <sys/types.h>
# include <sys/ipc.h>
# include <sys/msg.h>
# include <errno.h>
# define MSGKEY 1024
using namespace std;
struct msgstru{
long msgtype;
char msgtext[2048];
};
int
main()
{
struct msgstru msgs;
int msg_type;
char str[256];
int ret_val;
int msqid;
msqid=msgget(MSGKEY,IPC_EXCL);
if(msqid<0){
msqid=msgget(MSGKEY,IPC_CREAT|0666);
if(msqid<0){
printf("failed to created msq | errno =%d [%s]\n",errno,strerror(errno));
exit(-1);
}
}
while(1){
cout<<"input message type (end : 0): ";
cin>>msg_type;
if(!msg_type) break;
cout<<"input message to be sent :";
cin>>str;
msgs.msgtype = msg_type;
strcpy(msgs.msgtext,str);
ret_val=msgsnd(msqid,&msgs,sizeof(msgs.msgtext),IPC_NOWAIT);
if(ret_val<0){
printf("failed to created msq | errno =%d [%s]\n",errno,strerror(errno));
exit(-1);
}
}
msgctl(msqid,IPC_RMID,0);
}
消息队列在内核的实现(转)
1. struct msg_queue
消息队列在内核中用下面的数据结构表示
// Linux 3.4: _msg_sm.h
struct msg_queue {
struct list_head list_elem;
struct msg_mgr *msg_mgr;
u32 max_msgs; /* Node message depth */
u32 msgq_id; /* Node environment pointer */
struct list_head msg_free_list; /* Free MsgFrames ready to be filled */
/* Filled MsgFramess waiting to be read */
struct list_head msg_used_list;
void *arg; /* Handle passed to mgr on_exit callback */
struct sync_object *sync_event; /* Signalled when message is ready */
struct sync_object *sync_done; /* For synchronizing cleanup */
struct sync_object *sync_done_ack; /* For synchronizing cleanup */
struct ntfy_object *ntfy_obj; /* For notification of message ready */
bool done; /* TRUE <==> deleting the object */
u32 io_msg_pend; /* Number of pending MSG_get/put calls */
};
可以看到消息队列实际上就是一个链表。
2. 分配获取消息队列
首先上层应用通过调用 msgget 来进行系统调用,然后陷入内核:
int msgget(key, msgflg)
key_t key;
int msgflg;
{
return INLINE_SYSCALL (ipc, 5, IPCOP_msgget, key, msgflg, 0, NULL);
}
消息队列的内核实现机制和共享内存几乎相同:在内核开辟一片内存空间存放消息队列,不同的进程使用这个消息队列(内存空间)来通信,与共享内存使用相同的一组回调函数 ipc_ops。
msgget 的基本的调用过程如下:
比较重要的是最后一步回调 newque 函数,这个函数在内核中分配了一个消息队列 msg:
3. 发送消息
上层的 msgsnd 系统调用最后会调用到内核的 do_msgsnd 函数,过程如下:
具体的过程这里就不分析了,主要就是在链表中添加一项。
4. 接受消息
上层的 msgrcv 系统调用最后会调用到内核的 do_msgrcv 函数,过程如下:
同样接收消息会从链表中删除一项,具体的过程可以根据这个路线仔细分析。
作者:程序小歌
链接:https://www.jianshu.com/p/7598e5ed5200
来源:简书
POSIX 标准消息队列
1、mq_open来创建非默认个数大小消息队列:
函数原型
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
第4个参数为 mq_attr 指针
struct mq_attr{
long mq_flags;
long mq_maxmsg;
long mq_msgsize;
long mq_curmsgs;
};
当第四个参数为空指针时,就使用默认属性。
当指向mq_attr结构的指针作为参数时,允许我们在该函数的实际操作时创建一个新队列时,给它指定mq_maxmsg和mq_msgsize属性.mq_open忽略该结构的另外两个成员.
(1)attr.mq_maxmsg 不能超过文件 /proc/sys/fs/mqueue/msg_max 中的数值;
(2)attr.mq_msgsize不能超过 /proc/sys/fs/mqueue/msgsize_max 的数值;
(3)消息队列名称前面必须加上斜杆。
在POSIX消息队列中 msg_max 默认为 10 ,msgsize_max 默认为8192 ,否则会报错!!!
可以在 /proc/sys/fs/mqueue# cat msg_max
/proc/sys/fs/mqueue# cat msgsize_max
查看
修改的话,要使用:echo 1000 > /proc/sys/fs/mqueue/msg_max往里面写。
2、获取消息队列的属性
一个进程在发送和接收消息之前,需要了解消息对象的属性,如消息的最大长度。以便设定接收和发送的buffer大小。
mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);参数:
Mqdes:打开消息队列时获取的描述符。
Attr:指向结构struct mq_attr的指针,用来获取消息队列的四个属性
struct mq_attr {
long mq_flags; // 0或者O_NONBLOCK
long mq_maxmsg; //队列中包含的消息数的最大限制数
long mq_msgsize; //每个消息大小的最大限制数
long mq_curmsgs; //当前队列中的消息数
}
3、设置消息队列属性
我们可以设置消息队列的属性,实际只能设置flag标志,说明队列中没有消息时,接收消息的进程是否在队列上继续等待。
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
参数:
Mqdes:打开消息队列时获取的描述符。
Attr:指向结构struct mq_attr的指针,用来获取消息队列的最大消息个数和最大消息长度。放到数据结构的mq_maxmsg和mq_msgsize中。
struct mq_attr {
long mq_flags; // 0或者O_NONBLOCK,只能设置这个
long mq_maxmsg; //队列中包含的消息数的最大限制数
long mq_msgsize; //每个消息大小的最大限制数
long mq_curmsgs; //当前队列中的消息数
}
oldattr:用来保存设置之前的attr值,可以为NULL.
4、发送消息
进程在打开消息队列后,可以使用下面的函数发送消息
int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
参数:
mqdes: 打开消息队列时获得的描述符。
ptr: 指向发送缓冲区的指针,发送缓冲区存放了要发送的数据。
Len: 要发送的数据的长度。 prio :消息的优先级;它是一个小于 MQ_PRIO_MAX 的数,数值越大,优先级越高。
prio: POSIX 消息队列在调用 mq_receive 时总是返回队列中 最高优先级的最早消息 。如果消息不需要设定优先级,那么可以在 mq_send 是置 prio 为 0 , mq_receive 的 prio 置为 NULL 。
返回值:发送成功,返回0,失败,返回-1.
5、接收消息
进程在打开消息队列后,可以使用下面的函数接收消息。
ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *prio);
参数:
mqdes: 打开消息队列时获得的描述符。
ptr: 指向接收缓冲区的指针。接收缓冲区用来存放收到的消息。
Len: 接收缓冲区的长度。 len不能小于mq_msgsize,否则会返回EMSGSIZE
prio :消息的优先级;它是一个小于 MQ_PRIO_MAX 的数,数值越大,优先级越高。 POSIX 消息队列在调用 mq_receive 时总是返回队列中 最高优先级的最早消息 。如果消息不需要设定优先级,那么可以在 mq_send 是置 prio 为 0 , mq_receive 的 prio 置为 NULL 。
返回值: 接收成功,返回0,失败,返回-1.
6、消息队列的关闭
mqd_t mq_close(mqd_t mqdes); 关闭消息队列,但不能删除它 成功返回0,失败返回-1
7、删除消息队列
mqd_t mq_unlink(const char *name); 成功返回0,失败返回-1
当某个进程还没有关闭此消息队列时,调用mq_unlink时,不会马上删除队列,当最后一个进程关闭队列时,该队列被删除,实际就是引用计数为0的时候会删除。
代码
编译:
gcc -o consumer consumer.c -lrt
gcc -o producer producer.c -lrt
/*
*
* Filename: producer.c
*
* Description: 生产者进程
*
* Version: 1.0
* Created: 09/30/2011 04:52:23 PM
* Revision: none
* Compiler: gcc(g++)
*
* Author: |Zhenghe Zhang|, |zhenghe.zhang@gmail.com|
* Company: |Shenzhen XXX Technology Co., Ltd.|
*
*/
# include <stdio.h>
# include <mqueue.h>
# include <sys/stat.h>
# include <stdlib.h>
# include <unistd.h>
# include <time.h>
# include <string.h>
# define MAXSIZE 10 //定义buf大小
# define BUFFER 8192 //定义Msg大小
struct MsgType{
int len;
char buf[MAXSIZE];
char x;
short y;
};
int main()
{
/*消息队列*/
mqd_t msgq_id;
struct MsgType msg;
unsigned int prio = 1;
unsigned int send_size = BUFFER;
struct mq_attr msgq_attr;
const char *file = "/posix";
/*mq_open() for creating a new queue (using default attributes) */
/*mq_open() 创建一个新的 POSIX 消息队列或打开一个存在的队列*/
msgq_id = mq_open(file, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG, NULL);
if(msgq_id == (mqd_t)-1)
{
perror("mq_open");
exit(1);
}
/* getting the attributes from the queue -- mq_getattr() */
if(mq_getattr(msgq_id, &msgq_attr) == -1)
{
perror("mq_getattr");
exit(1);
}
printf("Queue \"%s\":\n\t- stores at most %ld messages\n\t- \
large at most %ld bytes each\n\t- currently holds %ld messages\n",
file, msgq_attr.mq_maxmsg, msgq_attr.mq_msgsize, msgq_attr.mq_curmsgs);
/*setting the attributes of the queue -- mq_setattr() */
/*mq_setattr() 设置消息队列的属性,设置时使用由 newattr 指针指向的 mq_attr 结构的信息。*/
/*属性中只有标志 mq_flasgs 里的 O_NONBLOCK 标志可以更改,其它在 newattr 内的域都被忽略 */
if(mq_setattr(msgq_id, &msgq_attr, NULL) == -1)
{
perror("mq_setattr");
exit(1);
}
int i = 0;
while(i < 10)
{
msg.len = i;
memset(msg.buf, 0, MAXSIZE);
sprintf(msg.buf, "0x%x", i);
msg.x = (char)(i + 'a');
msg.y = (short)(i + 100);
printf("msg.len = %d, msg.buf = %s, msg.x = %c, msg.y = %d\n", msg.len, msg.buf, msg.x, msg.y);
/*sending the message -- mq_send() */
/*mq_send() 把 msg_ptr 指向的消息加入由 mqdes 引用的消息队列里。*/
/*参数 msg_len 指定消息 msg_ptr 的长度:这个长度必须小于或等于队列 mq_msgsize 属性的值。零长度的消息是允许。*/
if(mq_send(msgq_id, (char*)&msg, sizeof(struct MsgType), prio) == -1)
{
perror("mq_send");
exit(1);
}
i++;
sleep(1);
}
sleep(30); //等待消费者进程退出
/*closing the queue -- mq_close() */
/*mq_close() 关闭消息队列描述符 mqdes。如果调用进程在消息队列 mqdes 绑定了通知请求,*/
/*那么这个请求被删除,此后其它进程就可以绑定通知请求到此消息队列。*/
if(mq_close(msgq_id) == -1)
{
perror("mq_close");
exit(1);
}
/*mq_unlink() 删除名为 name 的消息队列。消息队列名将被直接删除。*/
/*消息队列本身在所有引用这个队列的描述符被关闭时销毁。*/
if(mq_unlink(file) == -1)
{
perror("mq_unlink");
exit(1);
}
return 0;
}
/*
*
* Filename: consumer.c
*
* Description: 消费者进程
*
* Version: 1.0
* Created: 09/30/2011 04:52:23 PM
* Revision: none
* Compiler: gcc(g++)
*
* Author: |Zhenghe Zhang|, |zhenghe.zhang@gmail.com|
* Company: |Shenzhen XXX Technology Co., Ltd.|
*
*/
# include <stdio.h>
# include <mqueue.h>
# include <sys/stat.h>
# include <stdlib.h>
# include <unistd.h>
# include <time.h>
# include <string.h>
# define MAXSIZE 10 //定义buf大小
# define BUFFER 8192 //定义Msg大小
struct MsgType{
int len;
char buf[MAXSIZE];
char x;
short y;
};
int main()
{
/*消息队列*/
mqd_t msgq_id;
struct MsgType msg;
unsigned int sender;
struct mq_attr msgq_attr;
unsigned int recv_size = BUFFER;
const char *file = "/posix";
/* mq_open() for opening an existing queue */
msgq_id = mq_open(file, O_RDWR);
if(msgq_id == (mqd_t)-1)
{
perror("mq_open");
exit(1);
}
/* getting the attributes from the queue -- mq_getattr() */
if(mq_getattr(msgq_id, &msgq_attr) == -1)
{
perror("mq_getattr");
exit(1);
}
printf("Queue \"%s\":\n\t- stores at most %ld messages\n\t- \
large at most %ld bytes each\n\t- currently holds %ld messages\n",
file, msgq_attr.mq_maxmsg, msgq_attr.mq_msgsize, msgq_attr.mq_curmsgs);
if(recv_size < msgq_attr.mq_msgsize)
recv_size = msgq_attr.mq_msgsize;
int i = 0;
while(i < 10) //运行一个consumenr,为 10 ,同时运行两个consumer进程,为 5
{
msg.len = -1;
memset(msg.buf, 0, MAXSIZE);
msg.x = ' ';
msg.y = -1;
/* getting a message */
/*mq_receive() 从由描述符 mqdes 引用的队列时删除优先级最高的最老的消息,并把放置到 msg_ptr 的缓存区内。*/
/*参数 msg_len 指定缓冲区 msg_ptr 的大小:它必须大于队列的 mq_msgsize 属性(参数 mq_getattr)。*/
/*如果 prio 不是 NULL,那么它指向的内存用于返回收到消息相关的优先级。*/
if (mq_receive(msgq_id, (char*)&msg, recv_size, &sender) == -1)
{
perror("mq_receive");
exit(1);
}
printf("msg.len = %d, msg.buf = %s, msg.x = %c, msg.y = %d\n", msg.len, msg.buf, msg.x, msg.y);
i++;
sleep(2);
}
if(mq_close(msgq_id) == -1)
{
perror("mq_close");
exit(1);
}
return 0;
}