消息队列是LinuxIPC中很常用的一种通信方式,它通常用来在不同进程间发送特定格式的消息数据。
消息队列和之前讨论过的管道和FIFO有很大的区别,主要有以下两点:
- 一个进程向消息队列写入消息之前,并不需要某个进程在该队列上等待该消息的到达,而管道和FIFO是相反的,进程向其中写消息时,管道和FIFO必需已经打开来读,否则写进程就会阻塞(默认情况下)。
- IPC的持续性不同。管道和FIFO是随进程的持续性,当管道和FIFO最后一次关闭发生时,仍在管道和FIFO中的数据会被丢弃。消息队列是随内核的持续性,即一个进程向消息队列写入消息后,然后终止,另外一个进程可以在以后某个时刻打开该队列读取消息。只要内核没有重新自举,消息队列没有被删除。
消息队列中的每条消息通常具有以下属性:
- 一个表示优先级的整数;
- 消息的数据部分的长度;
- 消息数据本身;
POSIX消息队列的一个可能的设计是一个如下图所示的消息链表,链表头部有消息队列的属性信息。
图1消息队列的可能布局
1POSIX消息队列的创建和关闭
POSIX消息队列的创建,关闭和删除用到以下三个函数接口:
#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag, /* mode_t mode, struct mq_attr *attr */);
//成功返回消息队列描述符,失败返回-1
mqd_t mq_close(mqd_t mqdes);
mqd_t mq_unlink(const char *name);
//成功返回0,失败返回-1
mq_open用于打开或创建一个消息队列。
name:表示消息队列的名字,它符合POSIXIPC的名字规则。
oflag:表示打开的方式,和open函数的类似。有必须的选项:O_RDONLY,O_WRONLY,O_RDWR,还有可选的选项:O_NONBLOCK,O_CREAT,O_EXCL。
mode:是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时,才需要提供该参数。表示默认访问权限。可以参考open。
attr:也是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时才需要。该参数用于给新队列设定某些属性,如果是空指针,那么就采用默认属性。
mq_open返回值是mqd_t类型的值,被称为消息队列描述符。在Linux2.6.18中该类型的定义为整型:
#include <bits/mqueue.h>
typedef int mqd_t;
mq_close用于关闭一个消息队列,和文件的close类型,关闭后,消息队列并不从系统中删除。一个进程结束,会自动调用关闭打开着的消息队列。
mq_unlink用于删除一个消息队列。消息队列创建后只有通过调用该函数或者是内核自举才能进行删除。每个消息队列都有一个保存当前打开着描述符数的引用计数器,和文件一样,因此本函数能够实现类似于unlink函数删除一个文件的机制。
POSIX消息队列的名字所创建的真正路径名和具体的系统实现有关,关于具体POSIXIPC的名字规则可以参考《UNIX网络编程卷2:进程间通信》的P14。
经过测试,在Linux2.6.18中,所创建的POSIX消息队列不会在文件系统中创建真正的路径名。且POSIX的名字只能以一个’/’开头,名字中不能包含其他的’/’。
2POSIX消息队列的属性
POSIX标准规定消息队列属性mq_attr必须要含有以下四个内容:
long mq_flags //消息队列的标志:0或O_NONBLOCK,用来表示是否阻塞
long mq_maxmsg //消息队列的最大消息数
long mq_msgsize //消息队列中每个消息的最大字节数
long mq_curmsgs //消息队列中当前的消息数目
在Linux2.6.18中mq_attr结构的定义如下:
#include <bits/mqueue.h>
struct mq_attr
{
long int mq_flags; /* Message queue flags. */
long int mq_maxmsg; /* Maximum number of messages. */
long int mq_msgsize; /* Maximum message size. */
long int mq_curmsgs; /* Number of messages currently queued. */
long int __pad[4];
};
POSIX消息队列的属性设置和获取可以通过下面两个函数实现:
#include <mqueue.h>
mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
//成功返回0,失败返回-1
mq_getattr用于获取当前消息队列的属性,mq_setattr用于设置当前消息队列的属性。其中mq_setattr中的oldattr用于保存修改前的消息队列的属性,可以为空。
mq_setattr可以设置的属性只有mq_flags,用来设置或清除消息队列的非阻塞标志。newattr结构的其他属性被忽略。mq_maxmsg和mq_msgsize属性只能在创建消息队列时通过mq_open来设置。mq_open只会设置该两个属性,忽略另外两个属性。mq_curmsgs属性只能被获取而不能被设置。
下面是测试代码:
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <mqueue.h>
using namespace std;
int main()
{
mqd_t mqID;
mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);
if (mqID < 0)
{
cout<<"open message queue error..."<<strerror(errno)<<endl;
return -1;
}
mq_attr mqAttr;
if (mq_getattr(mqID, &mqAttr) < 0)
{
cout<<"get the message queue attribute error"<<endl;
return -1;
}
cout<<"mq_flags:"<<mqAttr.mq_flags<<endl;
cout<<"mq_maxmsg:"<<mqAttr.mq_maxmsg<<endl;
cout<<"mq_msgsize:"<<mqAttr.mq_msgsize<<endl;
cout<<"mq_curmsgs:"<<mqAttr.mq_curmsgs<<endl;
}
在Linux2.6.18中执行结果是:
mq_flags:0
mq_maxmsg:10
mq_msgsize:8192
mq_curmsgs:0
3POSIX消息队列的使用
POSIX消息队列可以通过以下两个函数来进行发送和接收消息:
#include <mqueue.h>
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr,
size_t msg_len, unsigned msg_prio);
//成功返回0,出错返回-1
mqd_t mq_receive(mqd_t mqdes, char *msg_ptr,
size_t msg_len, unsigned *msg_prio);
//成功返回接收到消息的字节数,出错返回-1
#ifdef __USE_XOPEN2K
mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr,
size_t msg_len, unsigned msg_prio,
const struct timespec *abs_timeout);
mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,
size_t msg_len, unsigned *msg_prio,
const struct timespec *abs_timeout);
#endif
mq_send向消息队列中写入一条消息,mq_receive从消息队列中读取一条消息。
mqdes:消息队列描述符;
msg_ptr:指向消息体缓冲区的指针;
msg_len:消息体的长度,其中mq_receive的该参数不能小于能写入队列中消息的最大大小,即一定要大于等于该队列的mq_attr结构中mq_msgsize的大小。如果mq_receive中的msg_len小于该值,就会返回EMSGSIZE错误。POXIS消息队列发送的消息长度可以为0。
msg_prio:消息的优先级;它是一个小于MQ_PRIO_MAX的数,数值越大,优先级越高。POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早消息。如果消息不需要设定优先级,那么可以在mq_send是置msg_prio为0,mq_receive的msg_prio置为NULL。
还有两个XSI定义的扩展接口限时发送和接收消息的函数:mq_timedsend和mq_timedreceive函数。默认情况下mq_send和mq_receive是阻塞进行调用,可以通过mq_setattr来设置为O_NONBLOCK。
下面是消息队列使用的测试代码:
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <mqueue.h>
using namespace std;
int main()
{
mqd_t mqID;
mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT | O_EXCL, 0666, NULL);
if (mqID < 0)
{
if (errno == EEXIST)
{
mq_unlink("/anonymQueue");
mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);
}
else
{
cout<<"open message queue error..."<<strerror(errno)<<endl;
return -1;
}
}
if (fork() == 0)
{
mq_attr mqAttr;
mq_getattr(mqID, &mqAttr);
char *buf = new char[mqAttr.mq_msgsize];
for (int i = 1; i <= 5; ++i)
{
if (mq_receive(mqID, buf, mqAttr.mq_msgsize, NULL) < 0)
{
cout<<"receive message failed. ";
cout<<"error info:"<<strerror(errno)<<endl;
continue;
}
cout<<"receive message "<<i<<": "<<buf<<endl;
}
exit(0);
}
char msg[] = "yuki";
for (int i = 1; i <= 5; ++i)
{
if (mq_send(mqID, msg, sizeof(msg), i) < 0)
{
cout<<"send message "<<i<<" failed. ";
cout<<"error info:"<<strerror(errno)<<endl;
}
cout<<"send message "<<i<<" success. "<<endl;
sleep(1);
}
}
在Linux2.6.18下的执行结构如下:
send message 1 success.
receive message 1: yuki
send message 2 success.
receive message 2: yuki
send message 3 success.
receive message 3: yuki
send message 4 success.
receive message 4: yuki
send message 5 success.
receive message 5: yuki
4POSIX消息队列的限制
POSIX消息队列本身的限制就是mq_attr中的mq_maxmsg和mq_msgsize,分别用于限定消息队列中的最大消息数和每个消息的最大字节数。在前面已经说过了,这两个参数可以在调用mq_open创建一个消息队列的时候设定。当这个设定是受到系统内核限制的。
下面是在Linux2.6.18下shell对启动进程的POSIX消息队列大小的限制:
# ulimit -a |grep message
POSIX message queues (bytes, -q) 819200
限制大小为800KB,该大小是整个消息队列的大小,不仅仅是最大消息数*消息的最大大小;还包括消息队列的额外开销。前面我们知道Linux2.6.18下POSIX消息队列默认的最大消息数和消息的最大大小分别为:
mq_maxmsg = 10
mq_msgsize = 8192
为了说明上面的限制大小包括消息队列的额外开销,下面是测试代码:
#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <mqueue.h>
using namespace std;
int main(int argc, char **argv)
{
mqd_t mqID;
mq_attr attr;
attr.mq_maxmsg = atoi(argv[1]);
attr.mq_msgsize = atoi(argv[2]);
mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT | O_EXCL, 0666, &attr);
if (mqID < 0)
{
if (errno == EEXIST)
{
mq_unlink("/anonymQueue");
mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, &attr);
if(mqID < 0)
{
cout<<"open message queue error..."<<strerror(errno)<<endl;
return -1;
}
}
else
{
cout<<"open message queue error..."<<strerror(errno)<<endl;
return -1;
}
}
mq_attr mqAttr;
if (mq_getattr(mqID, &mqAttr) < 0)
{
cout<<"get the message queue attribute error"<<endl;
return -1;
}
cout<<"mq_flags:"<<mqAttr.mq_flags<<endl;
cout<<"mq_maxmsg:"<<mqAttr.mq_maxmsg<<endl;
cout<<"mq_msgsize:"<<mqAttr.mq_msgsize<<endl;
cout<<"mq_curmsgs:"<<mqAttr.mq_curmsgs<<endl;
}
下面进行创建消息队列时设置最大消息数和消息的最大大小进行测试:
[root@idcserver program]# g++ -g test.cpp -lrt
[root@idcserver program]# ./a.out 10 81920
open message queue error...Cannot allocate memory
[root@idcserver program]# ./a.out 10 80000
open message queue error...Cannot allocate memory
[root@idcserver program]# ./a.out 10 70000
open message queue error...Cannot allocate memory
[root@idcserver program]# ./a.out 10 60000
mq_flags:0
mq_maxmsg:10
mq_msgsize:60000
mq_curmsgs:0
从上面可以看出消息队列真正存放消息数据的大小是没有819200B的。可以通过修改该限制参数,来改变消息队列的所能容纳消息的数量。可以通过下面方式来修改限制,但这会在shell启动进程结束后失效,可以将设置写入开机启动的脚本中执行,例如.bashrc,rc.local。
[root@idcserver ~]# ulimit -q 1024000000
[root@idcserver ~]# ulimit -a |grep message
POSIX message queues (bytes, -q) 1024000000
下面再次测试可以设置的消息队列的属性。
[root@idcserver program]# ./a.out 10 81920
mq_flags:0
mq_maxmsg:10
mq_msgsize:81920
mq_curmsgs:0
[root@idcserver program]# ./a.out 10 819200
mq_flags:0
mq_maxmsg:10
mq_msgsize:819200
mq_curmsgs:0
[root@idcserver program]# ./a.out 1000 8192
mq_flags:0
mq_maxmsg:1000
mq_msgsize:8192
mq_curmsgs:0
POSIX消息队列在实现上还有另外两个限制:
MQ_OPEN_MAX:一个进程能同时打开的消息队列的最大数目,POSIX要求至少为8;
MQ_PRIO_MAX:消息的最大优先级,POSIX要求至少为32;
Aug 7, 2013 AM 08:53 @lab
相关推荐
POSIX消息队列的读操作总是返回消息队列中优先级最高的最早消息,而对于SystemV消息队列可以返回任意指定优先级(通过消息类型)的消息。当向一个空消息队列中写入一个消息时,POSIX消息队列允许产生一个信号或启动...
目前主要有两种类型的消息队列:POSIX消息队列以及系统V消息队列,系统V消息队列目前被大量使用。系统V消息队列是随内核持续的,只有在内核重起或者人工删除时,该消息队列才会被删除。 二、相关函数 1. ...
Linux系统是一个免费使用和自由传播的类Unix操作系统,基于POSIX和UNIX的多用户、多任务、支持多线程和多CPU的操作系统。它继承了Unix以网络为核心的设计思想,是一个性能稳定的多用户网络操作系统,Linux是许多企业...
linux 进程间通信 posix 消息队列 实现 。(此为博客http://blog.csdn.net/shallnet 文章对应源码下载)
Linux下的进程通信手段基本上是从Unix平台上的进程通信手段继承而来的,最初Unix IPC包括:管道、FIFO、信号;System V IPC包括:System V消息队列、System V信号灯、System V共享内存区;Posix IPC包括: Posix消息...
进程通信 Linux进程间通信 一、进程间通信概述 进程通信有如下一些目的: A、数据传输:一个进程需要将它的数据发送给另一个进程,发送的数据量在一个字节到几M字节之间 B、共享数据:多个进程想要操作共享数据...
报文(Message)队列(消息队列):消息队列是消息的链接表,包括Posix消息队列system V消息队列。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。消息队列克服了信号承载信息量...
【linux学习笔记-1】使用GDB调试简单的用户程序 【linux学习笔记-2】父子进程...【linux学习笔记--17】POSIX IPC——消息队列 【linux学习笔记--18】POSIX IPC——信号量 【linux学习笔记--19】POSIX IPC——共享内存
Linux网络编程之进程间通信篇 Linux网络编程之线程篇 Linux网络编程之TCP/IP基础篇 01TCPIP基础(一) ISO/OSI参考模型 TCP/IP四层模型 基本概念(对等通信、封装、分用、端口) 02TCPIP基础(二) 最大传输...
POSIX消息队列相关函数 POSIX消息队列示例 35POSIX共享内存 POSIX共享内存相关函数 POSIX共享内存示例 Linux网络编程之线程篇 36线程介绍 什么是线程 进程与线程 线程优缺点 线程模型 N:1用户线程模型 1:1核心线程...
Linux网络编程之进程间通信篇 Linux网络编程之线程篇 Linux网络编程之TCP/IP基础篇 01TCPIP基础(一) ISO/OSI参考模型 TCP/IP四层模型 基本概念(对等通信、封装、分用、端口) 02TCPIP基础(二) 最大传输...
Linux网络编程之进程间通信篇 Linux网络编程之线程篇 Linux网络编程之TCP/IP基础篇 01TCPIP基础(一) ISO/OSI参考模型 TCP/IP四层模型 基本概念(对等通信、封装、分用、端口) 02TCPIP基础(二) 最大传输...
Linux网络编程之进程间通信篇 Linux网络编程之线程篇 Linux网络编程之TCP/IP基础篇 01TCPIP基础(一) ISO/OSI参考模型 TCP/IP四层模型 基本概念(对等通信、封装、分用、端口) 02TCPIP基础(二) 最大传输...
【linux学习笔记--17】POSIX IPC——消息队列.doc 【linux学习笔记--18】POSIX IPC——信号量.doc 【linux学习笔记--19】POSIX IPC——共享内存.doc 【linux学习笔记-10】Linux进程相关系统调用(三).doc 【linux...
Linux网络编程之进程间通信篇 Linux网络编程之线程篇 Linux网络编程之TCP/IP基础篇 01TCPIP基础(一) ISO/OSI参考模型 TCP/IP四层模型 基本概念(对等通信、封装、分用、端口) 02TCPIP基础(二) 最大传输...
Linux网络编程之进程间通信篇 Linux网络编程之线程篇 Linux网络编程之TCP/IP基础篇 01TCPIP基础(一) ISO/OSI参考模型 TCP/IP四层模型 基本概念(对等通信、封装、分用、端口) 02TCPIP基础(二) 最大传输单元...
(3) 通过 Linux 管道通信机制、消息队列通信机制、共享内存通信机制的使用,加深 对不同类型的进程通信方式的理解。 (4) 通过对 linux 的 Posix 信号量的应用,加深对信号量同步机制的理解。 (5)请根据自身情况,...
5.8 使用内存映射I/O实现Posix消息队列 85 5.9 小结 101 习题 101 第6章 System V消息队列 103 6.1 概述 103 6.2 msgget函数 104 6.3 msgsnd函数 104 6.4 msgrcv函数 105 6.5 msgctl函数 106 6.6 简单的...
Linux进程间通信方式汇总 目前已包含的方式 管道(PIPE) FIFO(有名管道) XSI消息队列 XSI信号量 XSI共享内存 POSIX信号量 域套接字(Domain Socket) 信号(Signal) 互斥量(Mutex) 其中信号(signal)和信号量(semaphore)...