UNIX: взаимодействие процессов — страница 6 из 35

Очереди сообщений Posix

5.1. Введение

Очередь сообщений можно рассматривать как связный список сообщений. Программные потоки с соответствующими разрешениями могут помещать сообщения в очередь, а потоки с другими соответствующими разрешениями могут извлекать их оттуда. Каждое сообщение представляет собой запись (вспомните сравнение потоков и сообщений в разделе 4.10), и каждому сообщению его отправителем присваивается приоритет. Для записи сообщения в очередь не требуется наличия ожидающего его процесса. Это отличает очереди сообщений от программных каналов и FIFO, в которые нельзя произвести запись, пока не появится считывающий данные процесс.

Процесс может записать в очередь какие-то сообщения, после чего они могут быть получены другим процессом в любое время, даже если первый завершит свою работу. Мы говорим, что очереди сообщений обладают живучестью ядра (kernel persistence, раздел 1.3). Это также отличает их от программных каналов и FIFO. В главе 4 говорится о том, что данные, остающиеся в именованном или неименованном канале, сбрасываются, после того как все процессы закроют его.

В этой главе рассматриваются очереди сообщений стандарта Posix, а в главе 6 — стандарта System V. Функции для работы с ними во многом схожи, а главные отличия заключаются в следующем:

■ операция считывания из очереди сообщений Posix всегда возвращает самое старое сообщение с наивысшим приоритетом, тогда как из очереди System V можно считать сообщение с произвольно указанным приоритетом;

■ очереди сообщений Posix позволяют отправить сигнал или запустить программный поток при помещении сообщения в пустую очередь, тогда как для очередей System V ничего подобного не предусматривается.

Каждое сообщение в очереди состоит из следующих частей:

■ приоритет (беззнаковое целое, Posix) либо тип сообщения (целое типа long, System V);

■ длина полезной части сообщения, которая может быть нулевой;

■ собственно данные (если длина сообщения отлична от 0).

Этим очереди сообщений отличаются от программных каналов и FIFO. Последние две части сообщения представляют собой байтовые потоки, в которых отсутствуют границы между сообщениями и никак не указывается их тип. Мы обсуждали этот вопрос в разделе 4.10 и добавили свой собственный интерфейс для пересылки сообщений по программным каналам и FIFO. На рис. 5.1 показан возможный вид очереди сообщений.

Рис. 5.1. Очередь сообщений Posix, содержащая три сообщения


Мы предполагаем реализацию через связный список, причем его заголовок содержит два атрибута очереди: максимально допустимое количество сообщений в ней и максимальный размер сообщения. Об этих атрибутах мы расскажем более подробно в разделе 5.3.

В этой главе мы используем метод, к которому будем прибегать и в дальнейшем, рассматривая очереди сообщений, семафоры и разделяемую память. Поскольку все эти объекты IPC обладают по крайней мере живучестью ядра (вспомните раздел 1.3), мы можем писать небольшие программы, использующие эти методы для экспериментирования с ними и получения большей информации о том, как они работают. Например, мы можем написать программу, создающую очередь сообщений Posix, а потом написать другую программу, которая помещает сообщение в такую очередь, а потом еще одну, которая будет считывать сообщения из очереди. Помещая в очередь сообщения с различным приоритетом, мы увидим, в каком порядке они будут возвращаться функцией mq_receive.

5.2. Функции mq_open, mq_close, mq_unlink

Функция mq_open создает новую очередь сообщений либо открывает существующую:

#include 

mqd_t mq_open(const char *name, int oflag, …

/* mode_t mode, struct mq_attr *attr*/ );

/* Возвращает дескриптор очереди в случае успешного завершения;

 –1 – в противном случае. */

Требования к аргументу пате описаны в разделе 2.2.

Аргумент oflag может принимать одно из следующих значений: O_RDONLY, O_WRONLY, O_RDWR в сочетании (логическое сложение) с O_CREAT, O_EXCL, O_NONBLOCK. Все эти флаги описаны в разделе 2.3.

При создании новой очереди (указан флаг O_CREAT и очередь сообщений еще не существует) требуется указание аргументов mode и attr. Возможные значения аргумента mode приведены в табл. 2.3. Аргумент attr позволяет задать некоторые атрибуты очереди. Если в качестве этого аргумента задать нулевой указатель, очередь будет создана с атрибутами по умолчанию. Эти атрибуты описаны в разделе 5.3.

Возвращаемое функцией mq_open значение называется дескриптором очереди сообщений, но оно не обязательно должно быть (и, скорее всего, не является) небольшим целым числом, как дескриптор файла или программного сокета. Это значение используется в качестве первого аргумента оставшихся семи функций для работы с очередями сообщений.

ПРИМЕЧАНИЕ

В системе Solaris 2.6 тип mqd_t определен как void*, а в Digital Unix 4.0B — как int. В нашем примере в разделе 5.8 эти дескрипторы трактуются как указатели на структуру. Название «дескриптор» было дано им по ошибке. 

Открытая очередь сообщений закрывается функцией mq_close:

#include 

int mq_close(mqd_t mqdes);

/*Возвращает 0 в случае успешного завершения. –1 в случае ошибки */

По действию эта функция аналогична close для открытого файла: вызвавший функцию процесс больше не может использовать дескриптор, но очередь сообщений из системы не удаляется. При завершении процесса все открытые очереди сообщений закрываются, как если бы для каждой был сделан вызов mq_close.

Для удаления из системы имени (пате), которое использовалось в качестве аргумента при вызове mq_open, нужно использовать функцию mq_unlink:

#include 

int mq_unlink(const char *name);

/* Возвращает 0 в случае успешного завершения. –1 в случае ошибки */

Для очереди сообщений (как и для файла) ведется подсчет числа процессов, в которых она открыта в данный момент, и по действию эта функция аналогична unlink для файла: имя (пате) может быть удалено из системы, даже пока число подключений к очереди отлично от нуля, но удаление очереди (в отличие от удаления имени из системы) не будет осуществлено до того, как очередь будет закрыта последним использовавшим ее процессом.

Очереди сообщений Posix обладают по меньшей мере живучестью ядра (раздел 1.3), то есть они продолжают существовать, храня все имеющиеся в них сообщения, даже если нет процессов, в которых они были бы открыты. Очередь существует, пока она не будет удалена явно с помощью mq_unlink.

ПРИМЕЧАНИЕ

Мы увидим, что если очередь сообщений реализована через отображаемые в память файлы (раздел 12.2), она может обладать живучестью файловой системы, но это не является обязательным и рассчитывать на это нельзя.

Пример: программа mqcreate1

Поскольку очереди сообщений Posix обладают по крайней мере живучестью ядра, можно написать набор небольших программ для работы с ними — с этими программами будет проще экспериментировать. Программа из листинга 5.1[1] создает очередь сообщений, имя которой принимается в качестве аргумента командной строки.

Листинг 5.1. Создание очереди сообщений (указан флаг O_EXCL)

//pxmsg/mqcreate1.с

1  #include "unpipc.h"

2  int

3  main(int argc, char **argv)

4  {

5   int с flags:

6   mqd_t mqd;

7   flags = O_RDWR | O_CREAT;

8   while ((c = Getopt(argc, argv, "e")) != –1) {

9    switch (c) {

10   case 'e':

11    flags |= O_EXCL;

12    break;

13   }

14  }

15  if (optind != argc – 1)

16   err_quit("usage: mqcreate [ –e ] ");

17  mqd = Mq_open(argv[optind], flags, FILE_MODE, NULL);

18  Mq_close(mqd);

19  exit(0);

20 }

В командной строке можно указать параметр –е, управляющий исключающим созданием очереди. (О функции getopt и нашей обертке Getopt рассказано более подробно в комментарии к листингу 5.5.) При возвращении функция getopt сохраняет в переменной optind индекс следующего аргумента, подлежащего обработке.

Мы вызываем функцию mq_open, указывая ей в качестве имени IPC полученный из командной строки параметр, не обращаясь к рассмотренной нами в разделе 2.2 функции px_ipc_name. Это даст нам возможность узнать, как в данной реализации обрабатываются имена Posix IPC (мы используем для этого наши маленькие тестовые программы на протяжении всей книги).

Ниже приведен результат работы программы в Solaris 2.6:

solaris % mqcreate1 /temp.1234    очередь успешно создается

solaris % ls -l /tmp/.*1234

-rw-rw-rw– 1 rstevens other1 132632 Oct 23 17:08 /tmp/.MQDtemp.1234

-rw-rw-rw– 1 rstevens other1      0 Oct 23 17:08 /tmp/.MQLtemp.1234

-rw-r--r-- 1 rstevens other1      0 Oct 23 17:08 /tmp/.MQPDtemp.1234

solaris % mqcreate1 –e /temp.1234 очередь уже создана

mq_open error for /temp.1234: File exists

Мы назвали эту версию программы mqcreate1, поскольку она будет улучшена в листинге 5.4, после того как мы обсудим использование атрибутов очереди. Разрешения на доступ к третьему файлу определяются константой FILE_MODE (чтение и запись для пользователя, только чтение для группы и прочих пользователей), но у двух первых файлов разрешения отличаются. Можно предположить, что в файле с буквой D в имени хранятся данные; файл с буквой L представляет собой какую-то блокировку, а в файле с буквой Р хранятся разрешения.

В Digital Unix 4.0B мы указываем действительное имя создаваемого файла:

alpha % mqcreate1 /tmp/myq.1234    очередь успешно создается

alpha % ls –l /tmp/myq.1234

-rw-r--r-- 1 rstevens system 11976 Oct 23 17:04 /tmp/myq.1234

alpha % mqcreate1 –e /tmp/myq.1234 очередь уже создана

mq_open error for /tmp/myq.1234: File exists

Пример: программа mqunlink

В листинге 5.2 приведена программа mqunlink, удаляющая из системы очередь сообщений.

Листинг 5.2. Удаление очереди из системы: mqunlink

//pxmsg/mqunlink.c

1 #include "unpipc.h"

2 int

3 main(int argc, char **argv)

4 {

5  if (argc != 2)

6   err_quit("usage: mqunlink ");

7  Mq_unlink(argv[1]);

8  exit(0);

9 }

С помощью этой программы мы можем удалить очередь сообщений, созданную программой mqcreate1:

solaris % mqunlink /temp.1234

При этом будут удалены все три файла из каталога /tmp, которые относятся к этой очереди.

5.3. Функции mq_getattr и mq_setattr

У каждой очереди сообщений имеются четыре атрибута, которые могут быть получены функцией mq_getattr и установлены (по отдельности) функцией mq_setattr:

#include 

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *oattr);

/* Обе функции возвращают 0 в случае успешного завершения; –1 – в случае возникновения ошибок */

Структура mq_attr хранит в себе эти четыре атрибута:

struct mq_attr {

 long mq_flags;   /* флаг очереди: 0, O_NONBLOCK */

 long mq_maxmsg;  /* максимальное количество сообщений в очереди */

 long mq_msgsize; /* максимальный размер сообщения (в байтах) */

 long mq_curmsgs; // текущее количество сообщений в очереди

}

Указатель на такую структуру может быть передан в качестве четвертого аргумента mq_open, что дает возможность установить параметры mq_maxmsg и mq_msgsize в момент создания очереди. Другие два поля структуры функцией mq_open игнорируются.

Функция mq_getattr присваивает полям структуры, на которую указывает attr, текущие значения атрибутов очереди.

Функция mq_setattr устанавливает атрибуты очереди, но фактически используется только поле mqflags той структуры, на которую указывает attr, что дает возможность сбрасывать или устанавливать флаг запрета блокировки. Другие три поля структуры игнорируются: максимальное количество сообщений в очереди и максимальный размер сообщения могут быть установлены только в момент создания очереди, а количество сообщений в очереди можно только считать, но не изменить.

Кроме того, если указатель oattr ненулевой, возвращаются предыдущие значения атрибутов очереди (mq_flags, mq_maxmsg, mq_msgsize) и текущий статус очереди (mq_curmsgs).

Пример: программа mqgetattr

Программа из листинга 5.3 открывает указанную очередь сообщений и выводит значения ее атрибутов.

Листинг 5.3. Получение и вывод значений атрибутов очереди сообщений

//pxmsg/mqgetattr.c

1  #include "unpipc.h"

2  int

3  main(int argc, char **argv)

4  {

5   mqd_t mqd;

6   struct mq_attr attr;

7   if (argc != 2)

8    err_quit("usage: mqgetattr ");

9   mqd = Mq_open(argv[1], O_RDONLY);

10  Mq_getattr(mqd, &attr);

11  printf ("max #msgs = %ld, max #bytes/msg = %ld, "

12   "#currently on queue = %ld\n",

13   attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

14  Mq_close(mqd);

15  exit(0);

16 }

Мы можем создать очередь сообщений и вывести значения ее атрибутов, устанавливаемые по умолчанию:

solaris % mqcreate1 /hello.world

solaris % mqgetattr /hello.world

max #msgs = 128, max #bytes/msg = 1024, #currently on queue = 0

Вспомним размер одного из файлов очереди, созданной с использованием устанавливаемых по умолчанию значений атрибутов. Он был выведен командой ls в примере после листинга 5.1. Это значение можно получить как 128×1024+1560 = 132632.

Добавочные 1560 байт представляют собой, скорее всего, дополнительную информацию: 8 байт на сообщение плюс добавочные 536 байт.

Пример: программа mqcreate

Мы можем изменить программу из листинга 5.1 таким образом, чтобы при создании очереди иметь возможность указывать максимальное количество сообщений и максимальный размер сообщения. Мы не можем указать только один из этих параметров; нужно обязательно задать оба (см., впрочем, упражнение 5.1). В листинге 5.4 приведен текст новой программы.

Листинг 5.4. Усовершенствованная программа mqcreate

//pxmsg/mqcreate.c

1  #include "unpipc.h"

2  struct mq_attr attr; /* mq_maxmsg и mq_msgsize инициализируются О */

3  int

4  main(int argc, char **argv)

5  {

6   int с flags;

7   mqd_t mqd;

8   flags = O_RDWR | O_CREAT;

9   while ((c = Getopt(argc, argv, "em:z:")) != –1) {

10   switch (c) {

11   case 'e':

12    flags |= O_EXCL;

13    break;

14   case 'm':

15    attr.mq_maxmsg = atol(optarg);

16    break;

17   case 'z':

18    attr.mq_msgsize = atol(optarg);

19    break;

20   }

21  }

22  if (optind != argc – 1)

23   err_quit("usage: mqcreate [ –е ] [ –m maxmsg –z msgsize ] ");

24  if ((attr.mq_maxmsg != 0 && attr.mq_msgsize ==0) ||

25   (attr.mq_maxmsg == 0 && attr.mq_msgsize != 0))

26   err_quit("must specify both –m maxmsg and –z msgsize");

27  mqd = Mq_open(argv[optind], flags, FILE_MODE,

28   (attr.mq_maxmsg != 0) ? &attr : NULL);

29  Mq_close(mqd);

30  exit(0);

31 }

Параметр командной строки, требующий аргумента, указывается с помощью двоеточия (после параметров m и z в вызове getopt). В момент обработки символа параметр optarg указывает на аргумент.

ПРИМЕЧАНИЕ

Наша обертка Getopt вызывает стандартную библиотечную функцию getopt и завершает выполнение процесса в случае возникновения ошибок в ее работе: при появлении параметра, не указанного в третьем аргументе при вызове функции, или при наличии параметра без необходимого числового аргумента (потребность в нем указывается с помощью двоеточия после буквы параметра в третьем аргументе функции getopt). В любом случае, getopt помещает сообщение об ошибке в стандартный поток сообщений об ошибках и возвращает ошибку, что приводит к завершению работы оберткой Getopt. В двух приведенных ниже примерах ошибка обнаруживается функцией getopt:

solaris %mqcreate –z

mqcreate: option requires an argument – z

solaris %mqcreate –q

mqcreate: illegal option – q

В следующем примере ошибка (не указан необходимый аргумент — имя очереди) обнаруживается самой программой:

solaris %mqcreate

usage: mqcreate [ –e ] [ –m maxmsg –z msgsize ] 

Если не указан ни один из двух новых параметров, мы должны передать функции mq_open пустой указатель в качестве последнего аргумента. В противном случае мы передаем указатель на нашу структуру attr.

Запустим теперь новую версию нашей программы в системе Solaris 2.6, указав максимальное количество сообщений 1024 и максимальный размер сообщения 8192 байт:

solaris % mqcreate –e –m 1024 -z 8192 /foobar

solaris % ls –al /tmp/.*foobar

-rw-rw-rw– 1 rstevens other1 8397336 Oct 25 11:29 /tmp/.MQDfoobar

–rw-rw-rw– 1 rstevens other1       0 Oct 25 11:29 /tmp/.MQLfoobar

–rw-r--r-- 1 rstevens other1       0 Oct 25 11:29 /tmp/.MQPfoobar

Размер файла, содержащего данные этой очереди, соответствует максимальному количеству сообщений в очереди и максимальному размеру сообщения (1024×8192 = 8388608), а оставшиеся 8728 байт предусматривают 8 байт информации на каждое сообщение (8×1024) плюс дополнительные 536 байт. 

При выполнении той же программы в Digital Unix 4.0B получим:

alpha % mqcreate –m 256 -z 2048 /tmp/bigq

alpha % ls-l/tmp/bigq

-rw-r--r-- 1 rstevens system 537288 Oct 25 15:38 /tmp/bigq

В этой реализации размер очереди соответствует максимальному количеству сообщений и максимальному размеру сообщения (256×2048 = 524288), а оставшиеся 13000 байт дают возможность хранить 48 байт добавочной информации для каждого сообщения (48×256) и еще 712 байт.

5.4. Функции mqsend и mqreceive

Эти две функции предназначены для помещения сообщений в очередь и получения их оттуда. Каждое сообщение имеет свой приоритет, который представляет собой беззнаковое целое, не превышающее MQ_PRIO_MAX. Стандарт Posix требует, чтобы эта величина была не меньше 32.

ПРИМЕЧАНИЕ

В Solaris 2.6 значение MQ_PRIO_MAX равняется именно 32, но в Digital Unix 4.0B этот предел равен уже 256. В листинге 5.7 мы покажем, как получить эти значения.

Функция mq_receive всегда возвращает старейшее в указанной очереди сообщение с максимальным приоритетом, и приоритет может быть получен вместе с содержимым сообщения и его длиной.

ПРИМЕЧАНИЕ

Действие mq_receive отличается от действия msgrcv в System V (раздел 6.4). Сообщения System V имеют поле type, аналогичное по смыслу приоритету, но для функции msgrcv можно указать три различных алгоритма возвращения сообщений: старейшее сообщение в очереди, старейшее сообщение с указанным типом или старейшее сообщение с типом, не превышающим указанного значения.

#include 

int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);

/* Возвращает 0 в случае успешного завершения, –1 – в случае возникновения ошибок */

ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);

/* Возвращает количество байтов в сообщении в случае успешного завершения. –1 – в случае ошибки */

Первые три аргумента обеих функций аналогичны первым трем аргументам функций write и read соответственно.

ПРИМЕЧАНИЕ

Объявление указателя на буфер как char* кажется ошибкой — тип void* больше соответствовал бы по духу прочим функциям Posix.1. 

Значение аргумента len функции mq_receive должно быть по крайней мере не меньше максимального размера сообщения, которое может быть помещено в очередь, то есть значения поля mq_msgsize структуры mq_attr для этой очереди. Если len оказывается меньше этой величины, немедленно возвращается ошибка EMSGSIZE. 

ПРИМЕЧАНИЕ

Это означает, что большинству приложений, использующих очереди сообщений Posix, придется вызывать mq_getattr после открытия очереди для определения максимального размера сообщения, а затем выделять память под один или несколько буферов чтения этого размера. Требование, чтобы буфер был больше по размеру, чем максимально возможное сообщение, позволяет функции mq_receive не возвращать уведомление о том, что размер письма превышает объем буфера. Сравните это, например, с флагом MSG_NOERROR и ошибкой E2BIG для очередей сообщений System V (раздел 6.4) и флагом MSG_TRUNC для функции recvmsg, используемой с дейтаграммами UDP (раздел 13.5 [24]). 

Аргумент prio устанавливает приоритет сообщения для mq_send, его значение должно быть меньше MQ_PRIO_MAX. Если при вызове mq_receive priop является ненулевым указателем, в нем сохраняется приоритет возвращаемого сообщения. Если приложению не требуется использование различных приоритетов сообщений, можно указывать его равным нулю для mq_send и передавать mq_receive нулевой указатель в качестве последнего аргумента.

ПРИМЕЧАНИЕ

Разрешена передача сообщений нулевой длины. Это тот случай, когда важно не то, о чем говорится в стандарте (Posix.1), а то, о чем в нем не говорится: нигде не запрещена передача сообщений нулевой длины. Функция mq_receive возвращает количество байтов в сообщении (в случае успешного завершения работы) или –1 в случае возникновения ошибок, так что 0 обозначает сообщение нулевой длины. 

Очередям сообщений Posix и System V не хватает полезной функции: получатель не может определить отправителя сообщения. Эта информация могла бы пригодиться многим приложениям. К сожалению, большинство механизмов передачи сообщений IPC не позволяют определить отправителя сообщений. В разделе 15.5 мы расскажем, как эта возможность обеспечивается для дверей. В разделе 14.8 [24] описано, как эта возможность обеспечивается в BSD/OS для доменных сокетов Unix. В разделе 15.3.1 [21] описано, как SVR4 передает информацию об отправителе по каналу при передаче по нему дескриптора. В настоящее время методы BSD/OS широко используются, и хотя реализация SVR4 является частью стандарта Unix 98, она требует передачи дескриптора по каналу, что обычно является более дорогостоящей операцией, чем просто передача данных. Мы не можем предоставить отправителю возможность передать информацию о себе (например, эффективный идентификатор пользователя) в самом сообщении, поскольку мы не можем быть уверены, что эта информация окажется истинной. Хотя разрешения доступа к очереди сообщений определяют, имеет ли право отправитель помещать в нее сообщения, это все равно не дает однозначности. Существует возможность создавать одну очередь для каждого отправителя (о которой рассказывается в связи с очередями System V в разделе 6.8), но это плохо подходит для больших приложений. Наконец, если функции для работы с очередями сообщений реализованы как пользовательские функции (как мы показываем в разделе 5.8), а не как часть ядра, мы не можем доверять никакой информации об отправителе, передаваемой с сообщением, так как ее легко подделать. 

Пример: программа mqsend

В листинге 5.5 приведен текст программы, помещающей сообщение в очередь.

Листинг 5.5. Программа mqsend

//pxmsg/mqsend.c

1  #include "unpipc.h"

2  int

3  main(int argc, char **argv)

4  {

5   mqd_t mqd;

6   void *ptr;

7   size_t len;

8   uint_t prio;

9   if (argc != 4)

10   err_quit("usage: mqsend <#bytes>");

11  len = atoi(argv[2]);

12  prio = atoi(argv[3]);

13  mqd = Mq_open(argv[1], O_WRONLY);

14  ptr = Calloc(len, sizeof (char));

15  Mq_send(mqd, ptr, len, prio);

16  exit(0);

17 }

И размер сообщения, и его приоритет являются обязательными аргументами командной строки. Буфер под сообщение выделяется функцией callос, которая инициализирует его нулем.

Пример: программа mqreceive

Программа в листинге 5.6 считывает сообщение из очереди.

Листинг 5.6. Программа mqreceive

//pxmsg/mqreceive.с

1  #include "unpipc.h"

2  int

3  main(int argc, char **argv)

4  {

5   int с flags;

6   mqd_t mqd;

7   ssize_t n;

8   uint_t prio;

9   void *buff;

10  struct mq_attr attr;

11  flags = O_RDONLY;

12  while ((c = Getopt(argc, argv, "n")) != –1) {

13   switch (c) {

14   case 'n':

15    flags |= O_NONBLOCK;

16    break;

17   }

18  }

19  if (optind != argc – 1)

20   err_quit("usage: mqreceive [ –n ] ");

21  mqd = Mq_open(argv[optind], flags);

22  Mq_getattr(mqd, &attr);

23  buff = Malloc(attr.mqjnsgsize);

24  n = Mq_receive(raqd, buff, attr.mq_msgsize, &prio);

25  printf("read %ld bytes, priority = %u\n", (long) n, prio);

26  exit(0);

27 }

Параметр -n запрещает блокировку

14-17 Параметр командной строки –n отключает блокировку. При этом программа возвращает сообщение об ошибке, если в очереди нет сообщений.

Открытие очереди и получение атрибутов

21-25 Мы открываем очередь и получаем ее атрибуты, вызвав mq_getattr. Нам обязательно нужно определить максимальный размер сообщения, потому что мы должны выделить буфер подходящего размера, чтобы вызвать mq_receive. Программа выводит размер считываемого сообщения и его приоритет.

ПРИМЕЧАНИЕ

Поскольку n имеет тип size_t и мы не знаем, int это или long, мы преобразуем эту величину к типу long и используем строку формата %ld. В 64-разрядной реализации int будет 32-разрядным целым, a long и size_t будут 64-разрядными целыми.

Воспользуемся обеими программами, чтобы проиллюстрировать использование поля приоритета.

solaris % mqcreate /test1

solaris % mqgetattr /test1        создаем очередь и смотрим на ее атрибуты

max #msgs = 128, max #bytes/msg = 1024, #currently on queue = 0

solaris % mqsend /test1 100 99999 отправка с некорректным значением приоритета

mq_send error: Invalid argument

solaris % mqsend /test1 100 6     100 байт, приоритет 6

solaris % mqsend /test1 50 18     50 байт, приоритет 18

solaris % mqsend /test1 33 18     33 байт, приоритет 18

solaris % mqreceive /test1

read 50 bytes, priority = 18         возвращается старейшее сообщение с

solaris % mqreceive /test1        наивысшим приоритетом

read 33 bytes, priority = 18

Solaris % mqreceive /test1

read 100 bytes, priority = 6

Solaris % mqreceive –n /test1     отключаем блокировку и убеждаемся, что очередь пуста

mq_receive error: Resource temporarily unavailable

Мы видим, что mq_receive действительно возвращает старейшее сообщение с наивысшим приоритетом. 

5.5. Ограничения очередей сообщений

Мы уже сталкивались с двумя ограничениями, устанавливаемыми для любой очереди в момент ее создания:

■ mq_maxmsg — максимальное количество сообщений в очереди;

■ mq_msgsize — максимальный размер сообщения.

Не существует каких-либо ограничений на эти значения, хотя в рассматриваемых реализациях необходимо наличие в файловой системе места для файла требуемого размера. Кроме того, ограничения на эти величины могут накладываться реализацией виртуальной памяти (см. упражнение 5.5).

Другие два ограничения определяются реализацией:

■ MQ_OPEN_MAX — максимальное количество очередей сообщений, которые могут быть одновременно открыты каким-либо процессом (Posix требует, чтобы эта величина была не меньше 8);

■ MQ_PRIO_MAX — максимальное значение приоритета плюс один (Posix требует, чтобы эта величина была не меньше 32).

Эти две константы часто определяются в заголовочном файле и могут быть получены во время выполнения программы вызовом функции sysconf, как мы покажем далее.

Пример: программа mqsysconf

Программа в листинге 5.7 вызывает функцию sysconf и выводит два ограничения на очереди сообщений, определяемые реализацией.

Листинг 5.7. Получение ограничений очередей с помощью sysconf

//pxmsg/mqsysconf.с

1 #include "unpipc.h"

2 int

3 main(int argc, char **argv)

4 {

5  printf("MQ_OPEN_MAX = %ld, MQ_PRIO_MAX = %ld\n",

6  Sysconf(_SC_MQ_OPEN_MAX), Sysconf(_SC_MQ_PRIO_MAX));

7  exit(0);

8 }

Запустив эту программу в наших двух операционных системах, получим:

solaris % mqsysconf

MQ_OPEN_MAX = 32, MQ_PRIO_MAX = 32

alpha % mqsysconf

MQ_OPEN_MAX = 64, MQ_PRIO_MAX = 256

5.6. Функция mq_notify

Один из недостатков очередей сообщений System V, как мы увидим в главе 6, заключается в невозможности уведомить процесс о том, что в очередь было помещено сообщение. Мы можем заблокировать процесс при вызове msgrcv, но тогда мы не сможем выполнять другие действия во время ожидания сообщения. Если мы укажем флаг отключения блокировки при вызове msgrcv (IPC_NOWAIT), процесс не будет заблокирован, но нам придется регулярно вызывать эту функцию, чтобы получить сообщение, когда оно будет отправлено. Мы уже говорили, что такая процедура называется опросом и на нее тратится лишнее время. Нужно, чтобы система сама уведомляла процесс о том, что в пустую очередь было помещено новое сообщение.

ПРИМЕЧАНИЕ

В этом и всех последующих разделах данной главы обсуждаются более сложные вопросы, которые могут быть пропущены при первом чтении. 

Очереди сообщений Posix допускают асинхронное уведомление о событии, когда сообщение помещается в очередь. Это уведомление может быть реализовано либо отправкой сигнала, либо созданием программного потока для выполнения указанной функции.

Мы включаем режим уведомления с помощью функции mq_notify:

#include 

int mq_notify(mqd_t mqdes, const struct sigevent *notification);

/* Возвращает 0 в случае успешного выполнения, –1 – в случае ошибки */

Эта функция включает и выключает асинхронное уведомление о событии для указанной очереди. Структура sigevent впервые появилась в стандарте Posix.1 для сигналов реального времени, о которых более подробно рассказано в следующем разделе. Эта структура и все новые константы, относящиеся к сигналам, определены в заголовочном файле :

union sigval {

 int sival_int; /* целое значение */

 void *sival_ptr; /* указатель */ 

};


struct sigevent {

 int sigev_notify; /* SIGEV_{NONE,SIGNAL,THREAD} */

 int sigev_signo; /* номер сигнала, если SIGEV_SIGNAL */

 union sigval sigev_value; /* передается обработчику сигнала или потоку */

/* Следующие два поля определены для SIGEV_THREAD */

void (*sigev_notify_function) (union sigval);

pthread_attr_t *sigev_notify_attributes;

Мы вскоре приведем несколько примеров различных вариантов использования уведомления, но о правилах, действующих для этой функции всегда, можно упомянуть уже сейчас.

1. Если аргумент notification ненулевой, процесс ожидает уведомления при поступлении нового сообщения в указанную очередь, пустую на момент его поступления. Мы говорим, что процесс регистрируется на уведомление для данной очереди.

2. Если аргумент notification представляет собой нулевой указатель и процесс уже зарегистрирован на уведомление для данной очереди, то уведомление для него отключается.

3. Только один процесс может быть зарегистрирован на уведомление для любой данной очереди в любой момент.

4.  При помещении сообщения в пустую очередь, для которой имеется зарегистрированный на уведомление процесс, оно будет отправлено только в том случае, если нет заблокированных в вызове mq_receive для этой очереди процессов. Таким образом, блокировка в вызове mq_receive имеет приоритет перед любой регистрацией на уведомление.

5. При отправке уведомления зарегистрированному процессу регистрация снимается. Процесс должен зарегистрироваться снова (если в этом есть необходимость), вызвав mq_notify еще раз.

ПРИМЕЧАНИЕ

С сигналами в Unix всегда была связана одна проблема: действие сигнала сбрасывалось на установленное по умолчанию каждый раз при отправке сигнала (раздел 10.4 [21]). Обычно первой функцией, вызываемой обработчиком сигнала, была signal, переопределявшая обработчик. Это создавало небольшой временной промежуток между отправкой сигнала и переопределением обработчика, в который процесс мог быть завершен при повторном появлении того же сигнала. На первый взгляд может показаться, что та же проблема должна возникать и при использовании mq_notify, поскольку процесс должен перерегистрироваться каждый раз после появления уведомления. Однако очереди сообщений отличаются по своим свойствам от сигналов, поскольку необходимость отправки уведомления не может возникнуть, пока очередь не будет пуста. Следовательно, необходимо аккуратно перерегистрироваться на получение уведомления до считывания пришедшего сообщения из очереди.

Пример: простая программа с уведомлением

Прежде чем углубляться в тонкости сигналов реального времени и потоков Posix, мы напишем простейшую программу, включающую отправку сигнала SI6USR1 при помещении сообщения в пустую очередь. Эта программа приведена в листинге 5.8, и мы отметим, что она содержит ошибку, о которой мы вскоре поговорим подробно.

Листинг 5.8. Отправка sigusr1 при помещении сообщения в пустую очередь (неправильная версия программы)

//pxmsg/mqnotifysigl.c

1  #include "unpipc.h"

2  mqd_t mqd;

3  void *buff;

4  struct mq_attr attr;

5  struct sigevent sigev;

6  static void sig_usrl(int);


7  int

8  main(int argc, char **argv)

9  {

10  if (argc != 2)

11   err_quit("usage: mqnotifysig1 ");

12  /* открываем очередь, получаем атрибуты, выделяем буфер */

13  mqd = Mq_open(argv[1], O_RDONLY);

14  Mq_getattr(mqd, &attr);

15  buff = Malloc(attr.mq_msgsize);

16  /* устанавливаем обработчик, включаем уведомление */

17  Signal(SIGUSR1, sig_usr1);

18  sigev.sigev_notify = SIGEV_SIGNAL;

19  sigev.sigev_signo = SIGUSR1;

20  Mq_notify(mqd, &sigev);

21  for (;;)

22   pause(); /* все делает обработчик */

23  exit(0);

24 }


25 static void

26 sig_usr1(int signo)

27 {

28  ssize_t n;

29  Mq_notify(mqd, &sigev); /* сначала перерегистрируемся */

30  n = Mq_receive(mqd, buff, attr.mq_msgsize, NULL);

31  printf("SIGUSR1 received, read %ld bytes\n", (long) n);

32  return;

33 }

Объявление глобальных переменных

2-6 Мы объявляем несколько глобальных переменных, используемых совместно функцией main и нашим обработчиком сигнала (sig_usr1).

Открытие очереди, получение атрибутов, выделение буфера чтения

12-15 Мы открываем очередь сообщений, получаем ее атрибуты и выделяем буфер считывания соответствующего размера.

Установка обработчика сигнала, включение уведомления

16-20 Сначала мы устанавливаем свой обработчик для сигнала SIGUSR1. Мы присваиваем полю sigev_notify структуры sigevent значение SIGEV_SIGNAL, что говорит системе о необходимости отправки сигнала, когда очередь из пустой становится непустой. Полю sigev_signo присваивается значение, соответствующее тому сигналу, который мы хотим получить. Затем вызывается функция mq_notify.

Бесконечный цикл

Функция main после этого зацикливается, и процесс приостанавливается при вызове pause, возвращающей –1 при получении сигнала.

Получение сигнала, считывание сообщения

Обработчик сигнала вызывает mq_notify для перерегистрации, считывает сообщение и выводит его длину. В этой программе мы игнорируем приоритет полученного сообщения. 

ПРИМЕЧАНИЕ

Оператор return в конце sig_usr1 не требуется, поскольку возвращаемое значение отсутствует, а конец текста функции неявно предусматривает возвращение в вызвавшую программу. Тем не менее автор всегда записывает return явно, чтобы указать, что возвращение из этой функции может происходит с особенностями. Например, может произойти преждевременный возврат (с ошибкой EINTR) в потоке, обрабатывающем сигнал. 

Запустим теперь эту программу в одном из окон

solaris % mqcreate /test1

solaris % mqnotifysig1 /test1

и затем выполним следующую команду в другом окне

solaris % mqsend /test1 50 16

Как и ожидалось, программа mqnotifysig1 выведет сообщение: SIGUSR1 received, read 50 bytes.

Мы можем проверить, что только один процесс может быть зарегистрирован на получение уведомления в любой момент, запустив копию пpoгрaммы в другом окне:

solaris % mqnotifysig1 /test1

mq_notify error: Device busy

Это сообщение соответствует коду ошибки EBUSY.

Сигналы Posix: функции типа Async-Signal-Safe

Недостаток пpoгрaммы из листинга 5.8 в том, что она вызывает mq_notify, mq_receive и printf из обработчика сигнала. Ни одну из этих функций вызывать оттуда не следует.

Функции, которые могут быть вызваны из обработчика сигнала, относятся к группе, называемой, согласно Posix, async-signal-safe functions (функции, обеспечивающие безопасную обработку асинхронных сигналов). В табл. 5.1 приведены эти функции по стандарту Posix вместе с некоторыми дополнительными, появившимися только в Unix 98.

Функции, которых нет в этом списке, не должны вызываться из обработчика сигнала. Обратите внимание, что в списке отсутствуют стандартные функции библиотеки ввода-вывода и функции pthread_XXX для работы с потоками. Из всех функций IPC, рассматриваемых в этой книге, в список попали только sem_post, read и write (подразумевается, что последние две используются с программными каналами и FIFO).

ПРИМЕЧАНИЕ

Стандарт ANSI С указывает четыре функции, которые могут быть вызваны из обработчика сигналов: abort, exit, longjmp, signal. Первые три отсутствуют в списке функций async-signal-safe стандарта Unix 98. 


Таблица 5.1. Функции, относящиеся к группе async-signal-safe

access        fpathconf rename      sysconf

aio_return    fstat     rmdir       tcdrain

aio_suspend   fsync     sem_post    tcflow 

alarm         getegid   setgid      tcflush

cfgetispeed   geteuid   setpgid     tcgetattr

cfgetospeed   getgid    setsid      tcgetgrp

cfsetispeed   getgroups setuid      tcsendbreak

cfsetospeed   getpgrp   sigaction   tcsetattr

chdir         getpid    sigaddset   tcsetpgrp

chmod         getppid   sigdelset   time

chown         getuid    sigemptyset timer_getoverrun

clock_gettime kill      sigfillset  timer_gettime

close         link      sigismember timer_settime

creat         lseek     signal      times

dup           mkdir     sigpause    umask

dup2          mkfifo    sigpending  uname

execle        open      sigprocmask unlink

execve        pathconf  sigqueue    utime

_exit         pause     sigset      wait

fcntl         pipe      sigsuspend  waitpid

fdatasync     raise     sleep       write

fork          read      stat

Пример: уведомление сигналом

Одним из способов исключения вызова каких-либо функций из обработчика сигнала является установка этим обработчиком глобального флага, который проверяется программным потоком для получения информации о приходе сообщения. В листинге 5.9 иллюстрируется этот метод, хотя новая программа также содержит ошибку, но уже другую, о которой мы вскоре поговорим подробнее.

Глобальная переменная

2 Поскольку единственное действие, выполняемое обработчиком сигнала, заключается в присваивании ненулевого значения флагу mqflag, глобальным переменным из листинга 5.8 уже не нужно являться таковыми. Уменьшение количества глобальных переменных — это всегда благо, особенно при использовании программных потоков.

Открытие очереди сообщений

15-18 Мы открываем очередь сообщений, получаем ее атрибуты и выделяем буфер считывания.

Инициализация наборов сигналов

19-22 Мы инициализируем три набора сигналов и устанавливаем бит для сигнала SIGUSR1 в наборе newmask.

Установка обработчика сигнала, включение уведомления

23-27 Мы устанавливаем обработчик сигнала для SIGUSR1, присваиваем значения полям структуры sigevent и вызываем mq_notify. 

Листинг 5.9. Обработчик сигнала устанавливает флаг для главного потока (неправильная версия)

//pxmsg/mqnotifysig2.c

1  #include "unpipc.h"

2  volatile sig_atomic_t mqflag; /* ненулевое значение устанавливается обработчиком */

3  static void sig_usrl(int);


4  int

5  main(int argc, char **argv)

6  {

7   mqd_t mqd;

8   void *buff;

9   ssize_t n;

10  sigset_t zeromask, newmask, oldmask;

11  struct mq_attr attr;

12  struct sigevent sigev;

13  if (argc != 2)

14   err_quit("usage: mqnotifysig2 ");

15  /* открытие очереди, получение атрибутов, выделение буфера */

16  mqd = Mq_open(argv[1], O_RDONLY);

17  Mq_getattr(mqd, &attr);

18  buff = Malloc(attr.mq_msgsize);

19  Sigemptyset(&zeromask); /* сигналы не блокируются */

20  Sigemptyset(&newmask);

21  Sigemptyset(&oldmask);

22  Sigaddset(&newmask, SIGUSR1);

23  /* установка обработчика, включение уведомления */

24  Signal(SIGUSR1, sig_usr1);

25  sigev.sigev_notify = SIGEV_SIGNAL;

26  sigev.sigev_signo = SIGUSR1;

27  Mq_notify(mqd, &sigev);

28  for (;;) {

29   Sigprocmask(SIG_BLOCK, &newmask, &oldmask); /* блокируем SIGUSR1 */

30   while (mqflag == 0)

31    sigsuspend(&zeromask);

32   mqflag = 0; /* сброс флага */

33   Mq_notify(mqd, &sigev); /* перерегистрируемся */

34   n = Mq_receive(mqd, buff, attr.mq_msgsize, NULL);

35   printf("read %ld bytes\n", (long) n);

36   Sigprocmask(SIG_UNBLOCK, &newmask, NULL); /* разблокируем SIGUSR1 */

37  }

38  exit(0);

39 }


40 static void

41 sig_usr1(int signo)

42 {

43  mqflag = 1;

44  return;

45 }

Ожидание установки флага обработчиком

28-32 Мы вызываем sigprocmask, чтобы заблокировать SIGUSR1, сохраняя текущую маску сигналов в oldmask. Затем мы в цикле проверяем значение глобального флага mqflag, ожидая, когда обработчик сигнала установит его в ненулевое значение. Пока значение этого флага равно нулю, мы вызываем sigsuspend, что автоматически приостанавливает вызывающий поток и устанавливает его маску в zeromask (сигналы не блокируются). Раздел 10.16 [21] рассказывает о функции sigsuspend более подробно. Также там объясняются причины, по которым мы должны проверять значение переменной mqflag только при заблокированном сигнале SIGUSR1. Каждый раз при выходе из sigsuspend сигнал SIGUSR1 блокируется.

Перерегистрация и считывание сообщения

33-36 Когда флаг mqflag принимает ненулевое значение, мы регистрируемся на получение уведомления заново и считываем сообщение из очереди. Затем мы разблокируем сигнал SIGUSR1 и возвращаемся к началу цикла.

Мы уже говорили, что в этой версии программы также присутствует ошибка. Посмотрим, что произойдет, если в очередь попадут два сообщения, прежде чем будет считано первое из них. Мы можем имитировать это, добавив sleep перед вызовом mq_notify. Проблема тут в том, что уведомление отсылается только в том случае, когда сообщение помещается в пустую очередь. Если в очередь поступают два сообщения, прежде чем первое будет считано, то отсылается только одно уведомление. Тогда мы считываем первое сообщение и вызываем sigsuspend, ожидая поступления еще одного. А в это время в очереди уже имеется сообщение, которое мы должны прочитать, но которое мы никогда не прочтем.

Пример: уведомление сигналом с отключением блокировки

Исправить описанную выше ошибку можно, отключив блокировку операции считывания сообщений. Листинг 5.10 содержит измененную версию программы из листинга 5.9. Новая программа считывает сообщения в неблокируемом режиме.

Листинг 5.10. Использование уведомления с помощью сигнала для считывания сообщения из очереди сообщений Posix

//pxmsg/mqnotifysig3.с

1  #include "unpipc.h"

2  volatile sig_atomic_t mqflag; /* ненулевое значение устанавливается обработчиком сигнала */

3  static void sig_usr1(int);


4  int

5  main(int argc, char **argv)

6  {

7   mqd_t mqd;

8   void *buff;

9   ssize_t n;

10  sigset_t zeromask, newmask, oldmask;

11  struct mq_attr attr;

12  struct sigevent sigev;

13  if (argc != 2)

14   err_quit("usage: mqnotifysig3 ");

15  /* открытие очереди, получение атрибутов, выделение буфера */

16  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);

17  Mq_getattr(mqd, &attr);

18  buff = Malloc(attr.mq_msgsize);

19  Sigemptyset(&zeromask); /* сигналы не блокируются */

20  Sigemptyset(&newmask);

21  Sigemptyset(&oldmask);

22  Sigaddset(&newmask, SIGUSR1);

23  /* установка обработчика, включение уведомления */

24  Signal(SIGUSR1, sig_usr1);

25  sigev.sigev_notify = SIGEV_SIGNAL;

26  sigev.sigev_signo = SIGUSR1;

27  Mq_notify(mqd, &sigev);

28  for (;;) {

29   Sigprocmask(SIG_BLOCK, &newmask, &oldmask); /* блокируем SIGUSR1 */

30   while (mqflag == 0)

31    sigsuspend(&zeromask);

32   mqflag = 0; /* сброс флага */

33   Mq_notify(mqd, &sigev); /* перерегистрируемся */

34   while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {

35    printf("read $ld bytes\n", (long) n);

36   }

37   if (errno != EAGAIN)

38    err_sys("mq_receive error");

39   Sigprocmask(SIG_UNBLOCK, &newmask, NULL); /* разблокируем SIGUSR1 */

40  }

41  exit(0);

42 }


43 static void

44 sig_usr1(int signo)

45 {

46  mqflag = 1;

47  return;

48 }

Открытие очереди сообщений в режиме отключенной блокировки

15-18 Первое изменение в программе: при открытии очереди сообщений указывается флаг O_NONBLOCK.

Считывание всех сообщений из очереди

34-38 Другое изменение: mq_receive вызывается в цикле, считывая все сообщения в очереди, пока не будет возвращена ошибка с кодом EAGAIN, означающая отсутствие сообщений в очереди. 

Пример: уведомление с использованием sigwait вместо обработчика

Хотя программа из предыдущего примера работает правильно, можно повысить ее эффективность. Программа использует sigsuspend для блокировки в ожидании прихода сообщения. При помещении сообщения в пустую очередь вызывается сигнал, основной поток останавливается, запускается обработчик, который устанавливает флаг mqflag, затем снова запускается главный поток, он обнаруживает, что значение mqflag отлично от нуля, и считывает сообщение. Более простой и эффективный подход заключается в блокировании в функции, ожидающей получения сигнала, что не требует вызова обработчика только для установки флага. Эта возможность предоставляется функцией sigwait:

#include 

int sigwait(const sigset_t *set, int *sig);

/* Возвращает 0 в случае успешного завершения, –1 – в случае ошибки */

Перед вызовом sigwait мы блокируем некоторые сигналы. Набор блокируемых сигналов указывается в качестве аргумента set. Функция sigwait блокируется, пока не придет по крайней мере один из этих сигналов. Когда он будет получен, функция возвратит его. Значение этого сигнала сохраняется в указателе sig, а функция возвращает значение 0. Это называется синхронным ожиданием асинхронного события: мы используем сигнал, но не пользуемся асинхронным обработчиком сигнала.

В листинге 5.11 приведен текст программы, использующей mq_notifу и sigwait.

Листинг 5.11. Использование mq_notify совместно с sigwait

//pxmsg/mqnotifysig4.c

1  #include "unpipc.h"


2  int

3  main(int argc, char **argv)

4  {

5   int signo;

6   mqd_t mqd;

7   void *buff;

8   ssize_t n;

9   sigset_t newmask;

10  struct mq_attr attr;

11  struct sigevent sigev;

12  if (argc != 2)

13   err_quit("usage: mqnotifysig4 ");

14  /* открытие очереди, получение атрибутов, выделение буфера */

15  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);

16  Mq_getattr(mqd, &attr);

17  buff = Malloc(attr.mq_msgsize);

18  Sigemptyset(&newmask);

19  Sigaddset(&newmask, SIGUSR1);

20  Sigprocmask(SIG_BLOCK, &newmask, NULL); /* блокируем SIGUSR1 */

21  /* установка обработчика, включение уведомления */

22  sigev.sigev_notify = SIGEV_SIGNAL;

23  sigev.sigev_signo = SIGUSR1;

24  Mq_notify(mqd, &sigev);

25  for (;;) {

26   Sigwait(&newmask, &signo);

27   if (signo == SIGUSR1) {

28    Mq_notify(mqd, &sigev); /* перерегистрируемся */

29    while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {

30     printf("read %ld bytes\n", (long) n);

31    }

32    if (errno != EAGAIN)

33     err_sys("mq_receive error");

34   }

35  }

36  exit(0);

37 }

Инициализация набора сигналов и блокировка SIGUSR1

18-20 Инициализируется один набор сигналов, содержащий только SIGUSR1, а затем этот сигнал блокируется sigprocmask.

Ожидание сигнала

26-34 Мы блокируем выполнение программы и ждем прихода сигнала, вызвав sigwait. При получении сигнала SIGUSR1 мы перерегистрируемся на уведомление и считываем все доступные сообщения.

ПРИМЕЧАНИЕ

Функция sigwait часто используется в многопоточных процессах. Действительно, глядя на прототип функции, мы можем заметить, что возвращаемое значение будет 0 или одной из ошибок Еххх, что весьма похоже на функции Pthread. Однако в многопоточном процессе нельзя пользоваться sigprocmask — вместо нее следует вызывать pthread_ sigmask, которая изменяет маску сигналов только для вызвавшего ее потока. Аргументы pthread_sigmask совпадают с аргументами sigprocmask.

Существуют два варианта функции sigwait: sigwaitinfo возвращает структуру siginfo_t (которая будет определена в следующем разделе) и предназначена для использования с надежными сигналами; функция sigtimedwait также возвращает структуру siginfo_t и позволяет вызывающему процессу установить ограничение по времени на ожидание.

Большая часть книг о многопоточном программировании, таких как [3], рекомендуют пользоваться sigwait для обработки всех сигналов в многопоточном процессе и не использовать асинхронные обработчики. 

Пример: очереди сообщений Posix и функция select

Дескриптор очереди сообщений (переменная типа mqd_t) не является «обычным» дескриптором и не может использоваться с функциями select и poll (глава 6 [24]). Тем не менее их можно использовать вместе с каналом и функцией mq_notify. (Аналогичный метод применен в разделе 6.9 для очередей System V, где создается дочерний процесс и канал связи.) Прежде всего обратите внимание, что, согласно табл. 5.1, функция write принадлежит к группе async-signal-safe, поэтому она может вызываться из обработчика сигналов. Программа приведена в листинге 5.12.

Листинг 5.12. Использование уведомления с помощью сигнала и канала

//pxmsg/mqnotifysig5.c

1  #include "unpipc.h"

2  int pipefd[2];

3  static void sig_usr1(int);


4  int

5  main(int argc, char **argv)

6  {

7   int nfds;

8   char c;

9   fd_set rset;

10  mqd_t mqd;

11  void *buff;

12  ssize_t n;

13  struct mq_attr attr;

14  struct sigevent sigev;

15  if (argc != 2)

16   err_quit("usage: mqnotifysig5 ");

17  /* открытие очереди, получение атрибутов, выделение буфера */

18  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);

19  Mq_getattr(mqd, &attr);

20  buff = Malloc(attr.mq_msgsize);

21  Pipe(pipefd);

22  /* установка обработчика, включение уведомления */

23  Signal(SIGUSR1, sig_usr1);

24  sigev.sigev_notify = SIGEV_SIGNAL;

25  sigev.sigev_signo = SIGUSR1;

26  Mq_notify(mqd, &sigev);

27  FD_ZERO(&rset);

28  for (;;) {

29   FD_SET(pipefd[0], &rset);

30   nfds = Select(pipefd[0] + 1, &rset, NULL, NULL, NULL);

31   if (FD_ISSET(pipefd[0], &rset)) {

32    Read(pipefd[0], &c, 1);

33    Mq_notify(mqd, &sigev); /* перерегистрируемся */

34    while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {

35     printf("read %ld bytes\n", (long) n);

36    }

37    if (errno != EAGAIN)

38     err_sys("mq_receive error");

39   }

40  }

41  exit(0);

42 }


43 static void

44 sig_usr1(int signo)

45 {

46  Write(pipefd[1], "", 1); /* один байт – 0 */

47  return;

48 }

Создание канала

21 Мы создаем канал, в который обработчик сигнала произведет запись, когда будет получено уведомление о поступлении сообщения в очередь. Это пример использования канала внутри одного процесса.

Вызов select

27-40 Мы инициализируем набор дескрипторов rset и при каждом проходе цикла включаем бит, соответствующий дескриптору pipefd[0] (открытый на считывание конец канала). Затем мы вызываем функцию select, ожидая получения единственного дескриптора, хотя в типичном приложении именно здесь осуществлялось бы размножение дескрипторов одного из концов канала. Когда появляется возможность читать из канала, мы перерегистрируемся на уведомление и считываем все доступные сообщения.

Обработчик сигнала

43-48 Единственное, что делает обработчик сигнала, — записывает в канал 1 байт. Как мы уже отмечали, эта операция относится к разрешенным для асинхронных обработчиков.

Пример: запуск нового потока

Альтернативой снятию блокировки сигналом является присваивание sigev_notify значения SIGEV_THREAD, что приводит к созданию нового потока. Функция, указанная в sigev_notify_function, вызывается с параметром sigev_value. Атрибуты нового канала указываются переменной sigev_notify_attributes, которая может быть и нулевым указателем, если нас устраивают устанавливаемые по умолчанию атрибуты. Текст программы приведен в листинге 5.13.

Листинг 5.13. Функция mq_notify, запускающая новый программный поток

//pxmsg/mqnotifythread1.с

1  #include "unpipc.h"

2  mqd_t mqd;

3  struct mq_attr attr;

4  struct sigevent sigev;

5  static void notify_thread(union sigval); /* наш поток */


6  int

7  main(int argc, char **argv)

8  {

9   if (argc != 2)

10   err_quit("usage: mqnotifythread1 ");

11  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);

12  Mq_getattr(mqd, &attr);

13  sigev.sigev_notify = SIGEV_THREAD;

14  sigev.sigev_value.sival_ptr = NULL;

15  sigev.sigev_notify_function = notify_thread;

16  sigev.sigev_notify_attributes = NULL;

17  Mq_notify(mqd, &sigev);

18  for (;;)

19   pause(); /* новый поток делает все */

20  exit(0);

21 }


22 static void

23 notify_thread(union sigval arg)

24 {

25  ssize_t n;

26  void *buff;

27  printf("notify_thread started\n");

28  buff = Malloc(attr.mq_msgsize);

29  Mq_notify(mqd, &sigev); /* перерегистрируемся */

30  while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {

31   printf("read %ld bytes\n", (long) n);

32  }

33  if (errno != EAGAIN)

34   err_sys("mq_receive error");

35  free(buff);

36  pthread_exit(NULL);

37 }

Мы задаем нулевой указатель в качестве аргумента нового потока (sigev_value), поэтому функции start нового потока ничего не передается. Мы могли бы передать указатель на дескриптор, вместо того чтобы декларировать его как глобальный, но новому потоку все равно нужно получать атрибуты очереди сообщений и структуру sigev (для перерегистрации). Мы также указываем нулевой указатель в качестве атрибутов нового потока, поэтому используются установки по умолчанию. Новые потоки создаются как неприсоединенные (detached threads).

ПРИМЕЧАНИЕ

К сожалению, ни одна из использовавшихся для проверки примеров систем (Solaris 2.6 и Digital Unix 4.0B) не поддерживает SIGEV_THREAD. Обе они допускают только два значения sigev_notify: SIGEV_NONE и SIGEV_SIGNAL.

5.7. Сигналы реального времени Posix

За прошедшие годы сигналы в Unix много раз претерпевали революционные изменения.

1. Модель сигналов, предлагавшаяся в Unix Version 7 (1978), была ненадежной. Сигналы могли быть потеряны, и процессу было трудно отключить отдельные сигналы при выполнении отдельных участков кода.

2. В версии 4.3BSD (1986) надежные сигналы были добавлены.

3. Версия System V Release 3.0 (1986) также добавила надежные сигналы, хотя и иначе, чем BSD.

4. Стандарт Posix.1 (1990) увековечил модель надежных сигналов BSD, и эта модель подробно описана в главе 10 [21].

5. Posix.1 (1996) добавил к модели Posix сигналы реального времени. Это произросло из расширений реального времени Posix.1b (которые были названы Posix.4).

Почти все системы Unix в настоящее время поддерживают надежные сигналы, а новейшие системы предоставляют также и сигналы реального времени стандарта Posix. (Следует различать надежные сигналы и сигналы реального времени.) О сигналах реального времени следует поговорить подробнее, поскольку мы уже столкнулись с некоторыми структурами, определяемыми этим расширением стандарта, в предыдущем разделе (структуры sigval и sigevent).

Сигналы могут быть отнесены к двум группам:

1. Сигналы реального времени, которые могут принимать значения между SIGRTMIN и SIGRTMAX включительно. Posix требует, чтобы предоставлялось по крайней мере RTSIG_MAX сигналов, и минимальное значение этой константы равно 8.

2. Все прочие сигналы: SIGALRM, SIGINT, SIGKILL и пр.

ПРИМЕЧАНИЕ

В Solaris 2.6 обычные сигналы Unix нумеруются с 1 по 37, а 8 сигналов реального времени имеют номера с 38 по 45. В Digital Unix 4.0B обычные сигналы нумеруются с 1 по 32, а 16 сигналов реального времени имеют номера с 33 по 48. Обе реализации определяют SIGRTMIN и SIGRTMAX как макросы, вызывающие sysconf, что позволяет изменять их значения.

Далее все зависит от того, установлен ли процессом, получившим сигнал, флаг SA_SIGINFO при вызове sigaction. В итоге получаются четыре возможных сценария, приведенных в табл. 5.2.


Таблица 5.2. Поведение сигналов Posix в реальном времени в зависимости от SA_SIGINFO 

СигналФлаг SA_SIGINFO указанФлаг SA_SIGINFO не указан
От SIGRTMIN до SIGRTMAXГарантируются характеристики реального времениХарактеристики реального времени не обязательны
Все прочие сигналыХарактеристики реального времени не обязательныХарактеристики реального времени не обязательны

Смысл фразы «характеристики реального времени не обязательны» следующий: некоторые реализации могут обрабатывать эти сигналы как сигналы реального времени, но это не обязательно. Если мы хотим, чтобы сигналы обрабатывались как сигналы реального времени, мы должны использовать сигналы с номерами от SIGRTMIN до SIGRTMAX и должны указать флаг SA_SIGINFO при вызове sigaction при установке обработчика сигнала.

Термин «характеристики реального времени» подразумевает следующее:

■ Сигналы помещаются в очередь. Если сигнал будет порожден трижды, он будет трижды получен адресатом. Более того, повторения одного и того же сигнала доставляются в порядке очереди (FIFO). Мы вскоре покажем пример очереди сигналов. Если же сигналы в очередь не помещаются, трижды порожденный сигнал будет получен лишь один раз.

■ Когда в очередь помещается множество неблокируемых сигналов в диапазоне SIGRTMIN—SIGRTMAX, сигналы с меньшими номерами доставляются раньше сигналов с большими номерами. То есть сигнал с номером SIGRTMIN имеет «больший приоритет», чем сигнал с номером SIGRTMIN+1, и т.д.

■ При отправке сигнала, не обрабатываемого как сигнал реального времени, единственным аргументом обработчика является номер сигнала. Сигналы реального времени несут больше информации, чем прочие сигналы. Обработчик для сигнала реального времени, устанавливаемый с флагом SA_SIGINFO, объявляется как

void func(int signo, siginfo_t *info, void *context);

где signo— номер сигнала, a siginfo_t — структура, определяемая как

typedef struct {

 int si_signo; /* то же, что и signo */

 int si_code; /* SI_{USER,QUEUE,TIMER,ASYNCIO,MESGQ} */

 union sigval si_value; /* целое или указатель от отправителя */

} siginfo_t;

На что указывает context — зависит от реализации.

ПРИМЕЧАНИЕ

Обработчик сигналов, не являющихся сигналами реального времени, вызывается с единственным аргументом. Во многих системах существует старое соглашение о вызове обработчиков сигналов с тремя аргументами, которое предшествовало стандарту реального времени Posix.

Тип siginfo_t является единственной структурой Posix, определяемой оператором typedef с именем, оканчивающимся на _t. В листинге 5.14 мы объявляем указатели на эти структуры как siginfo_t * без слова struct. 

■ Для работы с сигналами реального времени добавлено несколько новых функций. Например, для отправки сигнала какому-либо процессу используется функция sigqueue вместо kill. Новая функция позволяет отправить вместе с сигналом структуру sigval.

Сигналы реального времени порождаются нижеследующими функциями Posix.1, определяемыми значением si_code, которое хранится в структуре siginfo_t, передаваемой обработчику сигнала.

■ SI_ASYNCIO — сигнал был порожден по завершении асинхронного запроса на ввод или вывод одной из функций Posix aio_XXX, которые мы не рассматриваем;

■ SI_MESGQ — сигнал был порожден при помещении сообщения в пустую очередь сообщений (как в разделе 5.6); 

■ SI_QUEUE — сигнал был отправлен функцией sigqueue. Пример будет вскоре приведен;

■ SI_TIMER — сигнал был порожден по истечении установленного функцией timer_settime времени. Эту функцию мы не описываем;

■ SI_USER — сигнал был отправлен функцией kill.

Если сигнал был порожден каким-либо другим событием, si_code будет иметь значение, отличающееся от приведенных выше. Значение поля si_value структуры siginfo_t актуально только в том случае, если si_code имеет одно из следующих значений: SI_ASYNCIO, SI_MESGQ, SI_QUEUE и SI_TIMER.

Пример

В листинге 5.14 приведен пример программы, демонстрирующей использование сигналов реального времени. Программа вызывает fork, дочерний процесс блокирует три сигнала реального времени, родительский процесс отправляет девять сигналов (три раза отсылается каждый из заблокированных сигналов), затем дочерний процесс разблокирует сигналы и мы смотрим, сколько раз будет получен каждый из них и в каком порядке они придут.

Листинг 5.14. Тестовая программа, иллюстрирующая работу с сигналами реального времени

//rtsignals/test1.c

1  #include "unpipc.h"

2  static void sig_rt(int, siginfo_t *, void *);


3  int

4  main(int argc, char **argv)

5  {

6   int i, j;

7   pid_t pid;

8   sigset_t newset;

9   union sigval val;

10  printf("SIGRTMIN = %d, SIGRTMAX = %d\n", (int) SIGRTMIN, (int) SIGRTMAX);

11  if ((pid = Fork()) == 0) {

12   /* дочерний процесс блокирует 3 сигнала */

13   Sigemptyset(&newset);

14   Sigaddset(&newset, SIGRTMAX);

15   Sigaddset(&newset, SIGRTMAX – 1);

16   Sigaddset(&newset, SIGRTMAX – 2);

17   Sigprocmask(SIG_BLOCK, &newset, NULL);

18   /* установка обработчика с флагом SA_SIGINFO */

19   Signal_rt(SIGRTMAX, sig_rt);

20   Signal_rt(SIGRTMAX – 1, sig_rt);

21   Signal_rt(SIGRTMAX – 2, sig_rt);

22   sleep(6); /* родительский процесс посылает все сигналы */

23   Sigprocmask(SIG UNBLOCK, &newset, NULL); /* разблокируемся */

24   sleep(3); /* получаем сигналы */

25   exit(O);

26  }

27  /* родительский процесс отправляет сигналы */

28  sleep(3); /* дочерний процесс блокирует сигналы */

29  for (i = SIGRTMAX; i >= SIGRTMAX – 2; i--) {

30   for (j = 0; j <= 2; j++) {

31    val.sival_int = j;

32    Sigqueue(pid, i, val);

33    printf("sent signal %d, val = %d\n", i, j);

34   }

35  }

36  exit(0);

37 }


38 static void

39 sig_rt(int signo, siginfo_t *info, void *context)

40 {

41  printf(received signal #%d, code = %d, ival = %d\n",

42   signo.info->si_code, info->si_value.sival_int);

43 }

Вывод номеров сигналов реального времени

10 Мы печатаем наибольший и наименьший номера сигналов реального времени, чтобы узнать, сколько их предоставляется в данной реализации. Мы преобразуем обе константы к типу integer, поскольку в некоторых реализациях они определяются как макросы, требующие вызова sysconf, например:

#define SIGRTMAX (sysconf(_SC_RTSIG_MAX))

и функция sysconf возвращает целое типа long (см. упражнение 5.4).

Вызов fork и блокирование трех сигналов реального времени

11-17 Запускается дочерний процесс, который вызывает sigprocmask для блокировки трех используемых сигналов реального времени: SIGRTMAX, SIGRTMAX-1 и SIGRTMAX-2.

Установка обработчика сигнала

18-21 Мы вызываем функцию signal_rt (приведенную в листинге 5.15) для установки функции sig_rt в качестве обработчика трех указанных выше сигналов реального времени. Функция устанавливает флаг SA_SIGINFO, и поскольку эти три сигнала являются сигналами реального времени, мы можем ожидать, что они будут обрабатываться соответствующим образом. Эта функция также устанавливает маску сигналов, блокируемых на время выполнения обработчика.

Ожидание порождения сигналов родительским процессом, разблокирование сигналов

22-25 Дочерний процесс ждет 6 секунд, пока родительский породит девять сигналов. Затем вызывается sigprocmask для разблокирования трех сигналов реального времени. Это позволяет всем помещенным в очередь сигналам достичь адресата. Затем делается пауза еще на три секунды, чтобы обработчик успел вызвать printf девять раз, после чего дочерний процесс завершает свою работу.

Родительский процесс отправляет девять сигналов

27-36 Родительский процесс ждет три секунды, пока дочерний не заблокирует все требуемые сигналы. Затем родительский процесс порождает три экземпляра каждого из трех сигналов реального времени: i принимает 3 значения, a j принимает значения 0, 1 и 2 для каждого из значений i. Мы преднамеренно порождаем сигналы начиная с наибольшего номера, поскольку ожидаем, что они будут получены начиная с наименьшего. Мы также отсылаем с каждым из сигналов новое значение sigval_int, чтобы проверить, что копии одного и того же сигнала доставляются в том же порядке, в каком они были отправлены, то есть очередь действительно является очередью.

Обработчик сигнала

38-43 Обработчик сигнала просто выводит информацию о полученном сигнале.

ПРИМЕЧАНИЕ

Из табл. 5.1 следует, что функция printf не относится к функциям типа async-signal-safe и не должна вызываться из обработчика сигналов. Здесь мы используем ее исключительно в качестве проверочного средства в маленькой тестовой программе. 

Запустим эту программу в Solaris 2.6. Результат будет не тем, которого мы ожидали:

solaris % test1

SIGRTMIN = 38, SIGRTMAX = 45 8 сигналов реального времени

                             трехсекундная пауза

sent signal 45, val = 0

sent signal 45, val = 1

sent signal 45, val = 2

sent signal 44, val = 0

sent signal 44, val = 1

sent signal 44, val = 2

sent signal 43, val = 0

sent signal 43, val = 1

sent signal 43, val = 2

solaris % родительский процесс завершил работу, пауза 3 секунды,

          пока дочерний процесс не разблокирует сигналы

received signal #45, code = –2, ival = 2 дочерний процесс получает сигналы

received signal #45, code = –2, ival = 1

received signal #45, code = –2, ival = 0

received signal #44, code = –2, ival = 2

received signal #44, code = –2, ival = 1

received signal #44, code = –2, ival = 0

received signal #43, code = –2, ival = 2

received signal #43, code = –2, ival = 1

received signal #43, code = –2, ival = 0

В очередь помещаются девять сигналов, но первыми принимаются сигналы с большими номерами (а мы ожидали получить сигналы с меньшими номерами). 

Кроме того, сигналы с одинаковым номером приходят в порядке LIFO, а не FIFO. Код si_code = –2 соответствует SI_QUEUE.

Запустив программу в Digital Unix 4.0B, мы получим именно тот результат, которого ожидали:

alpha % test1

SIGRTMIN = 33, SIGRTMAX = 48 16 сигналов реального времени

                             трех секундная пауза

sent signal 48, val = 0

sent signal 48, val = 1

sent signal 48, val = 2

sent signal 47, val = 0

sent signal 47, val = 1

sent signal 47, val = 2

sent signal 46, val = 0

sent signal 46, val = 1

sent signal 46, val = 2

alpha % родительский процесс завершил работу, пауза 3 секунды.

        пока дочерний процесс не разблокируетсигналы

received signal #46, code – –1, ival = 0 дочерний процесс получает сигналы

received signal #46, code = –1, ival = 1

received signal #46, code = –1, ival = 2

received signal #47, code – –1, ival = 0

received signal #47, code = –1, ival = 1

received signal #47, code = –1, ival = 2

received signal #48, code = –1, ival = 0

received signal #48, code = –1, ival = 1

received signal #48, code = –1, ival = 2

Девять сигналов помещаются в очередь и получаются адресатом в ожидаемом порядке: первым приходит сигнал с меньшим номером, а копии сигнала приходят в порядке FIFO.

ПРИМЕЧАНИЕ

Похоже, что в реализации Solaris 2.6 есть ошибка.

Функция signal_rt

В книге [24, с. 120] мы привели пример собственной функции signal, вызывавшей функцию sigaction стандарта Posix для установки обработчика сигнала, обеспечивающего надежную семантику Posix. Изменим эту функцию, чтобы обеспечить поддержку реального времени. Новую функцию мы назовем signal_rt; ее текст приведен в листинге 5.15.

Листинг 5.15. Функция signal_rt с поддержкой реального времени

//lib/signal_rt.c

1  #include "unpipc.h"

2  Sigfunc_rt *


3  signal_rt(int signo, Sigfunc_rt *func)

4  {

5   struct sigaction act, oact;

6   act.sa_sigaction = func; /* сохраняем адрес функции */

7   sigemptyset(&act.sa_mask);

8   act.sa_flags = SA_SIGINFO; /* гарантирует режим реального времени */

9   if (signo == SIGALRM) {

10 #ifdef SA_INTERRUPT

11   act.sa_flags |= SA_INTERRUPT; /* SunOS 4.x */

12 #endif

13  } else {

14 #ifdef SA_RESTART

15   act.sa_flags |= SA_RESTART; /* SVR4, 44BSD */

16 #endif

17  }

18  if (sigaction(signo, &act, &oact) < 0)

19   return((Sigfunc_rt *) SIG_ERR);

20  return(oact.sa_sigaction);

21 }

Упрощение прототипа функции с использованием typedef

1-3 В нашем заголовочном файле unpiрс.h (листинг В.1) мы определяем Sigfunc_rt как

typedef void Sigfunc_rt(int, siginfo_t*, void*);

Ранее в этом разделе мы говорили о том, что это прототип функции для обработчика сигнала, устанавливаемого с флагом SA_SIGINFO.

Указание функции-обработчика

Структура sigaction претерпела изменения с добавлением поддержки сигна-5-7 лов реального времени: к ней было добавлено новое поле sa_sigaction:

struct sigaction {

 void (*sa_handler)(); /* SIG_DFL, SIG_IGN или адрес обработчика сигнала */

 sigset_t sa_mask; /* дополнительные блокируемые сигналы */

 int sa_flags; /* параметры сигналов: SA_XXX */

 void (*sa_sigaction)(int, siginfo_t, void *);

};

Правила действуют следующие:

■ Если в поле sa_flags установлен флаг SA_SIGINFO, поле sa_sigaction указывает адрес функции-обработчика сигнала.

■ Если флаг SA_SIGINFO не установлен, поле sa_handler указывает адрес функции-обработчика сигнала.

■ Чтобы сопоставить сигналу действие по умолчанию или игнорировать его, следует установить sa_handler равным либо SIG_DFL, либо SIG_IGN и не устанавливать флаг SA_SIGINFO.

Установка SA_SIGINFO

8-17 Мы всегда устанавливаем флаг SA_SIGINFO и указываем флаг SA_RESTART, если перехвачен какой-либо другой сигнал, кроме SIGALRM.

5.8. Реализация с использованием отображения в память

Теперь рассмотрим реализацию очередей сообщений Posix с использованием отображения в память, взаимных исключений и условных переменных Posix. 

ПРИМЕЧАНИЕ

Взаимные исключения и условные переменные описаны в главе 7, а ввод-вывод с отображением в память — в главах 12 и 13. Вы можете отложить данный раздел до ознакомления с этими главами.

На рис. 5.2 приведена схема структур данных, которыми мы пользуемся для реализации очереди сообщений Posix. Изображенная очередь может содержать до четырех сообщений по 7 байт каждое.

В листинге 5.16 приведен заголовочный файл mqueue.h, определяющий основные структуры, используемые в этой реализации.

Тип mqd_t

Дескриптор нашей очереди сообщений является просто указателем на структуру mq_infо. Каждый вызов mq_open выделяет память под одну такую структуру, а вызвавшему возвращается указатель на нее. Повторим, что дескриптор очереди сообщений не обязательно является небольшим целым числом, как дескриптор файла — единственное ограничение, накладываемое Posix, заключается в том, что этот тип данных не может быть массивом.

Листинг 5.16. Заголовочный файл mqueue.h

//my_pxmsg_mmap/mqueue.h

1  typedef struct mymq_info *mymqd_t;


2  struct mymq_attr {

3   long mq_flags; /* флаг очереди : O_NONBLOCK */

4   long mq_maxmsg; /* максимальное количество сообщений в очереди */

5   long mq_msgsize; /* максимальный размер сообщения в байтах */

6   long mq_curmsgs; /* количество сообщений в очереди */

7  };


8  /* одна структура mymq_hdr{} на очередь, в начале отображаемого файла */

9  struct mymq_hdr {

10  struct mymq_attr mqh_attr; /* атрибуты очереди */

11  long mqh_head; /* индекс первого сообщения*/

12  long mqh_free; /* индекс первого пустого сообщения */

13  long mqh_nwait; /* количество заблокированных mq_receive() потоков */

14  pid_t mqh_pid; /* ненулевой PID. если включено уведомление */

15  struct sigevent mqh_event; /* для mq_notify() */

16  pthread_mutex_t mqh_lock; /* блокировка: mutex*/

17  pthread_cond_t mqh_wait; /* и условная переменная */

18 };


19 /* один mymsg_hdr{} в начале каждого сообщения */

20 struct mymsg_hdr {

21  long msg_next; /* индекс следующего сообщения в списке */

22                 /* msg_next должно быть первым полем в структуре */

23  ssize_t msg_len; /* реальная длина */

24  unsigned int msg_prio; /* приоритет */

25 };


26 /* одна mymq_info{} выделяется при каждом mq_open() */

27 struct mymq_info {

28  struct mymq_hdr *mqi_hdr; /* начало отображаемого файла */

29  long mqi_magic; /* магическое значение после инициализации */

30  int mqi_flags; /* флаги для данного процесса */

31 };

32 #define MQI_MAGIC 0x98765432

33 /* размер сообщения округляется для подгонки */

34 #define MSGSIZE(i) ((((i) + sizeof(long)-1) / sizeof(long)) * sizeof(long))

Рис. 5.2. Схема структур данных, используемых при реализации очередей сообщений posix через отображаемый в память файл 


Структура mq_hdr

8-18 Эта структура хранится в самом начале отображаемого файла и содержит всю информацию об очереди. Поле mq_flags структуры mqh_attr не используется, поскольку флаги (единственный определенный флаг используется для отключения блокировки) должны обрабатываться для каждого открывающего очередь процесса в отдельности, а не для очереди в целом. Поэтому флаги хранятся в структуре mq_info. О прочих полях этой структуры мы будем говорить в связи с их использованием различными функциями.

Обратите внимание, что все переменные, называемые нами индексными (поля этой структуры mqh_head и mqh_free, а также поле msg_next следующей структуры), содержат индексы байтов относительно начала отображаемого в память файла. Например, размер структуры mq_hdr в системе Solaris 2.6 — 96 байт, поэтому индекс первого сообщения, располагающегося сразу за заголовком, имеет значение 96. Каждое сообщение на рис. 5.2 занимает 20 байт (12 байт на структуру msg_hdr и 8 байт на данные), поэтому индексы следующих трех сообщений имеют значения 116, 136 и 156, а размер отображаемого в память файла — 176 байт. Индексы используются для обработки двух связных списков, хранящихся в этом файле: в одном из списков (mqh_head) хранятся все сообщения, имеющиеся в данный момент в очереди, а в другом (mqh_free) — все незаполненные сообщения. Мы не можем использовать настоящие указатели на области памяти (адреса) при работе со списком, поскольку отображаемый файл может находиться в произвольной области памяти для каждого из процессов, работающих с ним (как показано в листинге 13.5).

Структура msg_hdr

19-25 Эта структура располагается в начале каждого сообщения в отображаемом файле. Любое сообщение может принадлежать либо к списку заполненных, либо к списку свободных сообщений, и поле msg_next содержит индекс следующего сообщения в этом списке (или 0, если сообщение является в этом списке последним). Переменная msg_len хранит реальную длину данных в сообщении, которая в нашем примере с рис. 5.2 может иметь значение от 0 до 7 байт включительно. В переменную msg_prio отправителем помещается значение приоритета сообщения.

Структура mq_info

26-32 Экземпляр такой структуры динамически создается функцией mq_open при открытии очереди и удаляется mq_close. Поле mqi_hdr указывает на отображаемый файл (адрес начала файла возвращается mmap). Указатель на эту структуру имеет основной в нашей реализации тип mqd_t, он принимает значение, возвращаемое mq_open.

Поле mqi_magiс принимает значение MQI_MAGIC в момент инициализации структуры. Это значение проверяется всеми функциями, которым передается указатель типа mqd_t, что дает им возможность удостовериться, что указатель действительно указывает на структуру типа mq_infо. mqi_flags содержит флаг отключения блокировки для открывшего очередь процесса.

Макрос MSGSIZE

33-34 В целях выравнивания содержимого файла (alignment) мы располагаем начало каждого сообщения так, чтобы его индекс был кратен размеру длинного целого. Следовательно, если максимальный размер сообщения не допускает такого выравнивания, мы добавляем к нему от 1 до 3 байт, как показано на рис. 5.2. При этом предполагается, что размер длинного целого — 4 байт (что верно для Solaris 2.6). Если размер длинного целого 8 байт (в Digital Unix 4.0B), нам придется добавлять к каждому сообщению от 1 до 7 байт.

Функция mq_open

В листинге 5.17 приведен текст первой части функции mq_open, создающей новую очередь сообщений или открывающей существующую.

Листинг 5.17. Функция mq_open: первая часть

//my_pxmsg._mmap/mq_open. с

1  #include "unpipc.h"

2  #include "mqueue.h"

3  #include 

4  #define MAX_TRIES 10

5  struct mymq_attr defattr =

6   { 0, 128, 1024, 0 };


7  mymqd_t

8  mymq_open(const char *pathname, int oflag, …)

9  {

10  int i, fd, nonblock, created, save_errno;

11  long msgsize, filesize, index;

12  va_list ap;

13  mode_t mode;

14  int8_t *mptr;

15  struct stat statbuff;

16  struct mymq_hdr *mqhdr;

17  struct mymsg_hdr *msghdr;

18  struct mymq_attr *attr;

19  struct mymq_info *mqinfo;

20  pthread_mutexattr_t mattr;

21  pthread_condattr_t cattr;

22  created = 0;

23  nonblock = oflag & O_NONBLOCK;

24  oflag &= ~O_NONBLOCK;

25  mptr = (int8_t *) MAP_FAILED;

26  mqinfo = NULL;

27 again:

28  if (oflag & O_CREAT) {

29   va_start(ap, oflag); /* ар инициализируется последним аргументом */

30   mode = va_arg(ap, va_mode_t) & ~S_IXUSR;

31   attr = va_arg(ap, struct mymq_attr *);

32   va_end(ap);

33   /* открытие с установкой бита user-execute */

34   fd = open (pathname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);

35   if (fd < 0) {

36    if (errno == EEXIST && (oflag & O_EXCL) == 0)

37     goto exists; /* уже существует, OK */

38    else

39     return((mymqd_t) –1);

40   }

41   created = 1;

42   /* при создании файла он инициализируется */

43   if (attr == NULL)

44    attr = &defattr;

45   else {

46    if (attr->mq_maxmsg <– 0 || attr->mq_msgsize <= 0) {

47     errno = EINVAL;

48     goto err;

49    }

50   }

Обработка списка аргументов переменного размера

29-32 Функция может быть вызвана либо с двумя, либо с четырьмя аргументами в зависимости от того, указан ли флаг O_CREAT. Если флаг указан, третий аргумент имеет тип mode_t, а это простой системный тип, являющийся одним из целых типов. При этом мы столкнемся с проблемой в BSD/OS, где этот тип данных определен как unsigned short (16 бит). Поскольку целое в этой реализации занимает 32 бита, компилятор С увеличивает аргумент этого типа с 16 до 32 бит, потому что все короткие целые в списке аргументов увеличиваются до обычных целых. Но если мы укажем mode_t при вызове va_arg, он пропустит 16 бит аргумента в стеке, если этот аргумент был увеличен до 32 бит. Следовательно, мы должны определить свой собственный тип данных, va_mode_t, который будет целым в BSD/OS и типом mode_t в других системах. Эту проблему с переносимостью решают приведенные ниже строки нашего заголовка unpipc.h (листинг В.1):

#ifdef __bsdi__

#define va_mode_t int

#else

#define va_mode_t mode_t

#endif

30 Мы сбрасываем бит user-execute (S_IXUSR) в переменной mode по причинам, которые будут вскоре раскрыты.

Создание новой очереди сообщений

33-34 Создается обычный файл с именем, указанным при вызове функции, и устанавливается бит user-execute. 

Обработка потенциальной ситуации гонок

35-40 Если бы при указании флага O_CREAT мы просто открыли файл, отобразили его содержимое в память и проинициализировали отображенный файл (как будет описано ниже), у нас возникла бы ситуация гонок. Очередь сообщений инициализируется mq_open только в том случае, если вызывающий процесс указывает флаг O_CREAT и очередь сообщений еще не существует. Это означает, что нам нужно каким-то образом определять, существует она или нет. Для этого при открытии файла для последующего отображения в память мы всегда указываем флаг O_EXCL. Возвращение ошибки EEXIST функцией open является ошибкой для mq_open только в том случае, если при вызове был указан флаг O_EXCL. В противном случае при возвращении функцией open ошибки EEXIST мы делаем вывод, что файл уже существует, и переходим к листингу 5.19, как если бы флаг O_CREAT вовсе не был указан.

Ситуация гонок может возникнуть потому, что использование отображаемого в память файла для реализации очереди сообщений требует двух шагов при инициализации очереди: сначала файл должен быть создан функцией open, а затем его содержимое должно быть проинициализировано. Проблема возникает, если два потока (одного или различных процессов) вызывают mq_open приблизительно одновременно. Один из потоков может создать файл, после чего управление будет передано системой второму потоку, прежде чем первый завершит инициализацию файла. Второй поток обнаружит, что файл уже существует (вызвав open с флагом O_EXCL), и приступит к использованию очереди сообщений.

Мы используем бит user-execute для указания того, был ли проинициализирован файл с очередью сообщений. Этот бит устанавливается только тем потоком, который создает этот файл (флаг O_EXCL позволяет определить этот поток); этот поток инициализирует файл с очередью сообщений, а затем сбрасывает бит user-execute.

Аналогичная ситуация может возникнуть в листингах 10.28 и 10.37.

Проверка атрибутов

42-50 Если при вызове в качестве последнего аргумента передан нулевой указатель, очередь сообщений инициализируется со значениями атрибутов по умолчанию: 128 сообщений в очереди и 1024 байта на сообщение. Если атрибуты указаны явно, мы проверяем, что mq_maxmsg и mq_msgsize имеют положительные значения.

Вторая часть функции mq_open приведена в листинге 5.18. Она завершает инициализацию новой очереди сообщений.

Листинг 5.18. Вторая часть функции mq_open: инициализация новой очереди

//my_pxmsg_mmap/mq_open.с

51    /* вычисление и установка размера файла */

52    msgsize = MSGSIZE(attr->mq_msgsize);

53    filesize = sizeof(struct mymq_hdr) + (attr->mq_maxmsg *

54     (sizeof(struct mymsg_hdr) + msgsize));

55    if (lseek(fd, filesize – 1, SEEK_SET) == –1)

56     goto err;

57    if (write(fd, "", 1) == –1)

58     goto err;

59    /* отображение файла в память */

60    mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE,

61     MAP_SHARED, fd, 0);

62    if (mptr == MAP_FAILED)

63     goto err;

64    /* выделение структуры mymq_info{} для очереди */

65    if ((mqinfo = mallос (sizeof (struct mymq_info))) == NULL)

66     goto err;

67    mqinfo->mqi_hdr = mqhdr = (struct mymq_hdr *) mptr;

68    mqinfo->mqi_magic = MQI_MAGIC;

69    mqinfo->mqi_flags = nonblock;

70    /* инициализация заголовка в начале файла */

71    /* создание списка пустых сообщений */

72    mqhdr->mqh_attr.mq_flags = 0;

73    mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;

74    mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;

75    mqhdr->mqh_attr.mq_curmsgs = 0;

76    mqhdr->mqh_nwait = 0;

77    mqhdr->mqh_pid = 0;

78    mqhdr->mqh_head = 0;

79    index = sizeof(struct mymq_hdr);

80    mqhdr->mqh_free = index;

81    for (i = 0; i < attr->mq_maxmsg – 1; i++) {

82     msghdr = (struct mymsg_hdr *) &mptr[index];

83     index += sizeof(struct mymsg_hdr) + msgsize;

84     msghdr->msg_next = index;

85    }

86    msghdr = (struct mymsg_hdr *) &mptr[index];

87    msghdr->msg_next = 0; /* конец списка пустых сообщений */

88    /* инициализация взаимного исключения и условной переменной */

89    if ((i = pthread_mutexattr_init(&mattr)) != 0)

90     goto pthreaderr;

91    pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);

92    i = pthread_mutex_init(&mqhdr->mqh_lock, &mattr);

93    pthread_mutexattr_destroy(&mattr); /* обязательно нужно удалить */

94    if (i != 0)

95     goto pthreaderr:

96    if ((i = pthread_condattr_init(&cattr)) != 0)

97     goto pthreaderr;

98    pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);

99    i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);

100   pthread_condattr_destroy(&cattr); /* обязательно нужно удалить */

101   if (i != 0)

102    goto pthreaderr;

103   /* инициализация завершена, снимаем бит user-execute */

104   if (fchmod(fd, mode) == –1)

105    goto err;

106   close(fd);

107   return((mymqd_t) mqinfo);

108  }

Установка размера файла

51-58 Вычисляется размер сообщения, который затем округляется до кратного размеру длинного целого. Также в файле отводится место для структуры mq_hdr в начале файла и msghdr в начале каждого сообщения (рис. 5.2). Размер вновь созданного файла устанавливается функцией lseek и записью одного байта со значением 0. Проще было бы вызвать ftruncate (раздел 13.3), но у нас нет гарантий, что это сработало бы для увеличения размера файла.

Отображение файла в память

59-63 Файл отображается в память функцией mmap.

Выделение памяти под структуру mq_info

64-66 При каждом вызове mq_open создается отдельный экземпляр mq_infо. Эта структура после создания инициализируется.

Инициализация структуры mq_hdr

67-87 Инициализируется структура mq_hdr. Заголовок связного списка сообщений (mqh_head) инициализируется нулем, а все сообщения в очереди добавляются к списку свободных (mqh_frее).

Инициализация взаимного исключения и условной переменной

88-102 Поскольку очереди сообщений Posix могут использоваться совместно произвольным количеством процессов, которые знают имя очереди и имеют соответствующие разрешения, нам нужно инициализировать взаимное исключение и условную переменную с атрибутом PTHREAD_PROCESS_SHARED. Для этого мы сначала инициализируем атрибуты вызовом pthread_mutexattr_init, а затем устанавливаем значение атрибута совместного использования процессами, вызвав pthread_mutexattr_setpshared. После этого взаимное исключение инициализируется вызовом pthread_mutex_init. Аналогичные действия выполняются для условной переменной. Мы должны аккуратно удалить взаимное исключение и условную переменную даже при возникновении ошибки, поскольку вызовы pthread_ mutexattr_init и pthread_condattr_init выделяют под них память (упражнение 7.3).

Сброс бита user-execute

103-107 После инициализации очереди сообщений мы сбрасываем бит user-execute. Это говорит другим процессам о том, что очередь была проинициализирована. Мы также закрываем файл вызовом close, поскольку он был успешно отображен в память и держать его открытым больше нет необходимости.

В листинге 5.19 приведен конец функции mq_open, в котором осуществляется открытие существующей очереди сообщений.

Листинг 5.19. Третья часть функции mq_open: открытие существующей очереди сообщений

//my_pxmsg_mmap/mq_open.с

109 exists:

110  /* открытие файла и отображение его в память */

111  if ((fd = open(pathname, O_RDWR)) < 0) {

112   if (errno == ENOENT && (oflag & O_CREAT))

113    goto again;

114  goto err;

115  }

116  /* проверяем, что инициализация завершена */

117  for (i = 0; i < MAX TRIES; i++) {

118   if (stat(pathname, &statbuff) == –1) {

119    if (errno == ENOENT && (oflag & O_CREAT)) {

120     close(fd);

121     goto again;

122    }

123    goto err;

124   }

125   if ((statbuff.st_mode & S_IXUSR) == 0)

126   break;

127   sleep(1);

128  }

129  if (i == MAX_TRIES) {

130   errno = ETIMEDOUT;

131   goto err;

132  }

133  filesize = statbuff.st_size;

134  mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

135  if (mptr == MAP_FAILED)

136   goto err;

137  close(fd);

138  /* выделяем одну mymq_info{} для каждого вызова open */

139  if ((mqinfo = malloc(sizeof(struct mymq_info))) == NULL)

140   goto err;

141  rnqinfo->mqi_hdr = (struct mymq_hdr *) mptr;

142  mqinfo->mqi_magic = MQI_MAGIC;

143  mqinfo->mqi_flags = nonblock;

144  return((mymqd_t) mqinfo);

145 pthreaderr:

146  errno = i;

147 err:

148  /* не даем следующим вызовам изменить errno */

149  save_errno = errno;

150  if (created)

151   unlink(pathname);

152  if (mptr != MAP_FAILED)

153   munmap(mptr, filesize);

154  if (mqinfo != NULL)

155   free(mqinfo);

156  close(fd);

157  errno = save_errno;

158  return((mymqd_t) –1);

159 }

Открытие существующей очереди сообщений

109-115 Здесь мы завершаем работу, если флаг O_CREAT не был указан или если он был указан, но очередь уже существовала. В любом случае, мы открываем существующую очередь сообщений. Для этого мы открываем для чтения и записи файл, в котором она содержится, функцией open и отображаем его содержимое в адресное пpocтрaнcтвo процесса (mmap).

ПРИМЕЧАНИЕ

Наша реализация сильно упрощена в том, что касается режима открытия файла. Даже если вызвавший процесс указывает флаг O_RDONLY, мы должны дать возможность доступа для чтения и записи при открытии файла командой open и при отображении его в память командой mmap, поскольку невозможно считать сообщение из очереди, не изменив содержимое файла. Аналогично невозможно записать сообщение в очередь, не имея доступа на чтение. Обойти эту проблему можно, сохранив режим открытия (O_RDONLY, O_WRONLY, O_RDWR) в структуре mq_info и проверяя этот режим в каждой из функций. Например, mq_receive должна возвращать ошибку, если в mq_info хранится значение O_WRONLY.

Проверка готовности очереди

116-132 Нам необходимо дождаться, когда очередь будет проинициализирована (в случае, если несколько потоков сделают попытку открыть ее приблизительно одновременно). Для этого мы вызываем stat и проверяем разрешения доступа к файлу (поле st_mode структуры stat). Если бит user-execute сброшен, очередь уже проинициализирована.

Этот участок кода обрабатывает другую возможную ситуацию гонок. Предположим, что два потока разных процессов попытаются открыть очередь приблизительно одновременно. Первый поток создает файл и блокируется при вызове lseek в листинге 5.18. Второй поток обнаруживает, что файл уже существует, и переходит к метке exists, где он вновь открывает файл функцией open и при этом блокируется. Затем продолжается выполнение первого потока, но его вызов mmap в листинге 5.18 не срабатывает (возможно, он превысил лимит использования памяти), поэтому он переходит на метку err и удаляет созданный файл вызовом unlink. Продолжается выполнение второго потока, но если бы мы вызывали fstat вместо stat, он бы вышел по тайм-ауту в цикле for, ожидая инициализации файла. Вместо этого мы вызываем stat, которая возвращает ошибку, если файл не существует, и, если флаг O_CREAT был указан при вызове mq_open, мы переходим на метку again (листинг 5.17) для повторного создания файла. Эта ситуация гонок заставляет нас также проверять, не возвращается ли при вызове open ошибка ENOENT.

Отображение файла в память; создание и инициализация структуры mq_info

133-144 Файл отображается в память, после чего его дескриптор может быть закрыт. Затем мы выделяем место под структуру mq_infо и инициализируем ее. Возвращаемое значение представляет собой указатель на эту структуру.

Обработка ошибок

145-148 При возникновении ошибок происходит переход к метке err, а переменной errno присваивается значение, которое должно быть возвращено функцией mq_open. Мы аккуратно вызываем функции для очистки памяти от выделенных объектов, чтобы переменная errno не изменила свое значение в случае возникновения ошибки в этих функциях.

Функция mq_close

В листинге 5.20 приведен текст нашей функции mq_close.

Листинг 5.20. Функция mq_close

//my_pxmsg_mmap/mq_close.с

1  #include "unpipc.h"

2  #include "mqueue.h"


3  int

4  mymq_close(mymqd_t mqd)

5  {

6   long msgsize, filesize:

7   struct mymq_hdr *mqhdr;

8   struct mymq_attr *attr;

9   struct mymq_info *mqinfo;

10  mqinfo = mqd;

11  if (mqinfo->mqi_magic != MQI_MAGIC) {

12   errno = EBADF;

13   return(-1);

14  }

15  mqhdr = mqinfo->mqi_hdr;

16  attr = &mqhdr->mqh_attr;

17  if (mymq_notify(mqd, NULL) != 0) /* снятие вызвавшего процесса с регистрации */

18   return(-1);

19  msgsize = MSGSIZE(attr->mq_msgsize);

20  filesize = sizeof(struct mymq_hdr) + (attr->mq_maxmsg *

21   (sizeof(struct mymsg_hdr) + msgsize));

22  if (munmap(mqinfo->mqi_hdr, filesize) == –1)

23   return(-1);

24  mqinfo->mqi_magic = 0; /* на всякий случай */

25  free(mqinfo);

26  return(0);

27 }

Получение указателей на структуры

10-16 Проверяется правильность переданных аргументов, после чего получаются указатели на область, занятую отображенным в память файлом (mqhdr), и атрибуты (в структуре mq_hdr).

Сброс регистрации вызвавшего процесса

17-18 Для сброса регистрации на уведомление вызвавшего процесса мы вызываем mq_notify. Если процесс был зарегистрирован, он будет снят с уведомления, но если нет — ошибка не возвращается.

Отключение отображения файла и освобождение памяти

19-25 Мы вычисляем размер файла для вызова munmap и освобождаем память, используемую структурой mqinfo. На случай, если вызвавший процесс будет продолжать использовать дескриптор очереди сообщений, до того как область памяти будет вновь задействована вызовом malloc, мы устанавливаем значение mq_magiс в ноль, чтобы наши функции для работы с очередью сообщений обнаруживали ошибку.

Обратите внимание, что если процесс завершает работу без вызова mq_close, эти же операции выполняются автоматически: отключается отображение в память, а память освобождается.

Функция mq_unlink

Текст функции mqunlink приведен в листинге 5.21. Она удаляет файл, связанный с очередью сообщений, вызывая функцию unlink.

Листинг 5.21. Функция mq_unlink

//my_pxmsg_mmap/mq_unlink.с

1 #include "unpipc.h"

2 #include "mqueue.h"


3 int

4 mymq_unlink(const char *pathname)

5 {

6  if (unlink(pathname) == –1)

7   return(-1);

8  return(0);

9 }

Функция mq_getattr

В листинге 5.22 приведен текст функции mq_getattr, которая возвращает текущее значение атрибутов очереди.

Листинг 5.22. Функция mq_getattr

//my_pxmsg_mmap/mq_getattr.с

1  #include "unpipc.h"

2  #include "mqueue.h"


3  int

4  mymq_getattr(mymqd_t mqd, struct mymq_attr *mqstat)

5  {

6   int n;

7   struct mymq_hdr *mqhdr;

8   struct mymq_attr *attr;

9   struct mymq_info *mqinfo;

10  mqinfo = mqd;

11  if (mqinfo->mqi_magic != MQI_MAGIC) {

12   errno = EBADF;

13   return(-1);

14  }

15  mqhdr = mqinfo->mqi_hdr;

16  attr = &mqhdr->mqh_attr;

17  if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {

18   errno = n;

19   return (-1);

20  }

21  mqstat->mq_flags = mqinfo->mqi_flags; /* для каждого open */

22  mqstat->mq_maxmsg = attr->mq_maxmsg; /* оставшиеся три – для очереди */

23  mqstat->mq_msgsize = attr->mq_msgsize;

24  mqstat->mq_curmsgs = attr->mq_curmsgs;

25  pthread_mutex_unlock(&mqhdr->mqh_lock);

26  return(0);

27 }

Блокирование взаимного исключения

17-20 Мы должны заблокировать соответствующее взаимное исключение для работы с очередью, в частности для получения атрибутов, поскольку какой-либо другой поток может в это время их изменить.

Функция mq_setattr

В листинге 5.23 приведен текст функции mq_setattr, которая устанавливает значение атрибутов очереди.

Считывание текущих атрибутов

22-27 Если третий аргумент представляет собой ненулевой указатель, мы возвращаем предыдущее значение атрибутов перед внесением каких-либо изменений.

Изменение mq_flags

28-31 Единственный атрибут, который можно менять с помощью нашей функции, — mq_flags, хранящийся в структуре mq_infо.

Листинг 5.23. Функция mq_setattr

//my_pxmsg_mniap/mq_setattr.с

1  #include "unpipc.h"

2  #include "mqueue.h"


3  int

4  mymq_setattr(mymqd_t mqd. const struct mymq_attr *mqstat,

5   struct mymq attr *omqstat)

6  {

7   int n;

8   struct mymq_hdr *mqhdr;

9   struct mymq_attr *attr;

10  struct mymq_info *mqinfo;

11  mqinfo = mqd;

12  if (mqinfo->mqi_magic != MQI_MAGIC) {

13   errno = EBADF;

14   return(-1);

15  }

16  mqhdr = mqinfo->mqi_hdr;

17  attr = &mqhdr->mqh_attr;

18  if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) ! = 0) {

19   errno = n;

20   return(-1);

21  }

22  if (omqstat != NULL) {

23   omqstat->mq_flags = mqinfo->mqi_flags; /* исходные атрибуты */

24   omqstat->mq_maxmsg = attr->mq_maxmsg;

25   omqstat->mq_msgsize = attr->mq_msgsize;

26   omqstat->mq_curmsgs = attr->mq_curmsgs; /* текущий статус */

27  }

28  if (mqstat->mq_flags & O_NONBLOCK)

29   mqinfo->mqi flags |= O_NONBLOCK;

30  else

31   mqinfo->ntqi_flags &= ~O_NONBLOCK;

32  pthread_mutex_unlock(&mqhdr->mqh_lock);

33  return(0);

34 }

Функция mq_notify

Функция mq_notify, текст которой приведен в листинге 5.24, позволяет регистрировать процесс на уведомление для текущей очереди и снимать его с регистрации. Информация о зарегистрированных процессах (их идентификаторы) хранится в поле mqh_pid структуры mq_hdr. Только один процесс может быть зарегистрирован на уведомление в любой момент времени. При регистрации процесса мы сохраняем его структуру sigevent в структуре mqh_event.

Листинг 5.24. Функция mq_notify

//my_pxmsg_mmap/mq_notify.с

1  #include "unpipc.h"

2  #include "mqueue.h"


3  int

4  mymq_notify(mymqd_t mqd, const struct sigevent *notification)

5  {

6   int n;

7   pid_t pid;

8   struct mymq_hdr *mqhdr;

9   struct mymq_info *mqinfo;

10  mqinfo = mqd;

11  if (mqinfo->mqi magic != MQI_MAGIC) {

12   errno = EBADF;

13   return(-1);

14  }

15  mqhdr = mqinfo->mqi_hdr;

16  if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {

17   errno = n;

18   return(-1);

19  }

20  pid = getpid();

21  if (notification == NULL) {

22   if (mqhdr->mqh_pid == pid) {

23    mqhdr->mqh_pid = 0; /* снятие вызвавшего процесса с регистрации */

24   } /* если вызвавший процесс не зарегистрирован – 61К */

25  } else {

26   if (mqhdr->mqh_pid != 0) {

27    if (kill(mqhdr->mqh_pid, 0) != –1 || errno != ESRCH) {

28     errno = EBUSY;

29     goto err;

30    }

31   }

32   mqhdr->mqh_pid = pid;

33   mqhdr->mqh_event = *notification;

34  }

35  pthread_mutex_unlock(&mqhdr->mqh_lock);

36  return(0);

37 err:

38  pthread_mutex_unlock(&mqhdr->mqh_lock);

39  return(-1);

40 }

Снятие процесса с регистрации

20-24 Если второй аргумент представляет собой нулевой указатель, вызвавший процесс снимается с регистрации. Если он не зарегистрирован, никакой ошибки не возвращается.

Регистрация вызвавшего процесса

25-34 Если какой-либо процесс уже зарегистрирован, мы проверяем, существует ли он, отправкой ему сигнала с кодом 0 (называемого нулевым сигналом — null signal). Это обычная проверка на возможность ошибки, на самом деле при этом никакого сигнала процессу не отправляется, но при его отсутствии возвращается ошибка с кодом ESRCH. Если какой-либо процесс уже зарегистрирован на уведомление, функция возвращает ошибку EBUSY. В противном случае сохраняется идентификатор процесса вместе с его структурой sigevent.

ПРИМЕЧАНИЕ

Наш метод проверки существования вызвавшего процесса не идеален. Процесс мог завершить работу, а его идентификатор мог быть использован другим процессом.

Функция mq_send

В листинге 5.25 приведен текст первой половины нашей функции mqsend.

Инициализация

14-29 Мы получаем указатели на используемые структуры и блокируем взаимное исключение для данной очереди. Проверяем, не превышает ли размер сообщения максимально допустимый для данной очереди.

Проверка очереди на пустоту и отправка уведомления

30-38 Если мы помещаем первое сообщение в пустую очередь, нужно проверить, не зарегистрирован ли какой-нибудь процесс на уведомление об этом событии и нет ли потоков, заблокированных в вызове mq_receive. Для проверки второго условия мы воспользуемся сохраняемым функцией mq_receive счетчиком mqh_nwait, содержащим количество потоков, заблокированных в вызове mq_receive. Если этот счетчик имеет ненулевое значение, мы не отправляем уведомление зарегистрированному процессу. Для отправки сигнала SIGEV_SIGNAL используется функция sigqueue. Затем процесс снимается с регистрации.

ПРИМЕЧАНИЕ

Вызов sigqueue для отправки сигнала приводит к передаче сигнала SI_QUEUE обработчику сигнала в структуре типа siginfo_t (раздел 5.7), что неправильно. Отправка правильного значения si_code (а именно SI_MESGQ) из пользовательского процесса осуществляется в зависимости от реализации. На с. 433 стандарта IEEE 1996 [8] отмечается, что для отправки этого сигнала из пользовательской библиотеки необходимо воспользоваться скрытым интерфейсом механизма отправки сигналов. 

Проверка заполненности очереди

39-48 Если очередь переполнена и установлен флаг O_NONBLOCK, мы возвращаем ошибку с кодом EAGAIN. В противном случае мы ожидаем сигнала по условной переменной mqh_wait, который, как мы увидим, отправляется функцией mq_receive при считывании сообщения из переполненной очереди.

ПРИМЕЧАНИЕ

Наша реализация упрощает ситуацию с возвращением ошибки EINTR при прерывании вызова mq_send сигналом, перехватываемым вызвавшим процессом. Проблема в том, что функция pthread_cond_wait не возвращает ошибки при возврате из обработчика сигнала: она может вернуть либо 0 (что рассматривается как ложное пробуждение), либо вообще не завершить работу. Все эти проблемы можно обойти, но это непросто. 

В листинге 5.26 приведена вторая половина функции mq_send. К моменту ее выполнения мы уже знаем о наличии в очереди свободного места для нашего сообщения.

Листинг 5.25. Функция mq_send: первая половина

//my_pxmsg_mmap/mq_send.с

1  #include "unpipc.h"

2  #include "mqueue.h"


3  int

4  mymq_send(mymqd_t mqd, const char *ptr, size_t len, unsigned int prio)

5  {

6   int n;

7   long index, freeindex;

8   int8_t *mptr;

9   struct sigevent *sigev;

10  struct mymq_hdr *mqhdr;

11  struct mymq_attr *attr;

12  struct mymsg_hdr *msghdr, *nmsghdr, *pmsghdr;

13  struct mymq_info *mqinfo;

14  mqinfo = mqd;

15  if (mqinfo->mqi_magic != MQI_MAGIC) {

16   errno = EBADF;

17   return(-1);

18  }

19  mqhdr = mqinfo->mqi_hdr; /* указатель типа struct */

20  mptr = (int8_t *) mqhdr; /* указатель на байт */

21  attr = &mqhdr->mqh_attr;

22  if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {

23   errno = n;

24   return(-1);

25  }

26  if (len > attr->mq_msgsize) {

27   errno = EMSGSIZE;

28   goto err;

29  }

30  if (attr->mq_curmsgs == 0) {

31   if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0) {

32    sigev = &mqhdr->mqh_event;

33    if (sigev->sigev_notify == SIGEV_SIGNAL) {

34     sigqueue(mqhdr->mqh_pid, sigev->sigev_signo,

35      sigev->sigev_value);

36    }

37    mqhdr->mqh_pid = 0; /* снятие с регистрации */

38   }

39  } else if (attr->mq_curmsgs >= attr->mq_maxmsg) {

40   /* 4queue is full */

41   if (mqinfo->mqi_flags & O_NONBLOCK) {

32    errno = EAGAIN;

43    goto err;

44   }

45   /* ожидание освобождения места в очереди */

46   while (attr->mq_curmsgs >= attr->mq_maxmsg)

47    pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);

48  }

Листинг 5.25. Функция mq_send: вторая половина

//my_pxmsg_mmap/mq_send.с

49  /* nmsghdr будет указывать на новое сообщение*/

50  if ((freeindex = mqhdr->mqh_free) == 0)

51   err_dump("mymq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);

52  nmsghdr = (struct mymsg_hdr *) &mptr[freeindex];

53  nmsghdr->msg_prio = prio;

54  nmsghdr->msg_len = len;

55  memcpy(nmsghdr + 1, ptr, len); /* копирование сообщения в очередь */

56  mqhdr->mqh_free = nmsghdr->msg_next; /* новое начало списка пустых сообщений */

57  /* поиск места в списке для нового сообщения */

58  index = mqhdr->mqh_head;

59  pmsghdr = (struct mymsg_hdr *) &(mqhdr->mqh_head);

60  while (index != 0) {

61   msghdr = (struct mymsg_hdr *) &mptr[index];

62   if (prio > msghdr->msg_prio) {

63    nmsghdr->msg_next = index;

64    pmsghdr->msg_next = freeindex;

65    break;

66   }

67   index = msghdr->msg_next;

68   pmsghdr = msghdr;

69  }

70  if (index == 0) {

71   /* очередь была пуста или новое письмо добавлено к концу списка */

72   pmsghdr->msg_next = freeindex;

73   nmsghdr->msg_next = 0;

74  }

75  /* запускаем любой из процессов, заблокированных в mq_receive */

76  if (attr->mq_curmsgs == 0)

77   pthread_cond_signal(&mqhdr->mqh_wait);

78  attr->mq_curmsgs++;

79  pthread_mutex_unlock(&mqhdr->mqh_lock);

80  return(0);

81 err:

82  pthread_mutex_unlock(&mqhdr->mqh lock);

83  return(-1);

84 }

Получение индекса свободного блока

50-52 Поскольку количество свободных сообщений при создании очереди равно mq_maxmsg, ситуация, в которой mq_curmsgs будет меньше mq_maxmsg для пустого списка свободных сообщений, возникнуть не может.

Копирование сообщения

53-56 Указатель nmsghdr хранит адрес области памяти, в которую помещается сообщение. Приоритет и длина сообщения сохраняются в структуре msg_hdr, а затем в память копируется содержимое сообщения, переданного вызвавшим процессом.

Помещение нового сообщения в соответствующее место связного списка

57-74 Порядок сообщений в нашем списке зависит от их приоритета: они расположены в порядке его убывания. При добавлении нового сообщения мы проверяем, существуют ли сообщения с тем же приоритетом; в этом случае сообщение добавляется после последнего из них. Используя такой метод упорядочения, мы гарантируем, что mq_receive всегда будет возвращать старейшее сообщение с наивысшим приоритетом. По мере продвижения по списку мы сохраняем в pmsghdr адрес предыдущего сообщения, поскольку именно это сообщение будет хранить индекс нового сообщения в поле msg_next.

ПРИМЕЧАНИЕ

Наша схема может оказаться медленной в случае наличия в очереди большого количества сообщений, поскольку каждый раз при добавлении нового придется просматривать их значительную часть. Можно хранить отдельно индексы последних сообщений со всеми имеющимися значениями приоритета. 

Пробуждение любого процесса, заблокированного в вызове mq_receive

75-77 Если очередь была пуста в момент помещения в нее нового сообщения, мы вызываем pthread_cond_signal, чтобы разблокировать любой из процессов, ожидающих сообщения.

78 Увеличиваем на единицу количество сообщений в очереди mq_curmsgs.

Функция mq_receive

В листинге 5.27 приведен текст первой половины функции mq_receive, которая получает необходимые указатели, блокирует взаимное исключение и проверяет объем буфера вызвавшего процесса, который должен быть достаточным для помещения туда сообщения максимально возможной длины.

Проверка полноты очереди

30-40 Если очередь пуста и установлен флаг O_NONBLOCK, возвращается ошибка с кодом EAGAIN. В противном случае увеличивается значение счетчика mqh_nwait, который проверяется функцией mq_send (листинг 5.25) в случае, если очередь пуста и есть процессы, ожидающие уведомления. Затем мы ожидаем сигнала по условной переменной, который будет передан функцией mq_send (листинг 5.26).

ПРИМЕЧАНИЕ

Наша реализация mq_receive, как и реализация mq_send, упрощает ситуацию с ошибкой EINTR, возвращаемой при прерывании ожидания сигналом, перехватываемым вызвавшим процессом.

В листинге 5.28 приведен текст второй половины функции mq_receive. Мы уже знаем, что в очереди есть сообщение, которое можно будет возвратить вызвавшему процессу.

Листинг 5.27.Функция mq_receive: первая половина

//my_pxmsg_mmap/mq_receive.с

1  #include "unpipc.h"

2  #include "mqueue.h"


3  ssize_t

4  mymq_receive(mymqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)

5  {

6   int n;

7   long index;

8   int8_t *mptr;

9   ssize_t len;

10  struct mymq_hdr *mqhdr;

11  struct mymq_attr *attr;

12  struct mymsg_hdr *msghdr;

13  struct mymq_info *mqinfo;

14  mqinfo = mqd;

15  if (mqinfo->mqi_magic != MQI_MAGIC) {

16   errno = EBADF;

17   return(-1);

18  }

19  mqhdr = mqinfo->mqi_hdr; /* указатель struct */

20  mptr = (int8_t *) mqhdr; /* указатель на байт */

21  attr = &mqhdr->mqh_attr;

22  if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {

23   errno = n;

24   return(-1);

25  }

26  if (maxlen < attr->mq_msgsize) {

27   errno = EMSGSIZE;

28   goto err;

29  }

30  if (attr->mq_curmsgs = 0) { /* очередь пуста */

31   if (mqinfo->mqi_flags & O_NONBLOCK) {

32    errno = EAGAIN;

33    goto err;

34   }

35   /* ожидаем помещения сообщения в очередь */

36   mqhdr->mqh_nwait++;

37   while (attr->mq_curmsgs == 0)

38    pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);

39   mqhdr->mqh_nwait--;

40  }

Листинг 5.28. Функция mq_receive: вторая половина

//my_pxmsg_mmap/mq_receive.c

41  
if ((index = mqhdr->mqh_head) == 0)

42   err_dump("mymq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);

43  msghdr = (struct mymsg_hdr *) &mptr[index];

44  mqhdr->mqh_head = msghdr->msg_next; /* новое начало списка */

45  len = msghdr->msg_len;

46  memcpy(ptr, msghdr + 1, len); /* копирование самого сообщения */

47  if (priop != NULL)

48   *priop = msghdr->msg_prio;

49  /* только что считанное сообщение становится первым в списке пустых */

50  msghdr->msg_next = mqhdr->mqr_free;

51  mqhdr->mqh_free = index;

52  /* запуск любого процесса, заблокированного в вызове mq_send */

53  if (attr->mq_curmsgs == attr->mq_maxmsg)

54   pthread_cond_signal(&mqhdr->mqh_wait);

55  attr->mq_curmsgs--;

56  pthread_mutex_unlock(&mqhdr->mqh_lock);

57  return(len);

58 err:

59  pthread_mutex_unlock(&mqhdr->mqh_lock);

60  return(-1);

61 }

Возвращение сообщения вызвавшему процессу

43-51 msghdr указывает на msg_hdr первого сообщения в очереди, которое мы и возвратим. Освободившееся сообщение становится первым в списке свободных. 

Разблокирование процесса, заблокированного в вызове mq_send

52-54 Если очередь была полной в момент считывания сообщения, мы вызываем pthread_cond_signal для отправки сообщения любому из процессов, заблокированных в вызове mq_send.

5.9. Резюме

Очереди сообщений Posix просты в использовании: новая очередь создается (или существующая открывается) функцией mq_open; закрываются очереди вызовом mq_close, а удаляются mq_unlink. Поместить сообщение в очередь можно функцией mq_send, а считать его оттуда можно с помощью mq_receive. Атрибуты очереди можно считать и установить с помощью функций mq_getattr и mq_setattr, а функция mq_notify позволяет зарегистрировать процесс на уведомление о помещении нового сообщения в пустую очередь. Каждое сообщение в очереди обладает приоритетом (небольшое целое число), и функция mq_receive всегда возвращает старейшее сообщение с наивысшим приоритетом.

Изучая mq_notify, мы познакомились с сигналами реального времени стандарта Posix, которые обладают номерами от SIGMIN до SIGMAX. При установке обработчика для этих сигналов с флагом SA_SIGINFO они будут помещаться в очередь, доставляться в порядке очереди и сопровождаться двумя дополнительными аргументами (при вызове обработчика).

Наконец, мы реализовали большую часть возможностей очереди сообщений Posix в приблизительно 500 строках кода на языке С, используя отображаемые в память файлы, взаимные исключения и условные переменные Posix. Эта реализация иллюстрирует обработку ситуации гонок при создании новой очереди; еще раз нам придется столкнуться с такой ситуацией в главе 10 при реализации семафоров Posix.

Упражнения

1.  Говоря о листинге 5.4, мы отметили, что атрибут attr функции mq_open при создании новой очереди является ненулевым; следует указать оба поля: mq_maxmsg и mq_msgsize. Как можно было бы указать только одно из них, не указывая второе, для которого использовать значения атрибутов по умолчанию?

2. Измените листинг 5.8 так, чтобы при получении сигнала не вызывалась функция mq_notify. Затем поместите в очередь два сообщения и убедитесь, что для второго из них сигнал порожден не будет. Почему?

3. Измените листинг 5.8 так, чтобы сообщение из очереди при получении сигнала не считывалось. Вместо этого просто вызовите mq_notify и напечатайте сообщение о получении сигнала. Затем отправьте два сообщения и убедитесь, что для второго из них сигнал не порождается. Почему?

4. Что произойдет, если мы уберем преобразование двух констант к целому типу в первом вызове printf в листинге 5.14? 

5. Измените листинг 5.4 следующим образом: перед вызовом mq_open напечатайте сообщение и подождите 30 секунд (sleep). После возвращения из mq_open выведите еще одно сообщение и подождите еще 30 секунд, а затем вызовите mq_close. Откомпилируйте программу и запустите ее, указав большое количество сообщений (несколько сотен тысяч) и максимальный размер сообщения, скажем, в 10 байт. Задача заключается в том, чтобы создать большую очередь и проверить, используются ли в реализации отображаемые в память файлы. В течение 30-секундной паузы запустите программу типа ps и посмотрите на занимаемый программой объем памяти. Сделайте это еще раз после возвращения из mq_open. Можете ли вы объяснить происходящее?

6. Что произойдет при вызове memcpy в листинге 5.26, если вызвавший процесс укажет нулевую длину сообщения?

7. Сравните очередь сообщений с двусторонними каналами, описанными в разделе 4.4. Сколько очередей нужно для двусторонней связи между родительским и дочерним процессами?

8. Почему мы не удаляем взаимное исключение и условную переменную в листинге 5.20?

9. Стандарт Posix утверждает, что дескриптор очереди сообщений не может иметь тип массива. Почему? 

10. В каком состоянии проводит большую часть времени функция main из листинга 5.12? Что происходит каждый раз при получении сигнала? Как мы обрабатываем эту ситуацию?

11. Не все реализации поддерживают атрибут PTHREAD_PROCESS_SHARED для взаимных исключений и условных переменных. Переделайте реализацию очередей сообщений из раздела 5.8 так, чтобы использовать семафоры Posix (глава 10) вместо взаимных исключений и условных переменных.

12. Расширьте реализацию очередей сообщений Posix из раздела 5.8 так, чтобы она поддерживала SIGEV_THREAD. 

ГЛАВА 6