Очереди сообщений представляют собой связный список в адресном пространстве ядра. Сообщения могут посылаться в очередь по порядку и доставаться из очереди несколькими разными путями. Каждая очередь сообщений однозначно определена идентификатором IPC.
Ключом к полному осознанию такой сложной системы, как System V IPC , является более тесное знакомство с различными структурами данных, которые лежат внутри самого ядра. Даже для большинства примитивных операций необходим прямой доступ к некоторым из этих структур, хотя другие используются только на гораздо более низком уровне.
Первой структурой, которую мы рассмотрим, будет msgbuf . Его можно понимать как шаблон для данных сообщения. Поскольку данные в сообщении программист определяет сам, он обязан понимать, что на самом деле они являются структурой msgbuf. Его описание находится в <linux/msg.h>:
/* буфер сообщения для вызовов msgsnd и msgrcv*/ struct msgbuf { long mtype; /* тип сообщения */ char mtext[1]; /* текст сообщения */ };
Возможность приписывать тип конкретному сообщению позволяет держать в одной очереди разнородные сообщения. Это может понадобиться, например, когда сообщения процесса-клиента помечаются одним магическим числом, а сообщения сообщения процесса-сервера - другим; или приложение ставит в очередь сообщения об ошибках с типом 1, сообщения-запросы - с типом 2 и т.д. Ваши возможности просто безграничны.
С другой стороны, старайтесь дать наглядное имя элементу данных сообщения (в примере был mtext). В это поле можно записывать не только массивы литер, но и вообще любые данные в любой форме. Поле действительно полностью произвольно, поэтому вся структура может быть переопределена программистом, например, так:
struct my_msgbuf { long mtype; /* тип сообщения */ long request_id; /* идентификатор запроса */ struct client info; /* информация о клиенте */ }
Здесь мы также видим структуру сообщения, но второй элемент заменился на два, причем один из них - другая структура! В этом прелесть очередей сообщений, ядро не разбирает данные, какими бы они ни были.
Существует, однако, ограничение на максимальный размер сообщения. В LINUX он определен в <linux/msg.h>:
#define MSGMAX 4056 /* <= 4056 */ /* максимальный размер сообщения, в байтах*/
Сообщения не могут быть больше, чем 4056 байт, сюда входит и элемент mtype , который занимает 4 байта (long).
Ядро хранит сообщение в очереди структуры msg. Она определена в <linux/msg.h> следующим образом:
struct msg { struct msg *msg_next; /* следующее сообщение в очереди */ long msg_type; char *msg_spot; /* адрес текста сообщения */ short msg_ts; /* размер текста */ };
Каждый из трех типов IPC-объектов имеет внутреннее представление, которое поддерживается ядром. Для очередей сообщений это структура msqid_ds . Ядро создает, хранит и сопровождает образец такой структуры для каждой очереди сообщений в системе. Она определена в <linux/msg.h> следующим образом:
/* структура msqid для каждой очереди в системе */ struct msqid_ds { struct ipc_perm msg_perm; struct msg *msg_first; /* первое сообщение в очереди */ struct msg *msg_last; /* последнее сообщение в очереди */ time_t msg_stime; /* время последнего вызова msgsnd */ time_t msg_rtime; /* время последнего вызова msgrcv */ time_t msg_ctime; /* время последнего изменения */ struct wait_queue *wwait; struct wait_queue *rwait; ushort msg_cbytes; ushort msg_qnum; ushort msg_qbytes; /* максимальное число байтов на очередь */ ushort msg_lspid; /* pid последнего испустившего msgsnd */ ushort msg_lrpid; /* последний полученный pid */ };
Хотя большинство элементов этой структуры вас будет мало волновать, для какой-то законченности мы вкратце поясним каждый.
Информацию о доступе к IPC-объектам ядро хранит в структуре ipc_perm . Например, описанная выше структура очереди сообщений содержит одну структуру типа ipc_perm в качестве элемента. Следующее ее определение дано в <linux/ipc.h>.
struct ipc_perm { key_t key; ushort uid; /* euid и egid владельца */ ushort gid; ushort cuid; /* euid и egid создателя */ ushort cgid; ushort mode; /* режим доступа, см. режимные флаги ниже */ ushort seq; /* порядковый номер использования гнезда */ };
Все приведенное выше говорит само за себя. Сохраняемая отдельно вместе с ключом IPC-объекта информация содержит данные о владельце и создателе этого объекта (они могут различаться). Режимы восьмеричного доступа также хранятся здесь, как unsigned short. Наконец, сохраняется порядковый номер использования гнезда. Каждый раз когда IPC объект закрывается через системный вызов (уничтожается), этот номер уменьшается на максимальное число объектов IPC, которые могут находиться в системе. Касается вас это значение? Нет.
Системный вызов msgget() нужен для того, чтобы создать очередь сообщений или подключиться к существующей.
PROTOTYPE: int msgget( key_t key, int msgflg );
RETURNS: идентификатор очереди сообщений в случае успеха; -1 в случае ошибки.
Первый аргумент msgget() значение ключа (мы его получаем при помощи ftok()). Этот ключ сравнивается с ключами уже существующих в ядре очередей. При этом операция открытия или доступа к очереди зависит от содержимого аргумента msgflg :
Вызов msgget() с IPC_CREAT , но без IPC_EXCL всегда выдает идентификатор (существующей с таким ключом или созданной) очереди. Использование IPC_EXCL вместе с IPC_CREAT либо создает новую очередь, либо, если очередь уже существует, заканчивается неудачей. Самостоятельно IPC_EXCL бесполезен, но вместе c IPC_CREAT он дает гарантию, что ни одна из существующих очередей не открывается для доступа.
Восьмеричный режим может быть OR-нут в маску доступа. Каждый IPC-объект имеет права доступа, аналогичные правам доступа к файлу в файловой системе UNIX-а.
Напишем оберточную функцию для открытия или создания очереди сообщений.
int open_queue( key_t keyval ) { int qid; if ((qid = msgget ( keyval, IPC_CREAT | 0660 )) == -1) { return (-1); } return (qid); }
Отметьте использование точного ограничителя доступа 0660. Эта небольшая функция возвращает идентификатор очереди (int) или -1 в случае ошибки. Единственный требуемый аргумент - ключевое значение.
Получив идентификатор очереди, мы можем выполнять над ней различные действия. Чтобы поставить сообщение в очередь, используйте системный вызов msgsnd() :
PROTOTYPE: int msgsnd( int msqid, struct msgbuf *msgp, int msgsz, int msgflg );
RETURNS: 0 в случае успеха, -1 в случае ошибки:
Первый аргумент msgsnd - идентификатор нашей очереди, возвращенный предварительным вызовом msgget . Второй аргумент, msgp - это указатель на редекларированный и загруженный буфер сообщения. Аргумент msgsz содержит длину сообщения в байтах, не учитывая тип сообщения (long 4 байта). Аргумент msgflg может быть нулем или:
Напишем незатейливую оберточную функцию для посылки сообщения:
int send_message( int qid, struct mymsgbuf *qbuf ) { int result, length; /* Длина есть в точности размер структуры минус sizeof(mtype) */ length = sizeof(struct mymsgbuf) - sizeof(long); if((result = msgsnd( qid, qbuf, length, 0)) == -1) { return(-1); } return(result); }
Эта функция пытается послать сообщение, лежащее по указанному адресу (qbuf), в очередь сообщений, идентифицированную qid . Напишем небольшую утилиту с нашими двумя оберточными функциями:
#include <stdio.h> #include <stdlib.h> #include <linux/ipc.h> #include <linux/msg.h> main() { int qid; key_t msgkey; struct mymsgbuf { long mtype; /* тип сообщения */ int request; /* рабочий номер запроса */ double salary; /* зарплата */ } msg; /* Генерируем IPC-ключ */ msgkey = ftok(".", 'm'); /* Открываем/создаем очередь */ if (( qid = open_queue( msgkey)) == -1) { perror("open_queue"); exit(1); } /* Заполняем сообщение произвольными тестовыми данными */ msg.type = 1; /* тип сообщения должен быть положительным! */ msg.request = 1; /* элемент данных 1 */ msg.salary = 1000.00; /* элемент данных 2 (моя годовая зарплата! - авт.) */ /* Бомбим! */ if((send_message( qid, &msg )) == -1) { perror("send_message"); exit(1); } }
После создания/открытия нашей очереди принимаемся за загрузку буфера сообщения с тестовыми данными (заметьте отсутствие текстовых данных для иллюстрации нашего положения о посылке двоичной информации). Вызов нашего send_message ловко доставит сообщение в очередь.
Теперь, когда мы имеем сообщение в очереди, попытайтесь при помощи ipcs посмотреть на статус нашей очереди. Обсудим, как забрать из очереди сообщение. Для этого используется системный вызов msgrcv()
PROTOTYPE: int msgrcv( int msqid, struct msgbuf *msgp, int msgsz, long mtype, $$)
RETURNS: число байт, скопированных в буфер сообщения, -1 в случае ошибки.
Конечно, первый аргумент определяет очередь, из которой будет взято сообщение (должен быть возвращен сделанным предварительно вызовом msgget) . Второй аргумент ( msgp) представляет собой адрес буфера, куда будет положено изъятое сообщение. Третий аргумент, msgsz , ограничивает размер структуры-буфера без учета длины элемента mtype . Еще раз повторимся, это может быть легко вычислено:
msgsz = sizeof(struct mymsgbuf) - sizeof(long);
Четвертый аргумент, mtype - это тип сообщения, изымаемого из очереди. Ядро будет искать в очереди наиболее старое сообщение такого типа и вернет его копию по адресу, указанному аргументом msgp . Существует один особый случай: если mtype = 0, то будет возвращено наиболее старое сообщение, независимо от типа.
Если IPC_NOWAIT был послан флагом, и нет ни одного удовлетворительного сообщения, msgrcv вернет вызывающему процессу ENOMSG. В противном случае вызывающий процесс блокируется, пока в очередь не прибудет сообщение, соответствующее параметрам msgrcv().
Если, пока клиент ждет сообщения, очередь удаляется, то ему возвращается EIDRM. EINTR возвращается, если сигнал поступил, пока процесс находился на промежуточной стадии между ожиданием и блокировкой.
Давайте рассмотрим функцию-переходник для изъятия сообщения из нашей очереди.
int read_message( int qid, long type, struct mymsgbuf *qbuf ) { int result, length; /* Длина есть в точности размер структуры минус sizeof(mtype) */ length = sizeof(struct mymsgbuf) - sizeof(long); if((result = msgrcv( qid, qbuf, length, type, 0 )) == -1) { return(-1); } return(result); }
После успешного изъятия сообщения удаляется из очереди и его ярлык.
Бит MSG_NOERROR в msgflg предоставляет некоторые дополнительные возможности. Если физическая длина сообщения больше, чем msgsz , и MSG_NOERROR установлен, то сообщение обрезается и возвращается только msgsz байт. Нормальный же msgrcv() возвращает -1 (E2BIG), и сообщение остается в очереди до последующих запросов. Такое поведение можно использовать для создания другой оберточной функции, которая позволит нам "подглядывать" внутрь очереди, чтобы узнать, пришло ли сообщение, удовлетворяющее нашему запросу.
int peek_message( int qid, long type ) { int result, length; if((result = msgrcv( qid, NULL, 0, type, IPC_NOWAIT )) == -1) { if(errno == E2BIG) return(TRUE); } return(FALSE); }
Выше вы заметили отсутствие адреса буфера и длины. В этом конкретном случае мы хотели, чтобы вызов прошел неудачно. Однако мы проверили возвращение E2BIG, которое должно показать, существует ли сообщение затребованного типа. Оберточная функция возвращает TRUE в случае успеха, и FALSE - в противном случае. Отметьте также установленный флаг IPC_NOWAIT , который помешает блокировке, о которой мы говорили раньше.
Благодаря использованию функций-переходников вы имеете некий элегантный подход к созданию и использованию очередей сообщений в ваших приложениях. Теперь коснемся непосредственно манипулирования внутренними структурами, связанными с данной очередью сообщений.
Для осуществления контроля над очередью предназначен системный вызов msgсtl .
PROTOTYPE: int msgctl ( int msgqid, int cmd, struct msqid_ds *buf );
RETURNS: 0 в случае успеха,-1 в случае неудачи
Теперь из общих соображений ясно, что прямые манипуляции с внутреностями ядра могут привести к очень занимательным последствиям. К сожалению, по-настоящему весело будет только тому, кто любит вдребезги и с наслаждением крушить подсистему IPC . Однако при использовании msgctl() с некоторыми командами вероятность огорчительных результатов не очень велика. Вот их и рассмотрим.
Вернемся к нашему разговору о внутреннем представлении очереди сообщений: msqid_ds . Ядро держит экземпляр этой структуры для каждой очереди, существующей в системе. IPC_STAT дает возможность заиметь копию такой структуры для испытаний. Посмотрим на оберточную функцию, которая берет эту структуру и размещает копию по указанному адресу.
int get_queue_ds( int qid, struct msgqid_ds *qbuf ) { if( msgctl( qid, IPC_STAT, qbuf) == -1 ) { return(-1); } return(0); }
Если копирование во внутренний буфер невозможно, то вызывающей функции возвращается -1. Если же все прошло нормально, то возвращается 0, и посланный буфер должен содержать копию внутренней структуры данных для очереди с идентификатором qid.
Что же мы можем делать с полученной копией структуры? Единственное, что можно поменять, это элемент ipc_perm. Это права доступа очереди, информация о создателе и владельце очереди. Однако и отсюда менять позволено только mode, uid и gid .
Давайте напишем оберточную функцию, изменяющую режим доступа очереди. Режим должен быть передан как массив литер (например, "660").
int change_queue_mode( int qid, char *mode ) { struct msqid_ds tmpbuf; /* Берем текущую копию внутренней структуры */ get_queue_ds( qid, &tmpbuf ); /* Применяем уже известный прикол для изменения прав доступа */ sscanf(mode, "%ho", &tmpbuf.msg_perm.mode); /* Модернизируем внутреннюю структуру */ if( msgctl( qid, IPC_SET, &tmpbuf ) == -1 ) { return(-1); } return(0); }
Мы взяли текущую копию внутренней структуры данных посредством вызова нашей get_queue_ds ; затем sscanf() меняет элемент mode структуры msg_perm . Однако ничего не произойдет, пока msgctl c IPC_SET не обновил внутреннюю версию.
ОСТОРОЖНО! Изменяя права доступа, можно случайно лишить прав себя самого! Помните, что IPC-объекты не исчезают, пока они не уничтожены должным образом или не перезагружена система. Поэтому то, что вы не видите очереди ipcs-ом, не означает, что ее нет на самом деле.
После того, как сообщение взято из очереди, оно удаляется. Однако, как отмечалось ранее, IPC-объекты остаются в системе до персонального удаления или перезапуска всей системы. Поэтому наша очередь сообщений все еще существует в ядре и пригодна к употреблению в любое время, несмотря на то, что последнее его соообщение уже давно на небесах. Чтобы и душа нашей очереди с миром отошла к богам, нужен вызов msgctl() , использующий команду IPC_RMID :
int remove_queue( int qid ) { if( msgctl( qid, IPC_RMID, 0) == -1) { return(-1) } return(0); }
Эта функция-переходник возвращает 0, если очередь удалена без инцедентов, в противном случае ввыдается -1. Удаление очереди неделимо и попытка любого обращения к ней будет безуспешной.
Мало кто станет отрицать непосредственную выгоду от возможности в любой момент получить точную техническую информацию. Подобные материалы представляют собой мощный механизм для обучения и исследования новых областей. Однако, неплохо было бы добавить к технической информации и реальные примеры. Это непременно ускорит и укрепит процесс обучения.
До сей поры все то хорошее, что мы сделали - это оберточные функции для манипуляций с очередями сообщений. Хотя они чрезвычайно полезны, ими неудобно пользоваться для дальнейшего обучения и экспериментов. Существует средство, позволяющее работать с IPC-очередями из командной строки - msgtool() . Хотя msgtool() будет использован в целях обучения, он пригодится и реально при написании скриптов.
Поведение msgtool()-а зависит от аргументов командной строки, что удобно для вызова из скрипта shell . Позволяет делать все что угодно, от создания, посылки и получения сообщений до редактирования прав доступа и удаления очереди. Изначально данными сообщений могут быть только литерные массивы. Упражнение - измените это так, чтобы можно было посылать и другие данные.
Посылка сообщений - msgtool s (type) "text"
Изъятие сообщений - msgtool r (type)
Изменение прав доступа - msgtool m (mode)
Уничтожение очереди - msgtool d
Примеры
msgtool s 1 test msgtool s 5 test msgtool s 1 "This is test" msgtool r 1 msgtool d msgtool m 660
Следующее, что мы рассмотрим, это исходный текст msgtool . Его следует компилировать в версии системы, которая поддерживает System V IPC. Убедитесь в наличии System V IPC в ядре, когда будете пересобирать программу!
На полях отметим, что наша утилита будет всегда создавать очередь, если ее не было.
Замечание. Поскольку msgtool использует ftok() для генерации ключей IPC, вы можете нарваться на конфликты, связанные с директориями. Если вы где-то в скрипте меняете директории, то все это наверняка не сработает. Это обходится путем более явного указания пути в msgtool, вроде "/tmp/msgtool", или даже запроса пути из командной строки вместе с остальными аргументами.
/**************************************************************************** Excerpt from "Linux Programmer's Guide - Chapter 6" (C)opyright 1994-1995, Scott Burkett **************************************************************************** MODULE: msgtool.c **************************************************************************** Средство командной строки для работы со очередями сообщений в стиле SysV ****************************************************************************/ #include <stdio.h> #include <stdlib.h> #include <ctype.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #define MAX_SEND_SIZE 80 struct mymsgbuf { long mtype; char mtext[MAX_SEND_SIZE]; }; void send_message(int qid, struct mymsgbuf *qbuf, long type, char *text); void read_message(int qid, struct mymsgbuf *qbuf, long type); void remove_queue(int qid); void change_queue_mode(int qid, char *mode); void usage(void); int main(int argc, char *argv[]) { key_t key; int msgqueue_id; struct mymsgbuf qbuf; if(argc == 1) usage(); /* Создаем уникальный ключ через вызов ftok() */ key = ftok(".",'m'); /* Открываем очередь - при необходимости создаем */ if((msgqueue_id = msgget(key, IPC_CREAT|0660)) == -1) { perror("msgget"); exit(1); } switch(tolower(argv[1][0])) { case 's': send_message(msgqueue_id, (struct mymsg buf *)&qbuf, atol(argv[2]), argv[3]); break; case 'r': read_message(msgqueue_id, &qbuf, atol(argv[2])); break; case 'd': remove_queue(msgqueue_id); break; case 'm': change_queue_mode(msgqueue_id, argv[2]); break; default: usage(); } return(0); } void send_message(int qid, struct mymsgbuf *qbuf, long type, char *text) { /* Посылаем сообщение в очередь */ printf("Sending a message ...\n"); qbuf->mtype = type; strcopy(qbuf->mtext, text); if((msgsnd(qid, (struct msgbuf *)qbuf, strlen(qbuf->mtext)+1, 0)) == -1) { perror("msgsnd"); exit(1); } } void read_message(int qid, struct mymsgbuf *qbuf, long type) { /* Вычитываем сообщение из очереди */ printf("Reading a message ...\"); qbuf->mtype = type; msgrcv(qid, (struct msgbuf *)qbuf, MAX_SEND_SIZE, type, 0); printf("Type: %ld Text: %s\n", qbuf->mtype, qbuf->mtext); } void remove_queue(int qid) { /* Удаляем очередь */ msgctl(qid, IPC_RMID, 0); } void change_queue_mode(int qid, char *mode) { struct msqid_ds myqueue_ds; /* Получаем текущее состояние */ msgctl(qid, IPC_STAT, &myqueue_ds); /* Меняем состояние в копии внутренней структуры данных */ sscanf(mode, "%ho", &myqueue_ds.msg_perm.mode) /* Обновляем состояние в самой внутренней структуре данных */ msgctl(qid, IPC_SET, &myqueue_ds); } void usage(void) { fprintf(stderr, "msgtool - A utility for tinkering with msg queues\n"); fprintf(stderr, "\nUSAGE: msgtool (s)end\n"); fprintf(stderr, " (r)ecv \n"); fprintf(stderr, " (d)elete\n"); fprintf(stderr, " (m)ode \n"); exit(1); }