消息队列
既然已经有了共享内存,为什么还需要消息队列呢?这有多种原因,让我们尝试将其分解为多个点以进行简化 -
正如所理解的,一旦消息被进程接收到,它就不再可供任何其他进程使用。而在共享内存中,数据可供多个进程访问。
如果我们想以小消息格式进行通信。
当多个进程同时通信时,需要同步保护共享内存数据。
使用共享内存的读写频率较高,实现该功能会非常复杂。在这种情况下不值得使用。
如果不是所有进程都需要访问共享内存,而是只有极少数进程需要访问共享内存,那么最好用消息队列来实现。
如果我们要与不同的数据包进行通信,假设进程 A 向进程 B 发送消息类型 1,向进程 C 发送消息类型 10,向进程 D 发送消息类型 20。在这种情况下,使用消息队列实现会更简单。为了将给定消息类型简化为 1、10、20,它可以是 0 或 +ve 或 –ve,如下所述。
当然,消息队列的顺序是FIFO(先进先出)。插入队列中的第一条消息是第一条要检索的消息。
使用共享内存或消息队列取决于应用程序的需求及其利用效率。
使用消息队列的通信可以通过以下方式进行 -
由一个进程写入共享内存,由另一进程从共享内存读取。正如我们所知,读取也可以通过多个进程来完成。
由一个进程用不同的数据包写入共享内存,并由多个进程从共享内存中读取,即根据消息类型。
了解了有关消息队列的某些信息后,现在是时候检查支持消息队列的系统调用(System V)了。
要使用消息队列执行通信,请执行以下步骤 -
步骤 1 - 创建消息队列或连接到已存在的消息队列 (msgget())
步骤 2 - 写入消息队列 (msgsnd())
步骤 3 - 从消息队列中读取 (msgrcv())
步骤 4 - 对消息队列执行控制操作 (msgctl())
现在,让我们检查上述调用的语法和某些信息。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgget(key_t key, int msgflg)
此系统调用创建或分配 System V 消息队列。需要传递以下参数 -
第一个参数 key 识别消息队列。密钥可以是任意值,也可以是从库函数 ftok() 导出的值。
第二个参数 shmflg 指定所需的消息队列标志,例如 IPC_CREAT(如果不存在则创建消息队列)或 IPC_EXCL(与 IPC_CREAT 一起使用来创建消息队列,如果消息队列已存在,则调用失败)。还需要通过权限。
注意- 有关权限的详细信息,请参阅前面的部分。
如果成功,此调用将返回一个有效的消息队列标识符(用于进一步调用消息队列),如果失败,则返回 -1。要了解失败的原因,请使用 errno 变量或 perror() 函数进行检查。
与此调用相关的各种错误有 EACCESS(权限被拒绝)、EEXIST(队列已存在,无法创建)、ENOENT(队列不存在)、ENOMEM(没有足够的内存来创建队列)等。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)
该系统调用将消息发送/附加到消息队列(系统 V)中。需要传递以下参数 -
第一个参数 msgid 识别消息队列,即消息队列标识符。msgget() 成功时会收到标识符值
第二个参数 msgp 是指向消息的指针,发送给调用者,在以下形式的结构中定义 -
struct msgbuf { long mtype; char mtext[1]; };
变量 mtype 用于与不同消息类型进行通信,在 msgrcv() 调用中详细解释。变量 mtext 是一个数组或其他结构,其大小由 msgsz(正值)指定。如果未提及 mtext 字段,则将其视为零大小消息,这是允许的。
第三个参数 msgsz 是消息的大小(消息应以空字符结尾)
第四个参数 msgflg 指示某些标志,例如 IPC_NOWAIT(当在队列中找不到消息时立即返回)或 MSG_NOERROR(如果超过 msgsz 字节,则截断消息文本)
成功时此调用将返回 0,失败时返回 -1。要了解失败的原因,请使用 errno 变量或 perror() 函数进行检查。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgrcv(int msgid, const void *msgp, size_t msgsz, long msgtype, int msgflg)
该系统调用从消息队列(系统 V)中检索消息。需要传递以下参数 -
第一个参数 msgid 识别消息队列,即消息队列标识符。msgget() 成功时会收到标识符值
第二个参数 msgp 是从调用者接收到的消息的指针。它在以下形式的结构中定义 -
struct msgbuf { long mtype; char mtext[1]; };
变量 mtype 用于与不同消息类型进行通信。变量 mtext 是一个数组或其他结构,其大小由 msgsz(正值)指定。如果未提及 mtext 字段,则将其视为零大小消息,这是允许的。
第三个参数 msgsz 是收到的消息的大小(消息应以空字符结尾)
第四个参数 msgtype 指示消息的类型 -
如果 msgtype 为 0 - 读取队列中第一条收到的消息
如果 msgtype 为 +ve - 读取类型为 msgtype 的队列中的第一条消息(如果 msgtype 为 10,则仅读取类型 10 的第一条消息,即使队列开头可能有其他类型)
如果 msgtype 为 –ve - 读取小于或等于消息类型绝对值的最低类型的第一条消息(例如,如果 msgtype 为 -5,则读取类型小于 5 的第一条消息,即消息类型从 1 到5)
第五个参数 msgflg 指示某些标志,例如 IPC_NOWAIT(当在队列中找不到消息时立即返回或 MSG_NOERROR(如果超过 msgsz 字节则截断消息文本)
如果成功,此调用将返回 mtext 数组中实际接收的字节数,如果失败,则返回 -1。要了解失败的原因,请使用 errno 变量或 perror() 函数进行检查。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgctl(int msgid, int cmd, struct msqid_ds *buf)
该系统调用执行消息队列(系统V)的控制操作。需要传递以下参数 -
第一个参数 msgid 识别消息队列,即消息队列标识符。msgget() 成功时会收到标识符值
第二个参数 cmd 是对消息队列执行所需控制操作的命令。cmd 的有效值为 -
IPC_STAT - 将 struct msqid_ds 每个成员的当前值信息复制到 buf 指向的传递结构。该命令需要消息队列的读取权限。
IPC_SET - 设置结构 buf 指向的用户 ID、所有者组 ID、权限等。
IPC_RMID - 立即删除消息队列。
IPC_INFO - 返回有关 buf 指向的结构中的消息队列限制和参数的信息,该结构的类型为 struct msginfo
MSG_INFO - 返回一个 msginfo 结构,其中包含有关消息队列消耗的系统资源的信息。
第三个参数 buf 是指向名为 struct msqid_ds 的消息队列结构的指针。该结构的值将用于根据 cmd 设置或获取。
此调用将根据传递的命令返回值。IPC_INFO 和 MSG_INFO 或 MSG_STAT 成功返回消息队列的索引或标识符,或者对于其他操作返回 0,如果失败则返回 -1。要了解失败的原因,请使用 errno 变量或 perror() 函数进行检查。
了解了有关消息队列的基本信息和系统调用后,现在是时候使用程序进行检查了。
在查看程序之前让我们先看看描述 -
步骤 1 - 创建两个进程,一个用于发送到消息队列(msgq_send.c),另一个用于从消息队列中检索(msgq_recv.c)
步骤 2 - 使用 ftok() 函数创建密钥。为此,最初创建文件 msgq.txt 来获取唯一密钥。
步骤 3 - 发送过程执行以下操作。
读取用户输入的字符串
删除新行(如果存在)
发送到消息队列
重复该过程直到输入结束(CTRL + D)
收到输入结束后,发送消息“end”以表示流程结束
步骤 4 - 在接收过程中,执行以下操作。
- 从队列中读取消息
- 显示输出
- 如果收到的消息是“end”,则完成该过程并退出
为简单起见,我们在此示例中不使用消息类型。此外,一个进程正在向队列写入数据,另一进程正在从队列中读取数据。这可以根据需要进行扩展,即理想情况下,一个进程写入队列,多个进程从队列读取。
现在,让我们检查一下过程(消息发送到队列) – 文件:msgq_send.c
/* Filename: msgq_send.c */ #include <stdio.h> #include <string.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #define PERMS 0644 struct my_msgbuf { long mtype; char mtext[200]; }; int main(void) { struct my_msgbuf buf; int msqid; int len; key_t key; system("touch msgq.txt"); if ((key = ftok("msgq.txt", 'B')) == -1) { perror("ftok"); exit(1); } if ((msqid = msgget(key, PERMS | IPC_CREAT)) == -1) { perror("msgget"); exit(1); } printf("message queue: ready to send messages.\n"); printf("Enter lines of text, ^D to quit:\n"); buf.mtype = 1; /* we don't really care in this case */ while(fgets(buf.mtext, sizeof buf.mtext, stdin) != NULL) { len = strlen(buf.mtext); /* remove newline at end, if it exists */ if (buf.mtext[len-1] == '\n') buf.mtext[len-1] = '\0'; if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */ perror("msgsnd"); } strcpy(buf.mtext, "end"); len = strlen(buf.mtext); if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */ perror("msgsnd"); if (msgctl(msqid, IPC_RMID, NULL) == -1) { perror("msgctl"); exit(1); } printf("message queue: done sending messages.\n"); return 0; }
编译和执行步骤
message queue: ready to send messages. Enter lines of text, ^D to quit: this is line 1 this is line 2 message queue: done sending messages.
以下是消息接收过程的代码(从队列中检索消息) - 文件:msgq_recv.c
/* Filename: msgq_recv.c */ #include <stdio.h> #include <stdlib.h> #include <errno.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #define PERMS 0644 struct my_msgbuf { long mtype; char mtext[200]; }; int main(void) { struct my_msgbuf buf; int msqid; int toend; key_t key; if ((key = ftok("msgq.txt", 'B')) == -1) { perror("ftok"); exit(1); } if ((msqid = msgget(key, PERMS)) == -1) { /* connect to the queue */ perror("msgget"); exit(1); } printf("message queue: ready to receive messages.\n"); for(;;) { /* normally receiving never ends but just to make conclusion /* this program ends wuth string of end */ if (msgrcv(msqid, &buf, sizeof(buf.mtext), 0, 0) == -1) { perror("msgrcv"); exit(1); } printf("recvd: \"%s\"\n", buf.mtext); toend = strcmp(buf.mtext,"end"); if (toend == 0) break; } printf("message queue: done receiving messages.\n"); system("rm msgq.txt"); return 0; }
编译和执行步骤
message queue: ready to receive messages. recvd: "this is line 1" recvd: "this is line 2" recvd: "end" message queue: done receiving messages.