UNIX: взаимодействие процессов — страница 12 из 35

Семафоры Posix

10.1.Введение

Семафор представляет собой простейшее средство синхронизации процессов и потоков. Мы рассматриваем три типа семафоров:

■ именованные семафоры Posix, идентифицируемые именами, соответствующими стандарту Posix для IPC (см. раздел 2.2);

■ размещаемые в разделяемой памяти семафоры Posix;

■ семафоры System V (глава 11), обслуживаемые ядром.

Все три типа семафоров могут использоваться для синхронизации как отдельных процессов, так и потоков одного процесса. Мы начнем с рассмотрения проблем синхронизации между разными процессами с помощью бинарного семафора, то есть такого, который может принимать только значения 0 и 1. Пример подобной схемы приведен на рис. 10.1.

Рис. 10.1. Два процесса взаимодействуют с помощью бинарного семафора


На этом рисунке изображен бинарный семафор, хранящийся в ядре (семафор System V).

Семафоры Posix не обязательно должны обрабатываться ядром. Их особенностью является наличие имен, которые могут соответствовать именам реальных файлов в файловой системе. На рис. 10.2 изображена схема, лучше иллюстрирующая предмет обсуждения данной главы — именованный семафор Posix.

Рис. 10.2. Два процесса, использующие бинарный именованный семафор Posix


ПРИМЕЧАНИЕ

В отношении рис. 10.2 необходимо сделать одно уточнение: хотя именованные семафоры Posix обладают именами в файловой системе, они не обязательно должны храниться в файлах. Во встроенной системе реального времени значение семафора, скорее всего, будет размещаться в ядре, а имя файла будет использоваться исключительно для идентификации семафора. При реализации с помощью отображения файлов в память (пример такой реализации приведен в разделе 10.15) значение семафора будет действительно храниться в файле, который будет отображаться в адресное пространство всех процессов, использующих семафор. 

На рис. 10.1 и 10.2 мы указали три операции, которые могут быть применены к семафорам:

1. Создание семафора. При этом вызвавший процесс должен указать начальное значение (часто 1, но может быть и 0).

2. Ожидание изменения значения семафора (wait). При этом производится проверка его значения и процесс блокируется, если значение оказывается меньшим либо равным 0, а при превышении 0 значение уменьшается на 1. Это может быть записано на псевдокоде как

while (semaphore_value <= 0); /* wait: т.е. поток или процесс блокируется */

semaphore_value--; /* семафор разрешает выполнение операций */

Основным требованием является атомарность выполнения операций проверки значения в цикле while и последующего уменьшения значения семафора (то есть как одной операции) по отношению к другим потокам (это одна из причин, по которой семафоры System V были реализованы в середине 80-х как часть ядра. Поскольку операции с ними выполнялись с помощью системных вызовов, легко было гарантировать их атомарность по отношению к другим процессам).

У этой операции есть несколько общеупотребительных имен. Изначально она называлась Р, от голландского proben (проверка, попытка), — это название было введено Эдсгером Дейкстрой. Используются также и термины down (поскольку значение семафора уменьшается) и lock, но мы будем следовать стандарту Posix и говорить об ожидании (wait).

3. Установка значения семафора (post). Значение семафора увеличивается одной командой, которая может быть записана на псевдокоде как

semaphore_value++;

Если в системе имеются процессы, ожидающие изменения значения семафора до величины, превосходящей 0, один из них может быть пробужден. Как и операция ожидания, операция установки значения семафора также должна быть атомарной по отношению к другим процессам, работающим с этим семафором.

Для этой операции также имеется несколько общеупотребительных терминов. Изначально она называлась V, от голландского verhogen (увеличивать). Называют ее up (значение семафора увеличивается), unlock и signal. Мы, следуя стандарту Posix, называем эту операцию post.

Очевидно, что реальный код для работы с семафором будет более сложным, чем приведенный выше. Все процессы, ожидающие изменения какого-либо семафора, должны помещаться в очередь, и один из них должен запускаться при выполнении требуемого условия. К счастью, это обеспечивается реализацией.

Обратите внимание, что приведенный псевдокод не ограничен в применении только бинарными семафорами. Код работает с семафором, инициализируемым любым неотрицательным значением. Такие семафоры называют также семафорами-счетчиками. Обычно они инициализируются некоторым значением N, которое указывает количество доступных ресурсов (например, буферов). В этой главе есть примеры использования как бинарных семафоров, так и семафоров-счетчиков.

ПРИМЕЧАНИЕ

Мы часто проводим различие между бинарными и многозначными семафорами, но делаем это исключительно в образовательных целях. В системной реализации семафоров никакой разницы нет.

Бинарный семафор может использоваться в качестве средства исключения (подобно взаимному исключению). В листинге 10.1 приведен пример для сравнения этих средств.

Листинг 10.1. Сравнение бинарных семафоров и взаимных исключений

инициализация взаимного исключения;  инициализация семафора единицей;

pthread_mutex_lock(&mutex);          sem_wait(&sem);

критическая область                  критическая область

pthread_mutex_unlock(&mutex);        sem_post(&sem);

Мы инициализируем семафор значением 1. Вызвав sem_wait, мы ожидаем, когда значение семафора окажется больше 0, а затем уменьшаем его на 1. Вызов sem_post увеличивает значение с 0 до 1 и возобновляет выполнение всех потоков, заблокированных в вызове sem_wait для данного семафора.

Хотя семафоры и могут использоваться в качестве взаимных исключений, они обладают некоторыми особенностями: взаимное исключение должно быть разблокировано именно тем потоком, который его заблокировал, в то время как увеличение значения семафора может быть выполнено другим потоком. Можно привести пример использования этой особенности для решения упрощенной версии задачи потребителей и производителей из главы 7 с двумя бинарными семафорами. На рис. 10.3 приведена схема с одним производителем, помещающим объект в общий буфер, и одним потребителем, изымающим его оттуда. Для простоты предположим, что в буфер помещается ровно один объект. 

Рис. 10.3. Задача производителя и потребителя с общим буфером


В листинге 10.2 приведен текст соответствующей программы на псевдокоде.

Листинг 10.2. Псевдокод для задачи производителя и потребителя

Producer                                Consumer

инициализация семафора get значением 0;

инициализация семафора put значением 1;

for (;;) {                              for (;;) {

 sem_wait(&put);                         sem_wait(&get);

 помещение данных в буфер;               обработка данных в буфере;

 sem_post(&get);                         sem_post(&put);

}                                       }

Семафор put oгрaничивaeт возможность помещения объекта в общий буфер, а семафор get управляет потребителем при считывании объекта из буфера. Работает эта пpoгрaммa в такой последовательности:

1. Производитель инициализирует буфер и два семафора.

2. Пусть после этого запускается потребитель. Он блокируется при вызове sem_wait, поскольку семафор get имеет значение 0.

3. После этого запускается производитель. При вызове sem_wait значение put уменьшается с 1 до 0, после чего производитель помещает объект в буфер. Вызовом sem_post значение семафора get увеличивается с 0 до 1. Поскольку имеется поток, заблокированный в ожидании изменения значения этого семафора, этот поток помечается как готовый к выполнению. Предположим, тем не менее, что производитель продолжает выполняться. В этом случае он блокируется при вызове sem_wait в начале цикла for, поскольку значение семафора put — 0. Производитель должен подождать, пока потребитель не извлечет данные из буфера.

4. Потребитель возвращается из sem_wait, уменьшая значение семафора get с 0 до 1. Затем он обрабатывает данные в буфере и вызывает sem_post, увеличивая значение put с 0 до 1. Заблокированный в ожидании изменения значения этого семафора поток-производитель помечается как готовый к выполнению. Предположим опять, что выполнение потребителя продолжается. Тогда он блокируется при вызове sem_wait в начале цикла for, поскольку семафор get имеет значение 0.

5. Производитель возвращается из sem_wait, помещает данные в буфер, и все повторяется.

Мы предполагали, что каждый раз при вызове sem_post продолжалось выполнение вызвавшего эту функцию потока, несмотря на то что ожидающий изменения значения семафора поток помечался как готовый к выполнению. Никаких изменений в работе программы не произойдет, если вместо вызвавшего sem_post потока будет выполняться другой, ожидавший изменения состояния семафора (исследуйте такую ситуацию и убедитесь в этом самостоятельно).

Перечислим три главных отличия семафоров и взаимных исключений в паре с условными переменными:

1. Взаимное исключение всегда должно разблокироваться тем потоком, который установил блокировку, тогда как увеличение значения семафора не обязательно осуществляется ожидающим его изменения потоком. Это мы только что продемонстрировали на примере.

2. Взаимное исключение может быть либо заблокировано, либо разблокировано (пара состояний, аналогично бинарному семафору).

3. Поскольку состояние семафора хранится в определенной переменной, изменение его значения оказывает влияние на процессы, которые вызовут функцию wait уже после этого изменения, тогда как при отправке сигнала по условной переменной в отсутствие ожидающих его потоков сигнал будет утерян. Взгляните на листинг 10.2 и представьте, что при первом проходе цикла производителем потребитель еще не вызвал sem_wait. Производитель сможет поместить объект в буфер, вызвать sem_post для семафора get (увеличивая его значение с 0 до 1), а затем он заблокируется в вызове sem_wait для семафора put. Через некоторое время потребитель дойдет до цикла for и вызовет sem_wait для переменной get, что уменьшит значение этого семафора с 1 до 0, а затем потребитель приступит к обработке содержимого буфера.

ПРИМЕЧАНИЕ

В Обосновании Posix.1 (Rationale) содержится следующий комментарий по поводу добавления семафоров помимо взаимных исключений и условных переменных: «Семафоры включены в стандарт в первую очередь с целью предоставить средства синхронизации выполнения процессов; эти процессы могут и не использовать общий сегмент памяти. Взаимные исключения и условные переменные описаны как средства синхронизации потоков, у которых всегда есть некоторое количество общей памяти. Оба метода широко используются уже много лет. Каждое из этих простейших средств имеет свой предпочтительный круг задач». В разделе 10.15 мы увидим, что для реализации семафоров-счетчиков с живучестью ядра требуется написать около 300 строк кода на С, использующего взаимные исключения и условные переменные. Несмотря на предпочтительность применения семафоров для синхронизации между процессами и взаимных исключений для синхронизации между потоками, и те и другие могут использоваться в обоих случаях. Следует пользоваться тем набором средств, который удобен в данном приложении.

Выше мы отмечали, что стандартом Posix описано два типа семафоров: именованные (named) и размещаемые в памяти (memory-based или unnamed). На рис. 10.4 сравниваются функции, используемые обоими типами семафоров.

Именованный семафор Posix был изображен на рис. 10.2. Неименованный, или размещаемый в памяти, семафор, используемый для синхронизации потоков одного процесса, изображен на рис. 10.5.

Рис. 10.4. Вызовы для семафоров Posix


Рис. 10.5. Семафор, размещенный в общей памяти двух потоков


На рис. 10.6 изображен размещенный в разделяемой памяти семафор (часть 4), используемый двумя процессами. Общий сегмент памяти принадлежит адресному пространству обоих процессов.

Рис. 10.6. Семафор, размещенный в разделяемой двумя процессами памяти


В этой главе сначала рассматриваются именованные семафоры Posix, а затем — размещаемые в памяти. Мы возвращаемся к задаче производителей и потребителей из раздела 7.3 и расширяем ее, позволяя нескольким производителям работать с одним потребителем, а в конце концов переходим к нескольким производителям и нескольким потребителям. Затем мы покажем, что часто используемый при реализации ввода-вывода метод множественных буферов является частным случаем задачи производителей и потребителей.

Мы рассмотрим три реализации именованных семафоров Posix: с использованием каналов FIFO, отображаемых в память файлов и семафоров System V. 

10.2. Функции sem_open, sem_close и sem_unlink

Функция sem_open создает новый именованный семафор или открывает существующий. Именованный семафор может использоваться для синхронизации выполнения потоков и процессов:

#include 

sem_t *sem_open(const char *name, int oflag, …

 /* mode_t mode, unsigned int value */);

/* Возвращает указатель на семафор в случае успешного завершения, SEM_FAILED — в случае ошибки */

Требования к аргументу пате приведены в разделе 2.2.

Аргумент oflag может принимать значения 0, O_CREAT, O_CREAT | O_EXCL, как описано в разделе 2.3. Если указано значение O_CREAT, третий и четвертый аргументы функции являются обязательными. Аргумент mode указывает биты разрешений доступа (табл. 2.3), a value указывает начальное значение семафора. Это значение не может превышать константу SEM_VALUE_MAX, которая, согласно Posix, должна быть не менее 32767. Бинарные семафоры обычно устанавливаются в 1, тогда как семафоры-счетчики чаще инициализируются большими величинами.

При указании флага O_CREAT (без O_EXCL) семафор инициализируется только в том случае, если он еще не существует. Если семафор существует, ошибки не возникнет. Ошибка будет возвращена только в том случае, если указаны флаги O_CREAT | O_EXCL.

Возвращаемое значение представляет собой указатель на тип sem_t. Этот указатель впоследствии передается в качестве аргумента функциям sem_close, sem_wait, sem_trywait, sem_post и sem_getvalue.

ПРИМЕЧАНИЕ

Кажется странным возвращать SEM_FAILED в случае ошибки — нулевой указатель был бы более уместен. В ранних версиях стандарта Posix указывалось возвращаемое значение –1, и во многих реализациях константа SEM_FAILED определена как

#define SEM_FAILED ((sem_t *)(-1))

В Posix.1 мало говорится о битах разрешений, связываемых с семафором при его создании и открытии. Вспомните, мы говорили в связи с табл. 2.2 о том, что для именованных семафоров не нужно даже указывать флаги O_RDONLY, O_WRONLY и O_RDWR. В системах, на которых мы тестируем все программы этой книги (Digital Unix 4.0B и Solaris 2.6), для работы с семафором (его открытия) необходимо иметь к нему доступ как на чтение, так и на запись. Причина, скорее всего, в том, что обе операции, выполняемые с семафором (post и wait), состоят из считывания текущего значения и последующего его изменения. Отсутствие доступа на чтение или запись в этих реализациях приводит к возвращению функцией sem_open ошибки EACCESS ("Permission denied"). 

Открыв семафор с помощью sem_open, можно потом закрыть его, вызвав sem_close:

#include 

int sem_close(sem_t *sem);

/* Возвращает 0 в случае успешного завершения. –1 – в случае ошибки */

Операция закрытия выполняется автоматически при завершении процесса для всех семафоров, которые были им открыты. Автоматическое закрытие осуществляется как при добровольном завершении работы (вызове exit или _exit), так и при принудительном (с помощью сигнала).

Закрытие семафора не удаляет его из системы. Именованные семафоры Posix обладают по меньшей мере живучестью ядра. Значение семафора сохраняется, даже если ни один процесс не держит его открытым.

Именованный семафор удаляется из системы вызовом sem_unlink:

#include 

int sem_unlink(const char *name);

/* Возвращает 0 в случае успешного завершения, –1 – в случае ошибки */

Для каждого семафора ведется подсчет процессов, в которых он является открытым (как и для файлов), и функция sem_unlink действует аналогично unlink для файлов: объект пате может быть удален из файловой системы, даже если он открыт какими-либо процессами, но реальное удаление семафора не будет осуществлено до тех пор, пока он не будет окончательно закрыт.

10.3. Функции sem_wait и sem_trywait

Функция sem_wait проверяет значение заданного семафора на положительность, уменьшает его на единицу и немедленно возвращает управление процессу. Если значение семафора при вызове функции равно нулю, процесс приостанавливается, до тех пор пока оно снова не станет больше нуля, после чего значение семафора будет уменьшено на единицу и произойдет возврат из функции. Ранее мы отметили, что операция «проверка и уменьшение» должна быть атомарной по отношению к другим потокам, работающим с этим семафором:

#include 

int sem_wait(sem_t *sem);

int sem_trywait(sem_t *sem);

/* Обе функции возвращают 0 в случае успешного завершения. –1 – в случае ошибки */

Разница между sem_wait и sem_trywait заключается в том, что последняя не приостанавливает выполнение процесса, если значение семафора равно нулю, а просто немедленно возвращает ошибку EAGAIN.

Возврат из функции sem_wait может произойти преждевременно, если будет получен сигнал. При этом возвращается ошибка с кодом EINTR.

10.4. Функции sem_post и sem_getvalue

После завершения работы с семафором поток вызывает sem_post. Как мы уже говорили в разделе 10.1, этот вызов увеличивает значение семафора на единицу и возобновляет выполнение любых потоков, ожидающих изменения значения семафора:

#include 

int sem_post(sem_t *sem);

int sem_getvalue(sem_t *sem, int *valp);

/* Обе функции возвращают 0 в случае успешного завершения. –1 – в случае ошибки */

Функция sem_getvalue возвращает текущее значение семафора, помещая его в целочисленную переменную, на которую указывает valp. Если семафор заблокирован, возвращается либо 0, либо отрицательное число, модуль которого соответствует количеству потоков, ожидающих разблокирования семафора.

Теперь мы ясно видим отличия семафоров от взаимных исключений и условных переменных. Прежде всего взаимное исключение может быть разблокировано только заблокировавшим его потоком. Для семафоров такого ограничения нет: один из потоков может ожидать изменения значения семафора, чтобы потом уменьшить его с 1 до 0 (действие аналогично блокированию семафора), а другой поток может изменить значение семафора с 0 до 1, что аналогично разблокированию семафора.

Далее, поскольку любой семафор имеет некоторое значение, увеличиваемое операцией post и уменьшаемое операцией wait, поток может изменить его значение (например, увеличить с 0 до 1), даже если нет потоков, ожидающих его изменения. Если же поток вызывает pthread_cond_signal в отсутствие заблокированных при вызове pthread_cond_wait потоков, сигнал просто теряется.

Наконец, среди всех функций, работающих со средствами синхронизации — взаимными исключениями, условными переменными, блокировками чтения-записи и семафорами, только одна может быть вызвана из обработчика сигналов: sem_post.

ПРИМЕЧАНИЕ

Не следует рассматривать приведенный выше текст как доводы в пользу семафоров. Все средства синхронизации, обсуждаемые в этой книге — взаимные исключения, условные переменные, блокировки чтения-записи, семафоры и блокировка fcntl, обладают своими преимуществами и недостатками. Выбирать средства синхронизации для приложения следует с учетом их многочисленных особенностей. Из нашего сравнительного описания можно сделать вывод, что взаимные исключения больше приспособлены для блокировки, условные переменные — для ожидания, а семафоры — для того и другого, и последнее может привести к излишнему усложнению текста программы.

10.5. Простые примеры

В этом разделе мы напишем несколько простых программ, работающих с именованными семафорами Posix. Эти программы помогут нам узнать особенности функционирования и реализации семафоров. Поскольку именованные семафоры Posix обладают по крайней мере живучестью ядра, для работы с ними мы можем использовать отдельные программы.

Программа semcreate

В листинге 10.3 приведен текст программы, создающей именованный семафор. При вызове программы можно указать параметр –е, обеспечивающий исключающее создание (если семафор уже существует, будет выведено сообщение об ошибке), а параметр –i с числовым аргументом позволяет задать начальное значение семафора, отличное от 1. 

Листинг 10.3.[1] Создание именованного семафора

//pxsem/semcreate.c

1  #include "unpipc.h"


2  int

3  main(int argc, char **argv)

4  {

5   int с, flags;

6   sem_t *sem;

7   unsigned int value;

8   flags = O_RDWR | O_CREAT;

9   value = 1;

10  while ((c = Getopt(argc, argv, "ei:")) != –1) {

11   switch (c) {

12   case 'e':

13    flags |= O_EXCL;

14    break;

15   case 'i':

16    value = atoi(optarg);

17    break;

18   }

19  }

20  if (optind != argc – 1)

21   err_quit("usage: semcreate [ –e ] [ –i initialvalue ] ");

22  sem = Sem_open(argv[optind], flags, FILE_MODE, value);

23  Sem_close(sem);

24  exit(0);

25 }

Создание семафора

22 Поскольку мы всегда указываем флаг O_CREAT, нам приходится вызывать sem_open с четырьмя аргументами. Последние два используются только в том случае, если семафор еще не существует.

Закрытие семафора

23 Мы вызываем sem_close, хотя, если бы мы не сделали этот вызов, семафор все равно закрылся бы автоматически при завершении процесса и ресурсы системы были бы высвобождены.

Программа semunlink

Программа в листинге 10.4 удаляет именованный семафор.

Листинг 10.4. Удаление именованного семафора

//pxsem/semunlink.c

1 #include "unpipc.h"


2 int

3 main(int argc, char **argv)

4 {

5  if (argc != 2)

6   err_quit("usage: semunlink ");

7  Sem_unlink(argv[1]);

8  exit(0);

9 }

Программа semgetvalue

В листинге 10.5 приведен текст простейшей программы, которая открывает указанный именованный семафор, получает его текущее значение и выводит его.

Листинг 10.5. Получение и вывод значения семафора

//pxsem/semgetvalue.с

1  #include "unpipc.h"


2  int

3  main(int argc, char **argv)

4  {

5   sem_t *sem;

6   int val;

7   if (argc != 2)

8    err_quit("usage: semgetvalue ");

9   sem = Sem_open(argv[1], 0);

10  Sem_getvalue(sem, &val);

11  printf("value = %d\n", val);

12  exit(0);

13 }

Открытие семафора

9 Семафор, который мы открываем, должен быть заранее создан другой программой. Вторым аргументом sem_open будет 0: мы не указываем флаг O_CREAT и нам не нужно задавать никаких других параметров открытия 0_ххх.

Программа semwait

Программа в листинге 10.6 открывает именованный семафор, вызывает semwait (которая приостанавливает выполнение процесса, если значение семафора меньше либо равно 0, а при положительном значении семафора уменьшает его на 1), получает и выводит значение семафора, а затем останавливает свою работу навсегда при вызове pause.

Листинг 10.6. Ожидание изменения значения семафора и вывод нового значения

//pxsem/semwait.c

1  #include "unpipc.h"


2  int

3  main(int argc, char **argv)

4  {

5   sem_t *sem;

6   int val;

7   if (argc != 2)

8    err_quit("usage: semwait ");

9   sem = Sem_open(argv[1], 0);

10  Sem_wait(sem);

11  Sem_getvalue(sem, &val);

12  printf("pid %ld has semaphore, value = %d\n", (long) getpid(), val);

13  pause(); /* блокируется, пока не будет удален */

14  exit(0);

15 }

Программа sempost

В листинге 10.7 приведена программа, которая выполняет операцию post для указанного семафора (то есть увеличивает его значение на 1), а затем получает значение этого семафора и выводит его.

Листинг 10.7. Увеличение значения семафора

//pxsem/sempost.c

1  #include "unpipc.h"


2  int

3  main(int argc, char **argv)

4  {

5   sem_t *sem;

6   int val;

7   if (argc != 2)

8    err_quit("usage: sempost ");

9   sem = Sem_open(argv[1], 0);

10  Sem_post(sem);

11  Sem_getvalue(sem, &val);

12  printf("value = %d\n", val);

13  exit(0);

14 }

Примеры

Для начала мы создадим именованный семафор в Digital Unix 4.0B и выведем его значение, устанавливаемое по умолчанию при инициализации:

alpha % semcreate /tmp/test1

alpha % ls-l /tmp/test1

-rw-r--r-- 1 rstevens system 264 Nov 13 08:51 /tmp/test1

alpha %semgetvalue /tmp/test1

value = 1

Аналогично очередям сообщений Posix система создает файл семафора с тем именем, которое мы указали при вызове функции.

Теперь подождем изменения семафора и прервем работу программы, установившей блокировку:

alpha % semwait /tmp/test1

pid 9702 has semaphore, value = 0 значение после возврата из sem_wait

^?клавиша прерывания работы в нашей системе

alpha % semgetvalue /tmp/test1

value = 0                         значение остается нулевым

Приведенный пример иллюстрирует упомянутые ранее особенности. Во-первых, значение семафора обладает живучестью ядра. Значение 1, установленное при создании семафора, хранится в ядре даже тогда, когда ни одна программа не пользуется этим семафором. Во-вторых, при выходе из программы semwait, заблокировавшей семафор, значение его не изменяется, то есть ресурс остается заблокированным. Это отличает семафоры от блокировок fcntl, описанных в главе 9, которые снимались автоматически при завершении работы процесса.

Покажем теперь, что в этой реализации отрицательное значение семафора используется для хранения информации о количестве процессов, ожидающих разблокирования семафора:

alpha % semgetvalue /tmp/test1

value = 0                          это значение сохранилось с конца предыдущего примера

alpha % semwait /tmp/test1 &запуск в фоновом режиме

[1] 9718                           блокируется в ожидании изменения значения семафора

alpha % semgetvalue /tmp/test1

value = –1                         один процесс ожидает изменения семафора

alpha % semwait /tmp/test1 &запуск еще одного процесса в фоновом режиме

[2] 9727                           он также блокируется

alpha % semgetvalue /tmp/test1

value = –2                         два процесса ожидают изменения семафора

alpha % sempost /tmp/test1

value = –1                         значение после возвращенияиз sem_post

pid 9718 has semaphore, value = –1 вывод программы semwait

alpha % sempost /tmp/test1

value = 0

pid 9727 has semaphore, value = 0  вывод программы semwait

При первом вызове sem_post значение семафора изменилось с –2 на –1 и один из процессов, ожидавших изменения значения семафора, был разблокирован.

Выполним те же действия в Solaris 2.6, обращая внимание на различия в реализации:

solaris % semcreate /test2

solaris % ls –l /tmp/.*test2*

-rw-r--r-- 1 rstevens other1 48 Nov 13 09:11 /tmp/.SEMDtest2

–rw-rw-rw– 1 rstevens other1  0 Nov 13 09:11 /tmp/.SEMLtest2

solaris % semgetvalue /test2

value = 1

Аналогично очередям сообщений Posix файлы создаются в каталоге /tmp, причем указываемое при вызове имя становится суффиксом имен файлов. Разрешения первого файла соответствуют указанным в вызове sem_open, а второй файл, как можно предположить, используется для блокировки доступа.

Проверим, что ядро не осуществляет автоматического увеличения значения семафора при завершении работы процесса, установившего блокировку:

solaris % semwait /test2

pid 4133 has semaphore, value = 0

^?нажимаем клавишу прерывания выполнения

solaris % semgetvalue /test2

value = 0

Посмотрим теперь, как меняется значение семафора в этой реализации при появлении новых процессов, ожидающих изменения значения семафора:

solaris % semgetvalue /test2

value = 0                       значение сохранилось с конца предыдущего примера

solaris % semwait /test2&запуск в фоновом режиме

[1] 4257                        программа блокируется

solaris % semgetvalue /test2

value = 0                        в этой реализации отрицательные значения не используются

solaris % semwait /test2&еще один фоновый процесс

[2] 4263

solaris % semgetvalue /test2

value 0                          и для двух ожидающих процессов значение остается нулевым

solaris % sempost /test2          выполняем операцию post

pid 4257 has semaphore, value = 0 вывод программы semwait

value = 0

solaris % sempost /test2

pid 4263 has semaphore, value = 0 вывод программы semwait

value = 0

Можно заметить отличие по сравнению с результатами выполнения той же последовательности команд в Digital Unix 4.0B: после изменения значения семафора управление сразу же передается ожидающему изменения семафора процессу.

10.6. Задача производителей и потребителей

В разделе 7.3 мы описали суть задачи производителей и потребителей и привели несколько возможных ее решений, в которых несколько потоков-производителей заполняли массив, который обрабатывался одним потоком-потребителем.

1. В нашем первом варианте решения (раздел 7.2) потребитель запускался только после завершения работы производителей, поэтому мы могли решить проблему синхронизации, используя единственное взаимное исключение для синхронизации производителей.

2. В следующем варианте решения (раздел 7.5) потребитель запускался до завершения работы производителей, поэтому требовалось использование взаимного исключения (для синхронизации производителей) вместе с условной переменной и еще одним взаимным исключением (для синхронизации потребителя с производителями).

Расширим постановку задачи производителей и потребителей, используя общий буфер в качестве циклического: заполнив последнее поле, производитель (buff[NBUFF-1]) возвращается к его началу и заполняет первое поле (buff[0]), и потребитель действует таким же образом. Возникает еще одно требование к синхронизации: потребитель не должен опережать производителя. Мы все еще предполагаем, что производитель и потребитель представляют собой отдельные потоки одного процесса, но они также могут быть и просто отдельными процессами, если мы сможем создать для них общий буфер (например, используя разделяемую память, часть 4).

При использовании общего буфера в качестве циклического код должен удовлетворять трем требованиям:

1. Потребитель не должен пытаться извлечь объект из буфера, если буфер пуст.

2. Производитель не должен пытаться поместить объект в буфер, если последний полон.

3. Состояние буфера может описываться общими переменными (индексами, счетчиками, указателями связных списков и т.д.), поэтому все операции с буфером, совершаемые потребителями и производителями, должны быть защищены от потенциально возможной ситуации гонок.

Наше решение использует три семафора:

1. Бинарный семафор с именем mutex защищает критические области кода: помещение данных в буфер (для производителя) и изъятие данных из буфера (для потребителя). Бинарный семафор, используемый в качестве взаимного исключения, инициализируется единицей. (Конечно, мы могли бы воспользоваться и обычным взаимным исключением вместо двоичного семафора. См. упражнение 10.10.)

2. Семафор-счетчик с именем nempty подсчитывает количество свободных полей в буфере. Он инициализируется значением, равным объему буфера (NBUFF).

3. Семафор-счетчик с именем nstored подсчитывает количество заполненных полей в буфере. Он инициализируется нулем, поскольку изначально буфер пуст.

Рис. 10.7. Состояние буфера и двух семафоров-счетчиков после инициализации


На рис. 10.7 показано состояние буфера и двух семафоров-счетчиков после завершения инициализации. Неиспользуемые элементы массива выделены темным.

В нашем примере производитель помещает в буфер целые числа от 0 до NLOOP-1 (buff[0] = 0, buff[1] = 1), работая с ним как с циклическим. Потребитель считывает эти числа и проверяет их правильность, выводя сообщения об ошибках в стандартный поток вывода.

На рис. 10.8 изображено состояние буфера и семафоров-счетчиков после помещения в буфер трех элементов, но до изъятия их потребителем.

Рис. 10.8. Буфер и семафоры после помещения в буфер трех элементов


Предположим, что потребитель изъял один элемент из буфера. Новое состояние изображено на рис. 10.9.

Рис. 10.9. Буфер и семафоры после удаления первого элемента из буфера


В листинге 10.8 приведен текст функции main, которая создает три семафора, запускает два потока, ожидает их завершения и удаляет семафоры.

Листинг 10.8. Функция main для решения задачи производителей и потребителей с помощью семафоров

//pxsem/prodcons1.с

1  #include "unpipc.h"

2  #define NBUFF 10

3  #define SEM_MUTEX "mutex" /* аргументы px_ipc_name() */

4  #define SEM_NEMPTY "nempty"

5  #define SEM_NSTORED "nstored"


6  int nitems; /* read-only для производителя и потребителя */

7  struct { /* разделяемые производителем и потребителем данные */

8   int buff[NBUFF];

9   sem_t *mutex, *nempty, *nstored;

10 } shared;

11 void *produce(void *), *consume(void *);


12 int

13 main(int argc, char **argv)

14 {

15  pthread_t tid_produce, tid_consume;

16  if (argc != 2)

17   err_quit("usage: prodcons1 <#items>");

18  nitems = atoi(argv[1]);

19  /* создание трех семафоров */

20  shared.mutex = Sem_open(Px_ipc_name(SEM_MUTEX), O_CREAT | O_EXCL,

21   FILE_MODE, 1);

22  shared.nempty = Sem_open(Px_ipc_name(SEM_NEMPTY), 0_CREAT | O_EXCL,

23   FILE_MODE, NBUFF);

24  shared.nstored = Sem_open(Px_ipc_name(SEM_NSTORED), O_CREAT | O_EXCL,

25   FILE_MODE, 0);

26  /* создание одного потока-производителя и одного потока-потребителя */

27  Set_concurrency(2);

28  Pthread_create(&tid_produce, NULL, produce, NULL);

29  Pthread_create(&tid_consume, NULL, consume, NULL);

30  /* ожидание завершения работы потоков */

31  Pthread_join(tid_produce, NULL);

32  Pthread_join(tid_consume, NULL);

33  /* удаление семафоров */

34  Sem_unlink(Px_ipc_name(SEM_MUTEX));

35  Sem_unlink(Px_ipc_name(SEM_NEMPTY));

36  Sem_unlink(Px_ipc_name(SEM_NSTORED));

37  exit(0);

38 }

Глобальные переменные

6-10 Потоки совместно используют буфер, содержащий NBUFF элементов, и три указателя на семафоры. Как говорилось в главе 7, мы объединяем эти данные в структуру, чтобы подчеркнуть, что семафоры используются для синхронизации доступа к буферу.

Создание семафоров

19-25 Мы создаем три семафора, передавая их имена функции px_ipc_name. Флаг O_EXCL мы указываем, для того чтобы гарантировать инициализацию каждого семафора правильным значением. Если после преждевременно завершенного предыдущего запуска программы остались неудаленные семафоры, мы обработаем эту ситуацию, вызвав перед их созданием sem_unlink и игнорируя ошибки. Мы могли бы проверять возвращение ошибки EEXIST при вызове sem_open с флагом O_EXCL, а затем вызывать sem_unlink и еще раз sem_open, но это усложнило бы программу. Если нам нужно проверить, что запущен только один экземпляр программы (что следует сделать перед созданием семафоров), можно обратиться к разделу 9.7, где описаны методы решения этой задачи.

Создание двух потоков

26-29 Создаются два потока, один из которых является производителем, а другой — потребителем. При запуске никакие аргументы им не передаются.

30-36 Главный поток ждет завершения работы производителя и потребителя, а затем удаляет три семафора.

ПРИМЕЧАНИЕ

Мы могли бы вызвать для каждого семафора sem_close, но это делается автоматически при завершении процесса. А вот удалить имя семафора из файловой системы необходимо явно.

В листинге 10.9 приведен текст функций produce и consume.

Листинг 10.9. Функции produce и consume

//pxsem/prodcons1.c

39 void *

40 produce(void *arg)

41 {

42  int i;

43  for (i = 0; i < nitems; i++) {

44   Sem_wait(shared.nempty); /* ожидаем освобождения поля */

45   Sem_wait(shared.mutex);

46   shared.buff[i % NBUFF] = i; /* помещаем i в циклический буфер */

47   Sem_post(shared.mutex);

48   Sem_post(shared.nstored); /* сохраняем еще 1 элемент */

49  }

50  return(NULL);

51 }


52 void *

53 consume(void *arg)

54 {

55  int i;

56  for (i = 0; i < nitems; i++) {

57   Sem_wait(shared.nstored); /* ожидаем появления объекта в буфере */

58   Sem_wait(shared.mutex);

59   if (shared.buff[i % NBUFF] != i)

60    printf("buff[%d] = %d\n", i, shared.buff[i % NBUFF]);

61   Sem_post(shared.mutex);

62   Sem_post(shared.nempty); /* еще одно пустое поле */

63  }

64  return(NULL);

65 }

Производитель ожидает освобождения места в буфере

44 Производитель вызывает sem_wait для семафора nempty, ожидая появления свободного места. В первый раз при выполнении этой команды значение семафора nempty уменьшится с NBUFF до NBUFF-1.

Производитель помещает элемент в буфер

45-48 Перед помещением нового элемента в буфер производитель должен установить блокировку на семафор mutex. В нашем примере, где производитель просто сохраняет значение в элементе массива с индексом i % NBUFF, для описания состояния буфера не используется никаких разделяемых переменных (то есть мы не используем связный список, который нужно было бы обновлять каждый раз при помещении элемента в буфер). Следовательно, установка и снятие семафора mutex не являются обязательными. Тем не менее мы иллюстрируем эту технику, потому что обычно ее применение является необходимым в задачах такого рода (обновление буфера, разделяемого несколькими потоками).

После помещения элемента в буфер блокировка с семафора mutex снимается (его значение увеличивается с 0 до 1) и увеличивается значение семафора nstored. Первый раз при выполнении этой команды значение nstored изменится с начального значения 0 до 1.

Потребитель ожидает изменения семафора nstored

57-62 Если значение семафора nstored больше 0, в буфере имеются объекты для обработки. Потребитель изымает один элемент из буфера и проверяет правильность его значения, защищая буфер в момент доступа к нему с помощью семафора mutex. Затем потребитель увеличивает значение семафора nempty, указывая производителю на наличие свободных полей.

Зависание

Что произойдет, если мы по ошибке поменяем местами вызовы Sem_wait в функции consumer (листинг 10.9)? Предположим, что первым запускается производитель (как в решении, предложенном для упражнения 10.1). Он помещает в буфер NBUFF элементов, уменьшая значение семафора nempty от NBUFF до 0 и увеличивая значение семафора nstored от 0 до NBUFF. Затем производитель блокируется в вызове Sem_wait(shared. nempty), поскольку буфер полон и помещать элементы больше некуда.

Запускается потребитель и проверяет первые NBUFF элементов буфера. Это уменьшает значение семафора nstored от NBUFF до 0 и увеличивает значение семафора nempty от 0 до NBUFF. Затем потребитель блокируется в вызове Sem_wait(shared, nstored) после вызова Sem_wait(shared, mutex). Производитель мог бы продолжать работу, поскольку значение семафора nempty уже отлично от 0, но он вызвал Sem_wait(shared, mutex) и его выполнение было приостановлено. 

Это называется зависанием программы (deadlock). Производитель ожидает освобождения семафора mutex, а потребитель не снимает с него блокировку, ожидая освобождения семафора nstored. Но производитель не может изменить nstored, пока он не получит семафор mutex. Это одна из проблем, которые часто возникают с семафорами: если в программе сделать ошибку, она будет работать неправильно.

ПРИМЕЧАНИЕ

Стандарт Posix позволяет функции sem_wait обнаруживать зависание и возвращать ошибку EDEADLK, но ни одна из систем, использовавшихся для написания примеров (Digital Unix 4.0B и Solaris 2.6), не обнаружила ошибку в данном случае.

10.7. Блокирование файлов

Вернемся к задаче о порядковом номере из главы 9. Здесь мы напишем новые версии функций my_lock и my_unlосk, использующие именованные семафоры Posix. В листинге 10.10 приведен текст этих функций.

Листинг 10.10. Блокирование файла с помощью именованных семафоров Posix

//lock/lockpxsem.c

1  #include "unpipc.h"

2  #define LOCK_PATH "pxsemlock"

3  sem_t *locksem;

4  int initflag;


5  void

6  my_lock(int fd)

7  {

8   if (initflag == 0) {

9    locksem = Sem_open(Px_ipc_name(LOCK_PATH), O_CREAT, FILE_MODE, 1);

10   initflag = 1;

11  }

12  Sem_wait(locksem);

13 }


14 void

15 my_unlock(int fd)

16 {

17  Sem_post(locksem);

18 }

Один из семафоров используется для рекомендательной блокировки доступа к файлу и инициализируется единицей при первом вызове функции. Для получения блокировки мы вызываем sem_wait, а для ее снятия — sem_post.

10.8. Функции sem_init и sem_destroy

До сих пор мы имели дело только с именованными семафорами Posix. Как мы уже говорили, они идентифицируются аргументом пате, обычно представляющим собой имя файла в файловой системе. Стандарт Posix описывает также семафоры, размещаемые в памяти, память под которые выделяет приложение (тип sem_t), а инициализируются они системой:

#include 

int sem_init(sem_t *sem, int shared, unsigned int value);

/* Возвращает –1 в случае ошибки */

int sem_destroy(sem_t *sem);

/* Возвращает 0 в случае успешного завершения, –1 – в случае ошибки */

Размещаемый в памяти семафор инициализируется вызовом sem_init. Аргумент sem указывает на переменную типа sem_t, место под которую должно быть выделено приложением. Если аргумент shared равен 0, семафор используется потоками одного процесса, в противном случае доступ к нему могут иметь несколько процессов. Если аргумент shared ненулевой, семафор должен быть размещен в одном из видов разделяемой памяти и должен быть доступен всем процессам, использующим его. Как и в вызове sem_open, аргумент value задает начальное значение семафора.

После завершения работы с размещаемым в памяти семафором его можно уничтожить, вызвав sem_destroy.

ПРИМЕЧАНИЕ 1

Функции sem_open не требуется параметр, аналогичный shared; не требуется ей и атрибут, аналогичный PTHREAD_PROCESS_SHARED (упоминавшийся в связи с взаимными исключениями и условными переменными в главе 7), поскольку именованный семафор всегда используется совместно несколькими процессами.

ПРИМЕЧАНИЕ 2

Обратите внимание, что для размещаемого в памяти семафора нет ничего аналогичного флагу O_CREAT: функция sem_init всегда инициализирует значение семафора. Следовательно, нужно быть внимательным, чтобы вызывать sem_init только один раз для каждого семафора. (Упражнение 10.2 иллюстрирует разницу в этом смысле между именованным и размещаемым в памяти семафорами.) При вызове sem_init для уже инициализированного семафора результат непредсказуем.

ПРИМЕЧАНИЕ 3

Удостоверьтесь, что вы понимаете фундаментальную разницу между sem_open и sem_init. Первая возвращает указатель на переменную типа sem_t, причем выделение места под переменную и ее инициализация выполняются этой же функцией. Напротив, первый аргумент sem_init представляет собой указатель на переменную типа sem_t, место под которую должен был заранее выделить вызывающий. Функция sem_init только инициализирует эту переменную.

ПРИМЕЧАНИЕ 4

Стандарт Posix.1 предупреждает, что для обращения к размещаемым в памяти семафорам можно использовать только указатель, являющийся аргументом при вызове sem_init. Использование копий этого указателя может привести к неопределенным результатам.

Функция sem_init возвращает –1 в случае ошибки, но она не возвращает 0 в случае успешного завершения. Это действительно странно, и примечание в Обосновании Posix. 1 говорит, что в будущих версиях функция, возможно, начнет возвращать 0 в случае успешного завершения. 

Размещаемый в памяти семафор может быть использован в тех случаях, когда нет необходимости использовать имя, связываемое с именованным семафором. Именованные семафоры обычно используются для синхронизации работы неродственных процессов. Имя в этом случае используется для идентификации семафора.

В связи с табл. 1.1 мы говорили о том, что семафоры, размещаемые в памяти, обладают живучестью процесса, но на самом деле их живучесть зависит от типа используемой разделяемой памяти. Размещаемый в памяти семафор не утрачивает функциональности до тех пор, пока память, в которой он размещен, еще доступна какому-либо процессу.

■ Если размещаемый в памяти семафор совместно используется потоками одного процесса (аргумент shared при вызове sem_init равен 0), семафор обладает живучестью процесса и удаляется при завершении последнего.

■ Если размещаемый в памяти семафор совместно используется несколькими процессами (аргумент shared при вызове seminit равен 1), он должен располагаться в разделяемой памяти, и в этом случае семафор существует столько, сколько существует эта область памяти. Вспомните, что и разделяемая память Posix, и разделяемая память System V обладают живучестью ядра (табл. 1.1). Это значит, что сервер может создать область разделяемой памяти, инициализировать в ней размещаемый в памяти семафор Posix, а затем завершить работу. Некоторое время спустя один или несколько клиентов могут присоединить эту область к своему адресному пространству и получить доступ к хранящемуся в ней семафору.

Предупреждаем, что нижеследующий код не работает так, как ожидается:

sem_t mysem;

Sem_init(&mysem, 1.0); /* 2-й аргумент 1 –> используется процессами */

if (Fork() == 0) { /* дочерний процесс */

 …

 Sem_post(&mysem);

}

Sem_wait(&mysem); /* родительский процесс: ожидание дочернего */

Проблема тут в том, что семафор не располагается в разделяемой памяти (см. раздел 10.12). Память, как правило, не делится между дочерним и родительским процессами при вызове fork. Дочерний процесс запускается с копией памяти родителя, но это не то же самое, что разделяемая память.

Пример

В качестве иллюстрации перепишем наш пример решения задачи производителей и потребителей из листингов 10.8 и 10.9 для использования размещаемых в памяти семафоров Posix. В листинге 10.11 приведен текст новой программы.

Листинг 10.11. Задача производителей и потребителей с использованием размещаемых в памяти семафоров

//pxsem/prodcons2.c

1  #include "unpipc.h"

2  #define NBUFF 10


3  int nitems; /* только для чтения производителем и потребителем */

4  struct { /* общие данные производителя и потребителя */

5   int buff[NBUFF];

6   sem_t mutex, nempty, nstored; /* семафоры, а не указатели */

7  } shared;

8  void *produce(void *), *consume(void *);


9  int

10 main(int argc, char **argv)

11 {

12  pthread_t tid_produce, tid_consume;

13  if (argc != 2)

14   err_quit("usage: prodcons2 <#items>");

15  nitems = atoi(argv[1]);

16  /* инициализация трех семафоров */

17  Sem_init(&shared.mutex, 0, 1);

18  Sem_init(&shared.nempty, 0, NBUFF);

19  Sem_init(&shared.nstored, 0, 0);

20  Set_concurrency(2);

21  Pthread_create(&tid_produce, NULL, produce, NULL);

22  Pthread_create(&tid_consume, NULL, consume, NULL);

23  Pthread_join(tid_produce, NULL);

24  Pthread_join(tid_consume, NULL):

25  Sem_destroy(&shared.mutex);

26  Sem_destroy(&shared.nempty):

27  Sem_destroy(&shared.nstored);

28  exit(0);

29 }


30 void *

31 produce(void *arg)

32 {

33  int i;

34  for (i = 0; i < nitems; i++) {

35   Sem_wait(&shared.nempty); /* ожидание одного свободного поля */

36   Sem_wait(&shared.mutex);

37   shared.buff[i % NBUFF] = i; /* помещение i в циклический буфер */

38   Sem_post(&shared.mutex);

39   Sem_post(&shared.nstored); /* поместили еще один элемент */

40  }

41  return(NULL);

42 }


43 void *

44 consume(void *arg)

45 {

46  int i;

47  for (i = 0; i < nitems; i++) {

48   Sem_wait(&shared.nstored); /* ожидаем появления хотя бы одного готового для обработки элемента */

49   Sem_wait(&shared.mutex);

50   if (shared.buff[i % NBUFF] != i)

51    printf("buff[*d] = *d\n", i, shared.buff[i % NBUFF]);

52   Sem_post(&shared.mutex);

53   Sem_post(&shared.nempty); /* еще одно пустое поле */

54  }

55  return(NULL);

56 }

Выделение семафоров

6 Мы объявляем три семафора типа sem_t, и теперь это сами семафоры, а не указатели на них.

Вызов sem_init

16-27 Мы вызываем sem_init вместо sem_open* а затем sem_destroy вместо sem_unlink. Вызывать sem_destroy на самом деле не требуется, поскольку программа все равно завершается.

Остальные изменения обеспечивают передачу указателей на три семафора при вызовах sem_wait и sem_post.

10.9. Несколько производителей, один потребитель

Решение в разделе 10.6 относится к классической задаче с одним производителем и одним потребителем. Новая, интересная модификация программы позволит нескольким производителям работать с одним потребителем. Начнем с решения из листинга 10.11, в котором использовались размещаемые в памяти семафоры. В листинге 10.12 приведены объявления глобальных переменных и функция main.

Листинг 10.12. Функция main задачи с несколькими производителями

//pxsem/prodcons3.c

1  #include "unpipc.h"

2  #define NBUFF 10

3  #define MAXNTHREADS 100


4  int nitems, nproducers; /* только для чтения производителем и потребителем */

5  struct { /* общие данные */

6   int buff[NBUFF];

7   int nput;

8   int nputval;

9   sem_t mutex, nempty, nstored; /* семафоры, а не указатели */

10 } shared;


11 void *produce(void *), *consume(void *);


12 int

13 main(int argc, char **argv)

14 {

15  int i, count[MAXNTHREADS];

16  pthread_t tid_produce[MAXNTHREADS], tid_consume;

17  if (argc != 3)

18   err_quit("usage: prodcons3 <#items><#producers>");

19  nitems = atoi(argv[1]);

20  nproducers = min(atoi(argv[2]), MAXNTHREADS);

21  /* инициализация трех семафоров */

22  Sem_init(&shared.mutex, 0, 1);

23  Sem_init(&shared.nempty, 0, NBUFF);

24  Sem_init(&shared.nstored, 0, 0);

25  /* создание всех производителей и одного потребителя */

26  Set_concurrency(nproducers + 1);

27  for (i = 0; i < nproducers; i++) {

28   count[i] = 0;

29   Pthread_create(&tid_produce[i], NULL, produce, &count[i]);

30  }

31  Pthread_create(&tid_consume, NULL, consume, NULL);

32  /* ожидание завершения всех производителей и потребителя */

33  for (i = 0; i < nproducers; i++) {

34   Pthread_join(tid_produce[i], NULL);

35   printf("count[%d] = %d\n", i, count[i]);

36  }

37  Pthread_join(tid_consume, NULL);

38  Sem_destroy(&shared.mutex);

39  Sem_destroy(&shared.nempty);

40  Sem_destroy(&shared.nstored);

41  exit(0);

42 }

Глобальные переменные

4 Глобальная переменная nitems хранит число элементов, которые должны быть совместно произведены. Переменная nproducers хранит число потоков-производителей. Оба эти значения устанавливаются с помощью аргументов командной строки.

Общая структура

5-10 В структуру shared добавляются два новых элемента: nput, обозначающий индекс следующего элемента, куда должен быть помещен объект (по модулю BUFF), и nputval следующее значение, которое будет помещено в буфер. Эти две переменные взяты из нашего решения в листингах 7.1 и 7.2. Они нужны для синхронизации нескольких потоков-производителей.

Новые аргументы командной строки

17-20 Два новых аргумента командной строки указывают полное количество элементов, которые должны быть помещены в буфер, и количество потоков-производителей. 

Запуск всех потоков

21-41 Инициализируем семафоры и запускаем потоки-производители и поток-потребитель. Затем ожидается завершение работы потоков. Эта часть кода практически идентична листингу 7.1.

В листинге 10.13 приведен текст функции produce, которая выполняется каждым потоком-производителем.

Листинг 10.13. Функция, выполняемая всеми потоками-производителями

//pxsem/prodcons3.c

43 void *

44 produce(void *arg)

45 {

46  for (;;) {

47   Sem_wait(&shared.nempty); /* ожидание освобождения поля */

48   Sem_wait(&shared.mutex);

49   if (shared.nput >= nitems) {

50    Sem_post(&shared.nempty);

51    Sem_post(&shared.mutex);

52    return(NULL); /* готово */

53   }

54   shared.buff[shared.nput % NBUFF] = shared.nputval;

55   shared.nput++;

56   shared.nputval++;

57   Sem_post(&shared.mutex);

58   Sem_post(&shared.nstored); /* еще один элемент */

59   *((int *) arg) += 1;

60  }

61 }

Взаимное исключение между потоками-производителями

49-53 Отличие от листинга 10.8 в том, что цикл завершается, когда nitems объектов будет помещено в буфер всеми потоками. Обратите внимание, что потоки-производители могут получить семафор nempty в любой момент, но только один производитель может иметь семафор mutex. Это защищает переменные nput и nval от одновременного изменения несколькими производителями.

Завершение производителей

50-51 Нам нужно аккуратно обработать завершение потоков-производителей. После того как последний объект помещен в буфер, каждый поток выполняет

Sem_wait(&shared.nempty); /* ожидание пустого поля */

в начале цикла, что уменьшает значение семафора nempty. Но прежде, чем поток будет завершен, он должен увеличить значение этого семафора, потому что он не помещает объект в буфер в последнем проходе цикла. Завершающий работу поток должен также освободить семафор mutex, чтобы другие производители смогли продолжить функционирование. Если мы не увеличим семафор nempty по завершении процесса и если производителей будет больше, чем мест в буфере, лишние потоки будут заблокированы навсегда, ожидая освобождения семафора nempty, и никогда не завершат свою работу.

Функция consume в листинге 10.14 проверяет правильность всех записей в буфере, выводя сообщение при обнаружении ошибки.

Листинг 10.14. Функция, выполняемая потоком-потребителем

//pxsem/prodcons3.с

62 void *

63 consume(void *arg)

64 {

65  int i;

66  for (i = 0; i < nitems; i++) {

67   Sem_wait(&shared.nstored); /* ожидание помещения по крайней мере одного элемента в буфер */

68   Sem_wait(&shared.mutex);

69   if (shared.buff[i % NBUFF] != i)

70    printf("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);

71   Sem_post(&shared.mutex);

72   Sem_post(&shared.nempty); /* еще одно пустое поле */

73  }

74  return(NULL);

75 }

Условие завершения единственного потока-потребителя звучит просто: он считает все потребленные объекты и останавливается по достижении nitems.

10.10. Несколько производителей, несколько потребителей

Следующее изменение, которое мы внесем в нашу пpoгрaммy, будет заключаться в добавлении возможности одновременной работы нескольких потребителей вместе с несколькими производителями. Есть ли смысл в наличии нескольких потребителей — зависит от приложения. Автор видел два примера, в которых использовался этот метод.

1. Пpoгрaммa преобразования IP-адресов в имена узлов. Каждый потребитель берет IP-адрес, вызывает gethostbyaddr (раздел 9.6 [24]), затем дописывает имя узла к файлу. Поскольку каждый вызов gethostbyaddr обрабатывается неопределенное время, порядок IP-адресов в буфере будет, скорее всего, отличаться от порядка имен узлов в файле, созданном потоками-потребителями. Преимущество этой схемы в параллельности выполнения вызовов gethostbyaddr (каждый из которых может работать несколько секунд) — по одному на каждый поток-потребитель.

ПРИМЕЧАНИЕ

Предполагается наличие версии gethostbyaddr, допускающей многократное вхождение, что не всегда верно. Если эта версия недоступна, можно хранить буфер в разделяемой памяти и использовать процессы вместо потоков. 

2. Программа, принимающая дейтаграммы UDP, обрабатывающая их и записывающая результат в базу данных. Каждая дeйтaгрaммa обрабатывается одним потоком-потребителем, которые выполняются параллельно для ускорения процесса. Хотя дейтаграммы записываются в базу данных в порядке, вообще говоря, отличном от порядка их приема, встроенная схема упорядочения записей в базе данных справляется с этой проблемой.

В листинге 10.15 приведены глобальные переменные программы.

Листинг 10.15. Глобальные переменные

//pxsem/prodcons4.с

1  #include "unpipc.h"

2  #define NBUFF 10

3  #define MAXNTHREADS 100

4  int nitems, nproducers, nconsumers; /* только для чтения */

5  struct { /* общие данные производителей и потребителей */

6   int buff[NBUFF];

7   int nput; /* номер объекта: 0, 1. 2, … */

8   int nputval; /* сохраняемое в buff[] значение */

9   int nget; /* номер объекта: 0, 1, 2, … */

10  int ngetval; /* получаемое из buff[] значение */

11  sem_t mutex, nempty, nstored; /* семафоры, а не указатели */

12 } shared;

13 void *produce(void *), *consume(void *);

Глобальные переменные и общая структура

4-12 Количество потоков-потребителей является глобальной переменной, устанавливаемой из командной строки. В структуру shared добавилось два новых поля: nget — номер следующего объекта, получаемого одним из потоков-потребителей, и ngetval — соответствующее значение.

Функция main, текст которой приведен в листинге 10.16, запускает несколько потоков-потребителей и потоков-производителей одновременно.

19-23 Новый аргумент командной строки указывает количество потоков-потребителей. Для хранения идентификаторов потоков-потребителей выделяется место под специальный массив (tid_consume), а для подсчета обработанных каждым потоком объектов выделяется массив conscount.

24-50 Создаются несколько потоков-производителей и потребителей, после чего основной поток ждет их завершения.

Листинг 10.16. Функция main для версии с несколькими производителями и потребителями

//pxsem/prodcons4.с

14 int

15 main(int argc, char **argv)

16 {

17  int i, prodcount[MAXNTHREADS], conscount[MAXNTHREADS];

18  pthread_t tid_produce[MAXNTHREADS], tid_consume[MAXNTHREADS];

19  if (argc != 4)

20   err_quit("usage: prodcons4 <#items><#producers><#consumers>");

21  nitems = atoi(argv[1]);

22  nproducers = min(atoi(argv[2]), MAXNTHREADS);

23  nconsumers = min(atoi(argv[3]), MAXNTHREADS);

24  /* инициализация трех семафоров */

25  Sem_init(&shared.mutex, 0, 1);

26  Sem_init(&shared.nempty, 0, NBUFF);

27  Sem_init(&shared.nstored, 0, 0);

28  /* создание производителей и потребителей */

29  Set_concurrency(nproducers + nconsumers);

30  for (i = 0; i < nproducers; i++) {

31   prodcount[i] = 0;

32   Pthread_create(&tid_produce[i], NULL, produce, &prodcount[i]);

33  }

34  for (i = 0; i < nconsumers; i++) {

35   conscount[i] = 0;

36   Pthread_create(&tid_consume[i], NULL, consume, &conscount[i]);

37  }

38  /* ожидание завершения всех производителей и потребителей */

39  for (i = 0; i < nproducers: i++) {

40   Pthread_join(tid_produce[i], NULL);

41   printf("producer count[%d] = %d\n", i, prodcount[i]);

42  }

43  for (i = 0; i < nconsumers; i++) {

44   Pthread_join(tid_consume[i], NULL);

45   printf("consumer count[%d] = %d\n", i, conscount[i]);

46  }

47  Sem_destroy(&shared.mutex);

48  Sem_destroy(&shared.nempty);

49  Sem_destroy(&shared.nstored);

50  exit(0);

51 }

Функция produce содержит одну новую строку по сравнению с листингом 10.13. В части кода, относящейся к завершению потока-производителя, появляется строка, отмеченная знаком +:

 if (shared.nput >= nitems) {

+ Sem_post(&shared.nstored); /* даем возможность потребителям завершить работу */

  Sem_post(&shared.nempty);

  Sem_post(&shared.mutex);

  return(NULL); /* готово */

 }

Снова нам нужно быть аккуратными при обработке завершения процессов-производителей и потребителей. После обработки всех объектов в буфере все потребители блокируются в вызове

Sem_wait(&shared.nstored); /* Ожидание помещения объекта в буфер */

Производителям приходится увеличивать семафор nstored для разблокирования потрeбитeлeй, чтобы они узнали, что работа завершена. Функция consume приведена в листинге 10.17. 

Листинг 10.17. Функция, выполняемая всеми потоками-потребителями

//pxsem/prodcons4.c

72 void *

73 consume(void *arg)

74 {

75  int i;

76  for (;;) {

77   Sem_wait(&shared.nstored); /* ожидание помещения объекта в буфер */

78   Sem_wait(&shared.mutex);

79   if (shared.nget >= nitems) {

80    Sem_post(&shared.nstored);

81    Sem_post(&shared.mutex);

82    return(NULL); /* готово */

83   }

84   i = shared.nget % NBUFF;

85   if (shared.buff[i] != shared.ngetval)

86    printf("error: buff[%d] = %d\n", i, shared.buff[i]);

87   shared.nget++;

88   shared.ngetval++;

89   Sem_post(&shared.mutex);

90   Sem_post(&shared.nempty); /* освобождается место для элемента */

91   *((int *) arg) += 1;

92  }

93 }

Завершение потоков-потребителей

79-83 Функция consume сравнивает nget и nitems, чтобы узнать, когда следует остановиться (аналогично функции produce). Обработав последний объект в буфере, потоки-потребители блокируются, ожидая изменения семафора nstored. Когда завершается очередной поток-потребитель, он увеличивает семафор nstored, давая возможность завершить работу другому потоку-потребителю.

10.11. Несколько буферов

Во многих программах, обрабатывающих какие-либо данные, можно встретить цикл вида

while ((n = read(fdin, buff, BUFFSIZE)) > 0) {

 /* обработка данных */

 write(fdout, buff, n);

}

Например, программы, обрабатывающие текстовые файлы, считывают строку из входного файла, выполняют с ней некоторые действия, а затем записывают строку в выходной файл. Для текстовых файлов вызовы read и write часто заменяются на функции стандартной библиотеки ввода-вывода fgets и fputs.

На рис. 10.11 изображена иллюстрация к такой схеме. Здесь функция reader считывает данные из входного файла, а функция writer записывает данные в выходной файл. Используется один буфер.

Рис. 10.10. Процесс считывает данные в буфер, а потом записывает его содержимое в другой файл


Рис. 10.11. Один процесс, считывающий данные в буфер и записывающий их в файл


На рис. 10.10 приведена временная диаграмма работы такой программы. Числа слева проставлены в условных единицах времени. Предполагается, что операция чтения занимает 5 единиц, записи — 7, а обработка данных между считыванием и записью требует 2 единицы времени.

Можно изменить это приложение, разделив процесс на отдельные потоки, как показано на рис. 10.12. Здесь используется два потока (а не процесса), поскольку глобальный буфер автоматически разделяется между ними. Мы могли бы разделить приложение и на два процесса, но это потребовало бы использования разделяемой памяти, с которой мы еще не знакомы.

Рис. 10.12. Разделение копирования файла между двумя потоками


Разделение операций между потоками (или процессами) требует использования какой-либо формы уведомления между ними. Считывающий поток должен уведомлять записывающий о готовности буфера к операции записи, а записывающий должен уведомлять считывающий о том, что буфер пуст и его можно заполнять снова. На рис. 10.13 изображена временная диаграмма для новой схемы. 

Рис. 10.13. Копирование файла двумя потоками


Предполагается, что для обработки данных в буфере требуется две единицы времени. Важно отметить, что разделение чтения и записи между двумя потоками ничуть не ускорило выполнение операции копирования в целом. Мы не выиграли в скорости, мы просто распределили выполнение задачи между двумя потоками (или процессами).

В этих диаграммах мы игнорируем множество тонкостей. Например, большая часть ядер Unix выявляет операцию последовательного считывания файла и осуществляет асинхронное упреждающее чтение следующего блока данных еще до поступления запроса. Это может ускорить работу процесса, считывающего данные. Мы также игнорируем влияние других процессов на наши считывающий и записывающий потоки, а также влияние алгоритмов разделения времени, реализованных в ядре.

Следующим шагом будет использование двух потоков (или процессов) и двух буферов. Это называется классическим решением с двойной буферизацией; схема его изображена на рис. 10.14.

Рис. 10.14. Копирование файла двумя потоками с двумя буферами


На нашем рисунке считывающий поток помещает данные в первый буфер, а записывающий берет их из второго. После этого потоки меняются местами.

На рис. 10.15 изображена временная диаграмма процесса с двойной буферизацией. Считывающий поток помещает данные в буфер № 1, а затем уведомляет записывающий о том, что буфер готов к обработке. Затем считывающий процесс помещает данные в буфер № 2, а записывающий берет их из буфера № 1.

В любом случае, мы ограничены скоростью выполнения самой медленной операции — операции записи. После выполнения первых двух операций считывания серверу приходится ждать две дополнительные единицы времени, составляющие разницу в скорости выполнения операций чтения и записи. Тем не менее для нашего гипотетического примера полное время работы будет сокращено почти вдвое.

Обратите внимание, что операции записи выполняются так быстро, как только возможно. Они разделены промежутками времени всего лишь в 2 единицы, тогда как в предыдущих примерах между ними проходило 9 единиц времени (рис. 10.10 и 10.13). Это может оказаться выгодным при работе с некоторыми устройствами типа накопителей на магнитной ленте, которые функционируют быстрее, если данные записываются с максимально возможной скоростью (это называется потоковым режимом — streaming mode).

Рис. 10.15. Процесс с двойной буферизацией


Интересно, что задача с двойной буферизацией представляет собой лишь частный случай общей задачи производителей и потребителей.

Изменим нашу программу так, чтобы использовать несколько буферов. Начнем с решения из листинга 10.11, в котором использовались размещаемые в памяти семафоры. Мы получим даже не двойную буферизацию, а работу с произвольным числом буферов (задается NBUFF). В листинге 10.18 даны глобальные переменные и функция main.

Листинг 10.18. Глобальные переменные и функция main

//pxsem/mycat2.c

1  #include "unpipc.h"

2  #define NBUFF 8


3  struct { /* общие данные */

4   struct {

5    char data[BUFFSIZE]; /* буфер */

6    ssize_t n; /* объем буфера */

7   } buff[NBUFF]; /* количество буферов */

8   sem_t mutex, nempty, nstored; /* семафоры, а не указатели */

9  } shared;

10 int fd; /* входной файл, копируемый в стандартный поток вывода */

11 void *produce(void *), *consume(void *);


12 int

13 main(int argc, char **argv)

14 {

15  pthread_t tid_produce, tid_consume;

16  if (argc != 2)

17   err_quit("usage: mycat2 ");

18  fd = Open(argv[1], O_RDONLY);

19  /* инициализация трех семафоров */

20  Sem_init(&shared.mutex, 0, 1);

21  Sem_init(&shared.nempty, 0, NBUFF);

22  Sem_init(&shared.nstored, 0, 0);

23  /* один производитель, один потребитель */

24  Set_concurrency(2);

25  Pthread_create(&tid_produce, NULL, produce, NULL); /* reader thread */

26  Pthread_create(&tid_consume, NULL, consume, NULL); /* writer thread */

27  Pthread_join(tid_produce, NULL);

28  Pthread_join(tid_consume, NULL);

29  Sem_destroy(&shared.mutex);

30  Sem_destroy(&shared.nempty);

31  Sem_destroy(&shared.nstored);

32  exit(0);

33 }

Объявление нескольких буферов

2-9 Структура shared содержит массив структур buff, которые состоят из буфера и его счетчика. Мы создаем NBUFF таких буферов.

Открытие входного файла

18 Аргумент командной строки интерпретируется как имя файла, который копируется в стандартный поток вывода.

В листинге 10.19 приведен текст функций produce и consume.

Листинг 10.19. Функции produce и consume

//pxsem/mycat2.c

34 void *

35 produce(void *arg)

36 {

37  int i;

38  for (i = 0;;) {

39   Sem_wait(&shared.nempty); /* Ожидание освобождения места в буфере */

40   Sem_wait(&shared.mutex);

41   /* критическая область */

42   Sem_post(&shared.mutex);

43   shared.buff[i].n = Read(fd, shared.buff[i].data, BUFFSIZE);

44   if (shared.buff[i].n == 0) {

45    Sem_post(&shared.nstored); /* еще один объект */

46    return(NULL);

47   }

48   if (++i >= NBUFF)

49    i = 0; /* кольцевой буфер */

50   Sem_post(&shared.nstored); /* еще один объект */

51  }

52 }


53 void *

54 consume(void *arg)

55 {

56  int i;

57  for (i = 0;;) {

58   Sem_wait(&shared.nstored); /* ожидание появления объекта для обработки */

59   Sem_wait(&shared.mutex);

60   /* критическая область */

61   Sem_post(&shared.mutex);

62   if (shared.buff[i].n == 0)

63    return(NULL);

64   Write(STDOUT_FILENO, shared.buff[i].data, shared.buff[i].n);

65   if (++i >= NBUFF)

66    i=0; /* кольцевой буфер */

67   Sem_post(&shared.nempty); /* освободилось место для объекта */

68  }

69 }

Пустая критическая область

40-42 Критическая область, защищаемая семафором mutex, в данном примере пуста. Если бы буферы данных представляли собой связный список, здесь мы могли бы удалять буфер из списка, не конфликтуя при этом с производителем. Но в нашем примере, где мы просто переходим к следующему буферу с единственным потоком-производителем, защищать нам просто нечего. Тем не менее мы оставляем операции установки и снятия блокировки, подчеркивая, что они могут потребоваться в новых версиях кода.

Считывание данных и увеличение семафора nstored

43-49 Каждый раз, когда производитель получает пустой буфер, он вызывает функцию read. При возвращении из read увеличивается семафор nstored, уведомляя потребителя о том, что буфер готов. При возвращении функцией read значения 0 (конец файла) семафор увеличивается, а производитель завершает работу.

Поток-потребитель

57-68 Поток-потребитель записывает содержимое буферов в стандартный поток вывода. Буфер, содержащий нулевой объем данных, обозначает конец файла. Как и в потоке-производителе, критическая область, защищенная семафором mutex, пуста.

ПРИМЕЧАНИЕ

В разделе 22.3 книги [24] мы разработали пример с несколькими буферами. В этом примере производителем был обработчик сигнала SIGIO, а потребитель представлял собой основной цикл обработки (функцию dg_echo). Разделяемой переменной был счетчик nqueue. Потребитель блокировал сигнал SIGIO на время проверки или изменения счетчика.

10.12. Использование семафоров несколькими процессами

Правила совместного использования размещаемых в памяти семафоров несколькими процессами просты: сам семафор (переменная типа semt, адрес которой является первым аргументом sem_init) должен находиться в памяти, разделяемой всеми процессами, которые хотят его использовать, а второй аргумент функции sem_init должен быть равен 1.

ПРИМЕЧАНИЕ

Эти правила аналогичны требованиям к разделению взаимного исключения, условной переменной или блокировки чтения-записи между процессами: средство синхронизации (переменная типа pthread_mutex_t, pthread_cond_t или pthread_rwlock_t) должно находиться в разделяемой памяти и инициализироваться с атрибутом PTHREAD_PROCESS SHARED.

Что касается именованных семафоров, процессы всегда могут обратиться к одному и тому же семафору, указав одинаковое имя при вызове sem_open. Хотя указатели, возвращаемые sem_open отдельным процессам, могут быть различны, все функции, работающие с семафорами, будут обращаться к одному и тому же именованному семафору.

Что произойдет, если мы вызовем функцию sem_open, возвращающую указатель на тип sem_t, а затем вызовем fork? В описании функции fork в стандарте Posix.1 говорится, что «все открытые родительским процессом семафоры будут открыты и в дочернем процессе». Это означает, что нижеследующий код верен:

sem_t *mutex; /* глобальный указатель, копируемый, при вызове fork() */

/* родительский процесс создает именованный семафор */

mutex = Sem_open(Px_ipc_name(NAME), O_CREAT | O_EXCL, FILE_MODE, 0);

if ((childpid = Fork()) == 0) {

 /* дочерний процесс */

 …

 Sem_wait(mutex);

 …

}

/* родительский процесс */

Sem_post(mutex);

ПРИМЕЧАНИЕ

Причина, по которой следует аккуратно относиться к передаче семафоров при порождении процессов, заключается в том, что состояние семафора может храниться в переменной типа sem_t, но для его работы может требоваться и другая информация (например, дескрипторы файлов). В следующей главе мы увидим, что семафоры System V однозначно определяются их целочисленными идентификаторами, возвращаемыми функцией semget. Любой процесс, которому известен идентификатор, может получить доступ к семафору. Вся информация о семафоре System V хранится в ядре, а целочисленный идентификатор просто указывает номер семафора ядру.

10.13. Ограничения на семафоры

Стандартом Posix определены два ограничения на семафоры:

SEM_NSEMS_MAX — максимальное количество одновременно открытых семафоров для одного процесса (Posix требует, чтобы это значение было не менее 256);

SEM_VALUE_MAX — максимальное значение семафора (Posix требует, чтобы оно было не меньше 32767).

Две эти константы обычно определены в заголовочном файле и могут быть получены во время выполнения вызовом sysconf, как мы показываем ниже.

Пример: программа semsysconf

Программа в листинге 10.20 вызывает sysconf и выводит два ограничения на семафоры, зависящие от конкретной реализации. 

Листинг 10.20. Вызов sysconf для получения ограничений на семафоры

//pxsem/semsysconf.с

1 #include "unpipc.h"


2 int

3 main(int argc, char **argv)

4 {

5  printf("SEM_NSEMS_MAX = %ld, SEM_VALUE_MAX = %ld\n",

6   Sysconf(_SC_SEM_NSEMS_MAX), Sysconf(_SC_SEM_VALUE_MAX));

7  exit(0);

8 }

При запуске этой программы в наших двух тестовых системах получим следующий результат:

solaris % semsysconf

SEMS_NSEMS_MAX = 2147483647, SEM_VALUE_MAX = 2147483647

alpha % semsysconf

SEMS_NSEMS_MAX = 256, SEM_VALUE_MAX = 32767

10.14. Реализация с использованием FIFO

Займемся реализацией именованных семафоров Posix с помощью каналов FIFO. Именованный семафор реализуется как канал FIFO с конкретным именем. Неотрицательное количество байтов в канале соответствует текущему значению семафора. Функция sem_post помещает 1 байт в канал, a sem_wait считывает его оттуда (приостанавливая выполнение процесса, если канал пуст, а именно этого мы и хотим). Функция sem_open создает канал FIFO, если указан флаг O_CREAT; открывает его дважды (один раз на запись, другой — на чтение) и при создании нового канала FIFO помещает в него некоторое количество байтов, указанное в качестве начального значения.

ПРИМЕЧАНИЕ

Этот и последующие разделы данной главы содержат усложненный материал, который можно при первом чтении пропустить.

Приведем текст нашего заголовочного файла semaphore.h, определяющего фундаментальный тип sem_t (листинг 10.21).

Листинг 10.21. Заголовочный файл semaphore.h

//my_pxsem_fifo/semaphore.h

1  /* фундаментальный тип */

2  typedef struct {

3   int sem_fd[2]; /* два дескриптора fd: [0] для чтения, [1] для записи */

4   int sem_magic; /* магическое число */

5  } mysem_t;


6  #define SEM_MAGIC 0x89674523


7  #ifdef SEM_FAILED

8  #undef SEM_FAILED

9  #define SEM_FAILED ((mysem_t *)(-1)) /* чтобы компилятор не выдавал предупреждений*/


10 #endif

Тип данных sem_t

1-5 Новая структура данных содержит два дескриптора, один из которых предназначен для чтения из FIFO, а другой — для записи. Для единообразия мы храним оба дескриптора в массиве из двух элементов, в котором первый дескриптор всегда открыт на чтение, а второй — на запись.

Поле sem_magiс содержит значение SEM_MAGIC, если структура проинициализирована. Это значение проверяется всеми функциями, которым передается указатель на тип sem_t, чтобы гарантировать, что передан был действительно указатель на заранее инициализированную структуру, а не на произвольную область памяти. При закрытии семафора этому полю присваивается значение 0. Этот метод хотя и не совершенен, но дает возможность обнаружить некоторые ошибки при написании программ.

Функция sem_open

В листинге 10.22 приведен текст функции sem_open, которая создает новый семафор или открывает существующий.

Листинг 10.22. Функция sem_open

//my_pxsem_fifo/sem_open.с

1  #include "unpipc.h"

2  #include "semaphore.h"

3  #include  /* для произвольного списка аргументов */


4  mysem_t *

5  mysem_open(const char *pathname, int oflag, …)

6  {

7   int i, flags, save_errno;

8   char c;

9   mode_t mode;

10  va_list ap;

11  mysem_t *sem;

12  unsigned int value;

13  if (oflag & O_CREAT) {

14   va_start(ap, oflag); /* ар инициализируется последним аргументом */

15   mode = va_arg(ap, va_mode_t);

16   value = va_arg(ap, unsigned int);

17   va_end(ap);

18   if (mkfifo(pathname, mode) < 0) {

19    if (errno == EEXIST && (oflag & O_EXCL) == 0)

20     oflag &= ~O_CREAT; /* уже существует, OK */

21    else

22     return(SEM_FAILED);

23   }

24  }

25  if ((sem = malloc(sizeof(mysem_t))) == NULL)

26   return(SEM_FAILED);

27  sem->sem_fd[0] = sem->sem_fd[1] = –1;

28  if ((sem->sem_fd[0] = open(pathname, O_RDONLY | O_NONBLOCK)) < 0)

29   goto error;

30  if ((sem->sem_fd[1] = open(pathname, O_WRONLY | O_NONBLOCK)) < 0)

31   goto error;

32  /* отключение неблокируемого режима для sem_fd[0] */

33  if ((flags = fcntl(sem->sem_fd[0], F_GETFL, 0)) < 0)

34   goto error;

35  flags &= ~O_NONBLOCK;

36  if (fcntl(sem->sem_fd[0], F_SETFL, flags) < 0)

37   goto error;

38  if (oflag & O_CREAT) { /* инициализация семафора */

39   for (i = 0; i < value; i++)

40    if (write(sem->sem_fd[1], &c, 1) != 1)

41   goto error;

42  }

43  sem->sem_magic = SEM_MAGIC;

44  return(sem);

45 error:

46  save_errno = errno;

47  if (oflag & O_CREAT)

48   unlink(pathname); /* если мы создали FIFO */

49  close(sem->sem_fd[0]); /* игнорируем ошибку */

50  close(sem->sem_fd[1]); /* игнорируем ошибку */

51  free(sem);

52  errno = save_errno;

53  return(SEM_FAILED);

54 }

Создание нового sсемафора

13-17 Если при вызове указан флаг O_CREAT, должно быть указано четыре аргумента, а не два. Мы вызываем va_start, после чего переменная ар указывает на последний явно указанный аргумент (oflag). Затем мы используем ар и функцию va_arg для получения значений третьего и четвертого аргументов. Работу со списком аргументов переменной длины и использование нашего типа va_mode_t мы обсуждали в связи с листингом 5.17.

Создание нового канала FIFO

18-23 Создается новый канал FIFO, имя которого было указано при вызове функции. Как мы отмечали в разделе 4.6, эта функция возвращает ошибку EEXIST, если канал уже существует. Если при вызове sem_open флаг O_EXCL не был указан, мы пропускаем эту ошибку; но нам не нужно будет инициализировать этот канал, так что мы при этом сбрасываем флаг O_CREAT.

Выделение памяти под тип sem_t и открытие FIFO на чтение и запись

25-37 Мы выделяем место для типа sem_t, который содержит два дескриптора. Затем мы дважды открываем канал FIFO: один раз только на чтение, а другой — только на запись. При этом мы не хотим блокирования при вызове open, поэтому указываем флаги O_NONBLOCK при открытии очереди только для чтения (вспомните табл. 4.1). Мы также указываем флаг O_NONBLOCK при открытии канала на запись, но это предназначено для обнаружения переполнения (на тот случай, если мы попытаемся записать больше, чем позволяет PIPE_BUF). После открытия канала мы отключаем неблокируемый режим для дескриптора, открытого на чтение.

Инициализация значения созданного семафора

38-42 Если мы создали семафор, его нужно проинициализировать, записав в канал FIFO value байтов. Если указанное при вызове значение value превышает определенное реализацией ограничение PIPE_BUF, вызов write после переполнения FIFO вернет ошибку с кодом EAGAIN.

Функция sem_close

Текст функции sem_close приведен в листинге 10.23.

11-15 Мы закрываем оба дескриптора и освобождаем память, выделенную под тип sem_t.

Листинг 10.23. Функция sem_close

//my_pxsem_fifo/sem_close.с

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_close(mysem_t *sem)

5  {

6   if (sem->sem_magic != SEM_MAGIC) {

7    errno = EINVAL;

8    return(-1);

9   }

10  sem->sem_magic = 0; /* чтобы семафор нельзя было больше использовать */

11  if (close(sem->sem_fd[0]) == –1 || close(sem->sem_fd[1]) == –1) {

12   free(sem);

13   return(-1);

14  }

15  free(sem);

16  return(0);

17 }

Функция sem_unlink

Функция sem_unlink, текст которой приведен в листинге 10.24, удаляет из файловой системы наш семафор. Она просто вызывает unlink.

Листинг 10.24. Функция sem_unlink

//my_pxsem_fifo/sem_unlink. с

1 #include "unpipc.h"

2 #include "semaphore.h"


3 int

4 mysem_unlink(const char *pathname)

5 {

6  return(unlink(pathname));

7 }

Функция sem_post

В листинге 10.25 приведен текст функции sem_post, которая увеличивает значение семафора.

11-12 Мы записываем один байт в FIFO. Если канал был пуст, это приведет к возобновлению выполнения всех процессов, заблокированных в вызове read для этого канала.

Листинг 10.25. Функция sem_post

//my_pxsem_fifo/sem_post.с

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_post(mysem_t *sem)

5  {

6   char c;

7   if (sem->sem_magic != SEM_MAGIC) {

8    errno = EINVAL;

9    return(-1);

10  }

11  if (write(sem->sem_fd[1], &c, 1) == 1)

12   return(0);

13  return(-1);

14 }

Функция sem_wait

Последняя функция для работы с именованными семафорами Posix — sem_wait. Ее текст приведен в листинге 10.26.

Листинг 10.26. Функция sem_wait

//my_pxsem_fifo/sem_wait.с

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_wait(mysem_t *sem)

5  {

6   char c;

7   if (sem->sem_magic != SEM_MAGIC) {

8    errno = EINVAL;

9    return(-1);

10  }

11  if (read(sem->sem_fd[0], &c, 1) == 1)

12   return(0);

13  return(-1);

14 }

11-12 Мы считываем 1 байт из канала FIFO, причем работа приостанавливается, если канал пуст.

Мы еще не реализовали функцию sem_trywait, но это можно сделать, установив флаг отключения блокировки для канала и используя обычный вызов read. Мы также не реализовали функцию sem_getvalue. В некоторых реализациях при вызове функции stat или fstat возвращается количество байтов в именованном или неименованном канале, причем оно помещается в поле st_size структуры stat. Однако это не гарантируется стандартом Posix и, следовательно, не обязательно будет работать в других системах. Пример реализации этих двух функций для работы с семафорами Posix приведен в следующем разделе.

10.15. Реализация с помощью отображения в память

Теперь займемся реализацией именованных семафоров Posix с помощью отображаемых в память файлов вместе со взаимными исключениями и условными переменными Posix. Реализация, аналогичная данной, приведена в разделе В.11.3 Обоснования стандарта IEEE 1996 [8].

ПРИМЕЧАНИЕ

Отображаемые в память файлы описаны в главах 12 и 13. Данный раздел можно отложить, с тем чтобы вернуться к нему после прочтения этих глав. 

Прежде всего приведем текст нашего заголовочного файла semaphore.h (листинг 10.27), в котором определяется фундаментальный тип sem_t.

Тип sem_t

1-7 Структура данных семафора содержит взаимное исключение, условную переменную и беззнаковое целое, в котором хранится текущее значение семафора. Как уже говорилось в связи с листингом 10.21, поле sem_magiс получает значение SEM_MAGIC при инициализации структуры.

Листинг 10.27. Заголовочный файл semaphore.h

//my_pxsem_mmap/semaphore.h

1  /* фундаментальный тип */

2  typedef struct {

3   pthread_mutex_t sem_mutex; /* блокируется при проверке и изменении значения семафора */

4   pthread_cond_t sem_cond; /* при изменении нулевого значения */

5   unsigned int sem_count; /* значение семафора */

6   int sem_magic; /* магическое значение, если семафор открыт */

7  } mysem_t;


8  #define SEM_MAGIC 0x67458923


9  #ifdef SEM_FAILED

10 #undef SEM_FAILED

11 #define SEM_FAILED ((mysem_t *)(-1)) /* чтобы избежать предупреждений компилятора */

12 #endif

Функция sem_open

В листинге 10.28 приведен текст первой части функции sem_open, которая может использоваться для создания нового семафора или открытия существующего.

Листинг 10.28. Функция sem_open: первая половина

//my_pxsem_mmap/sem_open.с

1  #include "unpipc.h"

2  #include "semaphore.h"

3  #include  /* для списков аргументов переменной длины */

4  #define MAX_TRIES 10 /* количество попыток инициализации */


5  mysem_t *

6  mysem_open(const char *pathname, int oflag, …)

7  {

8   int fd, i, created, save_errno;

9   mode_t mode;

10  va_list ap;

11  mysem_t *sem, seminit;

12  struct stat statbuff;

13  unsigned int value;

14  pthread_mutexattr_t mattr;

15  pthread_condattr_t cattr;

16  created = 0;

17  sem = MAP_FAILED; /* [sic] */

18 again:

19  if (oflag & O_CREAT) {

20   va_start(ap, oflag); /* ар инициализируется последним явно указанным аргументом */

21   mode = va_arg(ap, va_mode_t) & ~S_IXUSR;

22   value = va_arg(ap, unsigned int);

23   va_end(ap);

24   /* открываем с указанием флага O_EXCL и установкой бита user-execute */

25   fd = open(pathname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);

26   if (fd < 0) {

27    if (errno == EEXIST && (oflag & O_EXCL) == 0)

28     goto exists; /* уже существует. OK */

29    else

30     return(SEM_FAILED);

31   }

32   created = 1;

33   /* кто создает файл, тот его и инициализирует */

34   /* установка размера файла */

35   bzero(&seminit, sizeof(seminit));

36   if (write(fd, &seminit, sizeof(seminit)) != sizeof(seminit))

37    goto err;

38   /* отображение файла в память */

39   sem = mmap(NULL, sizeof(mysem_t), PROT_READ | PROT_WRITE,

40    MAP_SHARED, fd, 0);

41   if (sem == MAP_FAILED)

42    goto err;

43   /* инициализация взаимного исключения, условной переменной, значения семафора */

44   if ((i = pthread_mutexattr_init(&mattr)) != 0)

45    goto pthreaderr;

46   pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);

47   i = pthread_mutex_init(&sem->sem_mutex, &mattr);

48   pthread_mutexattr_destroy(&mattr); /* не забыть удалить */

49   if (i != 0)

50    goto pthreaderr;

51   if ((i = pthread_condattr_init(&cattr)) != 0)

52    goto pthreaderr;

53   pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);

54   i = pthread_cond_init(&sem->sem_cond, &cattr);

55   pthread_condattr_destroy(&cattr); /* не забыть удалить */

56   if (i != 0)

57    goto pthreaderr;

58   if ((sem->sem_count = value) > sysconf(_SC_SEM_VALUE_MAX)) {

59    errno = EINVAL;

60    goto err;

61   }

62   /* инициализация завершена, снимаем бит user-execute */

63   if (fchmod(fd, mode) == –1)

64    goto err;

65   close(fd);

66   sem->sem_magic = SEM_MAGIC;

67   return(sem);

68  }

Работа со списком аргументов переменной длины

19-23 Если при вызове функции указан флаг O_CREAT, мы должны принять четыре аргумента, а не два. Работа со списком аргументов переменной длины с помощью типа va_mode_t уже обсуждалась в связи с листингом 5.17, где мы использовали метод, аналогичный примененному здесь. Мы сбрасываем бит user-execute переменной mode (S_IXUSR) по причинам, которые вскоре будут раскрыты. Создается файл с указанным именем, и для него устанавливается бит user-execute.

Создание нового семафора и обработка потенциальной ситуации гонок

24-32 Если бы при указании флага O_CREAT мы просто открывали файл, отображали в память его содержимое и инициализировали поля структуры sem_t, у нас возникла бы ситуация гонок. Эта ситуация также уже обсуждалась в связи с листингом 5.17, и там мы воспользовались тем же методом, что и сейчас. Такая же ситуация гонок встретится нам, когда мы будем разбираться с листингом 10.37.

Установка размера файла

33-37 Мы устанавливаем размер созданного файла, записывая в него заполненную нулями структуру. Поскольку мы знаем, что только что созданный файл имеет размер 0, для установки его размера мы вызываем именно write, но не ftruncate, потому что, как мы отмечаем в разделе 13.3, Posix не гарантирует, что ftruncate срабатывает при увеличении размера обычных файлов. 

Отображение содержимого файла в память

38-42 Файл отображается в память вызовом mmap. Этот файл будет содержать текущее значение структуры типа sem_t, хотя, поскольку мы только что отобразили файл в память, мы обращаемся к нему через указатель, возвращаемый mmap, и никогда не вызываем read или write.

Инициализация структуры sem_t

43-57 Мы инициализируем три поля структуры sem_t: взаимное исключение, условную переменную и значение семафора. Поскольку именованный семафор Posix может совместно использоваться всеми процессами с соответствующими правами, которым известно его имя, при инициализации взаимного исключения и условной переменной необходимо указать атрибут PTHREAD_PROCESS_SHARED. Чтобы осуществить это для взаимного исключения, нужно сначала проинициализировать атрибуты, вызвав pthread_mutexattr_init, затем установить атрибут совместного использования потоками, вызвав pthread_mutexattr_setpshared, а затем проинициализировать взаимное исключение вызовом pthread_mutex_init. Аналогичные действия придется выполнить и для условной переменной. Необходимо аккуратно уничтожать переменные, в которых хранятся атрибуты, при возникновении ошибок.

Инициализация значения семафора

58-61 Наконец мы помещаем в файл начальное значение семафора. Предварительно мы сравниваем его с максимально разрешенным значением семафора, которое может быть получено вызовом sysconf (раздел 10.13).

Сброс бита user-execute

62-67 После инициализации семафора мы сбрасываем бит user-execute. Это указывает на то, что семафор был успешно проинициализирован. Затем мы закрываем файл вызовом close, поскольку он уже был отображен в память и нам не нужно держать его открытым.

В листинге 10.29 приведен текст второй половины функции sem_open. Здесь возникает ситуация гонок, обрабатываемая так же, как уже обсуждавшаяся в связи с листингом 5.19.

Листинг 10.29. Функция sem_open: вторая половина

//my_pxsem_mmap/sem_open.с

69  exists:

70   if ((fd = open(pathname, O_RDWR)) < 0) {

71    if (errno == ENOENT && (oflag & O_CREAT))

72     goto again;

73    goto err;

74   }

75   sem = mmap(NULL, sizeof(mysem_t), PROT_READ | PROT_WRITE,

76    MAP_SHARED, fd, 0);

77   if (sem == MAP_FAILED)

78    goto err;

79   /* удостоверимся, что инициализация завершена */

80   for (i = 0; i < MAX TRIES; i++) {

81    if (stat(pathname, &statbuff) == –1) {

82     if (errno == ENOENT && (oflag & O_CREAT)) {

83      close(fd);

84      goto again;

85     }

86     goto err;

87    }

88    if ((statbuff.st_mode & S_IXUSR) == 0) {

89     close(fd);

90     sem->sem_magic = SEM_MAGIC;

91     return(sem);

92    }

93    sleep(1);

94   }

95   errno = ETIMEDOUT;

96   goto err;

97  pthreaderr:

98   errno = i;

99  err:

100  /* не даем вызовам unlink и munmap изменить код errno */

101  save_errno = errno;

102  if (created)

103   unlink(pathname);

104  if (sem != MAP_FAILED)

105   munmap(sem, sizeof(mysem_t));

106  close(fd);

107  errno = save_errno;

108  return(SEM_FAILED);

109 }

Открытие существующего семафора

69-78 Здесь мы завершаем нашу работу, если либо не указан флаг O_CREAT, либо он указан, но семафор уже существует. В том и в другом случае мы открываем существующий семафор. Мы открываем файл вызовом open для чтения и записи, а затем отображаем его содержимое в адресное пространство процесса вызовом mmap.

ПРИМЕЧАНИЕ

Теперь легко понять, почему в Posix.1 сказано, что «обращение к копиям семафора приводит к неопределенным результатам». Если именованный семафор реализован через отображение файла в память, он отображается в адресное пространство всех процессов, в которых он открыт. Это осуществляется функцией sem_open для каждого процесса в отдельности. Изменения, сделанные одним процессом (например, изменение счетчика семафора), становятся доступны другим процессам через отображение в память. Если мы сделаем свою собственную копию структуры sem_t, она уже не будет общей для всех процессов. Хотя нам и может показаться, что вызовы срабатывают (функции для работы с семафором не будут возвращать ошибок, по крайней мере до вызова sem_close, которая не сможет отключить отображение для копии отображенного файла), с другими процессами мы при этом взаимодействовать не сможем. Однако заметьте (табл. 1.4), что области памяти с отображаемыми файлами передаются дочерним процессам при вызове fork, поэтому создание копии семафора ядром при порождении нового процесса проблем не вызовет. 

Удостоверимся, что семафор проинициализирован

79-96 Мы должны подождать, пока семафор не будет проинициализирован (если несколько потоков пытаются создать семафор приблизительно одновременно). Для этого мы вызываем stat и проверяем биты разрешений файла (поле st_mode структуры stat). Если бит user-execute снят, структура успешно проинициализирована.

Возврат кодов ошибок

97-108 При возникновении ошибки нужно аккуратно вернуть ее код.

Функция sem_close

В листинге 10.30 приведен текст нашей функции sem_close, которая просто вызывает munmap для отображенного в память файла. Если вызвавший процесс продолжит пользоваться указателем, который был ранее возвращен sem_open, он получит сигнал SIGSEGV.

Листинг 10.30. Функция sem_close

//my_pxsem_mmap/sem_close. с

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_close(mysem_t *sem)

5  {

6   if (sem->sem_magic != SEM_MAGIC) {

7    errno = EINVAL;

8    return(-1);

9   }

10  if (munmap(sem, sizeof(mysem_t)) == –1)

11  return(-1);

12  return(0);

13 }

Функция sem_unlink

Текст функции sem_unlink приведен в листинге 10.31. Она просто удаляет файл, через который реализован данный семафор, вызывая функцию unlink.

Листинг 10.31. Функция sem_unlink

//my_pxsem_mmap/sem_unlink.с

1 #include "unpipc.h"

2 #include "semaphore.h"


3 int

4 mysem_unlink(const char *pathname)

5 {

6  if (unlink(pathname) == –1)

7   return(-1);

8  return(0);

9 }

Функция sem_post

В листинге 10.32 приведен текст функции sem_post, которая увеличивает значение семафора, возобновляя выполнение всех процессов, заблокированных в ожидании этого события.

Листинг 10.32. Функция sem_post

//my_pxsem_mmap/sem_post.с

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_post(mysem_t *sem)

5  {

6   int n;

7   if (sem->sem_magic != SEM_MAGIC) {

8    errno = EINVAL;

9    return(-1);

10  }

11  if ((n = pthread_mutex_lock(&sem->sem_mutex)) != 0) {

12   errno = n;

13   return(-1);

14  }

15  if (sem->sem_count == 0)

16   pthread_cond_signal(&sem->sem_cond);

17  sem->sem_count++;

18  pthread_mutex_unlock(&sem->sem_mutex);

19  return(0);

20 }

11-18 Прежде чем работать со структурой, нужно заблокировать соответствующее взаимное исключение. Если значение семафора изменяется с 0 на 1, нужно вызвать pthread_cond_signal, чтобы возобновилось выполнение одного из процессов, зарегистрированных на уведомление по данной условной переменной.

Функция sem_wait

В листинге 10.33 приведен текст функции sem_wait, которая ожидает изменения значения семафора с 0 на положительное, после чего уменьшает его на 1.

Листинг 10.33. Функция sem_wait

//my_pxsem_mmap/sem_wait.с

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_wait(mysem_t *sem)

5  {

6   int n;

7   if (setn->sem_magic != SEM_MAGIC) {

8    errno = EINVAL;

9    return(-1);

10  }

11  if ((n = pthread_mutex_lock(&sem->sem_mutex)) != 0) {

12   errno = n;

13   return(-1);

14  }

15  while (sem->sem_count == 0)

16   pthread_cond_wait(&sem->sem_cond, &sem->sem_mutex);

17  sem->sem_count--;

18  pthread_mutex_unlock(&sem->sem_mutex);

19  return(0);

20 }

11-18 Прежде чем работать с семафором, нужно заблокировать соответствующее взаимное исключение. Если значение семафора 0, выполнение процесса приостанавливается в вызове pthread_cond_wait до тех пор, пока другой процесс не вызовет pthread_cond_signal для этого семафора, изменив его значение с 0 на 1. После того как значение становится ненулевым, мы уменьшаем его на 1 и разблокируем взаимное исключение.

Функция sem_trywait

В листинге 10.34 приведен текст функции sem_trywait, которая представляет собой просто неблокируемый вариант функции sem_wait.

11-22 Мы блокируем взаимное исключение и проверяем значение семафора. Если оно положительно, мы вычитаем из него 1 и возвращаем вызвавшему процессу код 0. В противном случае возвращается –1, а переменной errno присваивается код ошибки EAGAIN.

Листинг 10.34. Функция sem_trywait

//my_pxsem_nmap/sem_trywait.с

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_trywait(mysem_t *sem)

5  {

6   int n, rc;

7   if (sem->sem_magic != SEM_MAGIC) {

8    errno = EINVAL;

9    return(-1);

10  }

11  if ((n = pthread_mutex_lock(&sem->sem_mutex)) != 0) {

12   errno = n;

13   return(-1);

14  }

15  if (sem->sem_count > 0) {

16   sem->sem_count--;

17   rc = 0;

18  } else {

19   rc = –1;

20   errno = EAGAIN;

21  }

22  pthread_mutex_unlock(&sem->sem_mutex);

23  return(rc);

24 }

Функция sem_getvalue

В листинге 10.35 приведен текст последней функции в этой реализации — sem_getvalue. Она возвращает текущее значение семафора.

11-16 Мы блокируем соответствующее взаимное исключение и считываем значение семафора.

Листинг 10.35. Функция sem_getvalue

//my_pxsem_mmap/sem_getvalue.c

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_getvalue(mysem_t *sem, int *pvalue)

5  {

6   int n;

7   if (sem->sem_magic != SEM_MAGIC) {

8    errno = EINVAL;

9    return(-1);

10  }

11  if ((n = pthread_mutex_lock(&sem->sem_mutex)) != 0) {

12   errno = n;

13   return(-1);

14  }

15  *pvalue = sem->sem_count;

16  pthread_mutex_unlock(&sem->sem_mutex);

17  return(0);

18 }

Из этой реализации видно, что семафорами пользоваться проще, чем взаимными исключениями и условными переменными.

10.16. Реализация с использованием семафоров System V

Приведем еще один пример реализации именованных семафоров Posix — на этот раз с использованием семафоров System V. Поскольку семафоры System V появились раньше, чем семафоры Posix, эта реализация позволяет использовать последние в системах, где их поддержка не предусмотрена производителем.

ПРИМЕЧАНИЕ

Семафоры System V описаны в главе 11. Этот раздел можно пропустить при первом чтении, с тем чтобы вернуться к нему по прочтении 11 главы.

Начнем, как обычно, с заголовочного файла semaphore.h (листинг 10.36), который определяет фундаментальный тип данных sem_t.

Листинг 10.36. Заголовочный файл semaphore.h

//my_pxsem_svsem/semaphore.h

1  /* фундаментальный тип данных */

2  typedef struct {

3   int sem_semid; /* идентификатор семафора System V */

4   int sem_magic; /* магическое значение, если семафор открыт */

5  } mysem_t;


6  #define SEM_MAGIC 0x45678923


7  #ifdef SEM_FAILED

8  #undef SEM_FAILED

9  #define SEM_FAILED ((mysem_t *)(-1)) /* исключаем предупреждения компилятора */

10 #endif


11 #ifndef SEMVMX

12 #define SEMVMX 32767 /* исторически сложившееся максимальное значение для семафора System V */

13 #endif

Тип данных sem_t

1-5 Мы реализуем именованный семафор Posix с помощью набора семафоров System V, состоящего из одного элемента. Структура данных семафора содержит идентификатор семафора System V и магическое число (обсуждавшееся в связи с листингом 10.21).

Функция sem_open

В листинге 10.37 приведен текст первой половины функции sem_open, которая создает новый семафор или открывает существующий.

Листинг 10.37. Функция sem_open: первая часть

//my_pxsem_svsem/sem_open. с

1  #include "unpipc.h"

2  #include "semaphore.h"

3  #include  /* для списков аргументов переменной длины */

4  #define MAX_TRIES 10 /* количество попыток инициализации */


5  mysem_t *

6  mysem_open(const char *pathname, int oflag, … )

7  {

8   int i, fd, semflag, semid, save_errno;

9   key_t key;

10  mode_t mode;

11  va_list ap;

12  mysem_t *sem;

13  union semun arg;

14  unsigned int value;

15  struct semid_ds seminfo;

16  struct sembuf initop;

17  /* режим доступа для sem_open() без O_CREAT не указывается; угадываем */

18  semflag = SVSEM_MODE;

19  semid = –1;

20  if (oflag & O_CREAT) {

21   va_start(ap, oflag); /* инициализируем ар последним явно указанным аргументом */

22   mode = va_arg(ap, va_mode_t);

23   value = va_arg(ap, unsigned int);

24   va_end(ap);

25   /* преобразуем в ключ, который будет идентифицировать семафор System V */

26   if ((fd = open(pathname, oflag, mode)) == –1)

27    return(SEM_FAILED);

28   close(fd);

29   if ((key = ftok(pathname, 0)) == (key_t) –1)

30    return(SEM_FAILED);

31   semflag = IPC_CREAT | (mode & 0777);

32   if (oflag & O_EXCL)

33    semflag |= IPC_EXCL;

34    /* создаем семафор System V с флагом IPC_EXCL */

35   if ((semid = semget(key, 1, semflag | IPC_EXCD) >= 0) {

36    /* OK, мы успели первыми, поэтому инициализируем нулем */

37    arg.val = 0;

38    if (semctl(semid, 0, SETVAL, arg) == –1)

39     goto err;

40    /* увеличиваем значение, чтобы sem_otime стало ненулевым */

41    if (value > SEMVMX) {

42     errno = EINVAL;

43     goto err;

44    }

45    initop.sem_num = 0;

46    initop.sem_op = value;

47    initop.sem_flg = 0;

48    if (semop(semid, &initop, 1) == –1)

49     goto err;

50    goto finish;

51   } else if (errno != EEXIST || (semflag & IPC_EXCL) != 0)

52    goto err:

53   /* иначе продолжаем выполнение */

54  }

Создание нового семафора и работа со списком аргументов переменной длины

20-24 Если вызвавший процесс указывает флаг O_CREAT, мы знаем, что функции будут переданы четыре аргумента, а не два. Работа со списком аргументов переменной длины и типом данных va_mode_t обсуждалась в связи с листингом 5.17.

Создание вспомогательного файла и преобразование полного имени в ключ System V IPC

25-30 Создается обычный файл с именем, указываемым при вызове функции. Это делается для того, чтобы указать его имя при вызове функции ftok для последующей идентификации семафора. Аргумент oflag, принятый от вызвавшего процесса, передается функции open для дополнительного файла, что позволяет создать его, если он еще не существует, и вернуть ошибку EEXIST, если файл существует и указан флаг O_EXCL. Дескриптор файла затем закрывается, поскольку единственная цель создания файла была в использовании его имени при вызове ftok, преобразующей полное имя в ключ System V IPC (раздел 3.2).

Создание набора семафоров System V с одним элементом

32-33 Мы преобразуем константы O_CREAT и O_EXCL в соответствующие константы System V IРС_ххх и вызываем semget для создания набора семафоров System V, состоящего из одного элемента. Флаг IPC_EXCL указывается всегда, чтобы можно было определить, существовал ли семафор до вызова функции или был создан ею.

Инициализация семафора

34-50 В разделе 11.2 описана фундаментальная проблема, связанная с инициализацией семафоров System V, а в разделе 11.6 приведен код, позволяющий исключить потенциальную ситуацию гонок. Здесь мы пользуемся аналогичным методом. Первый поток, который создает семафор (вспомните, что мы всегда указываем флаг IPC_EXCL), инициализирует его значением 0 с помощью команды SETVAL при вызове semctl, а затем устанавливает запрошенное вызвавшим процессом начальное значение с помощью semop. Мы можем быть уверены, что значение sem_otime семафора функцией semget устанавливается в 0 и будет изменено на ненулевое вызовом semop. Следовательно, любой поток, работающий с существующим семафором, будет знать, что он уже проинициализирован, если значение sem_otime будет отлично от 0.

Проверка начального значения

40-44 Мы проверяем начальное значение, указанное вызвавшим процессом, поскольку семафоры System V обычно хранятся как беззнаковые короткие целые (unsigned short, структура sem в разделе 11.1) с максимальным значением 32767 (раздел 11.7), тогда как семафоры Posix обычно хранятся как целые с максимально возможным размером (раздел 10.13). Константа SEMVMX определяется некоторыми реализациями как максимальное значение семафора System V, а если она не определена, то мы определяем ее равной 32 767 в листинге 10.36.

52-53 Если семафор уже существует и вызвавший процесс не указал флаг O_EXCL, ошибка не возвращается. В этом случае программа переходит к открытию (не созданию) существующего семафора.

В листинге 10.38 приведен текст второй половины функции sem_open.

Листинг 10.38. Функция sem_open: вторая половина

//my_pxsem_svsem/sem_open.c

55  /*

56   * (O_CREAT не указан) или

57   * (O_CREAT без O_EXCL и семафор уже существует).

58   * Нужно открыть семафор и проверить, что он уже проинициализирован.

59   */

60  if ((key = ftok(pathname, 0)) == (key_t) –1)

61   goto err;

62  if ((semid = semget(key, 0, semflag)) == –1)

63 goto err;

64  arg.buf = &seminfo;

65  for (i = 0; i < MAX_TRIES; i++) {

66   if (semctl(semid, 0, IPC_STAT, arg) == –1)

67    goto err;

68   if (arg.buf->sem_otime != 0)

69    goto finish;

70   sleep(1);

71  }

72  errno = ETIMEDOUT;

73 err:

74  save_errno = errno; /* не даем вызову semctl() изменить значение errno */

75  if (semid != –1)

76   semctl(semid, 0, IPC_RMID);

77  errno = save_errno;

78  return(SEM_FAILED);

79 finish:

80  if ((sem = malloc(sizeof(mysem_t))) == NULL)

81   goto err;

82  sem->sem_semid = semid;

83  sem->sem_magic = SEM_MAGIC;

84  return(sem);

85 }

Открытие существующего семафора

55-63 Если семафор уже создан (флаг O_CREAT не указан или указан, но без O_EXCL, а семафор существует), мы открываем семафор System V с помощью semget. Обратите внимание, что в вызове sem_open указывать аргумент mode не нужно, если не указан флаг O_CREAT, но вызов semget требует указания режима доступа, даже если открывается существующий семафор. Ранее в тексте функции мы присваивали значение по умолчанию (константу SVSEM_MODE из нашего заголовочного файла unpipc.h) переменной, которую теперь передаем semget, если не указан флаг O_CREAT.

Ожидание инициализации семафора

64-72 Проверяем, что семафор уже инициализирован, вызывая semctl с командой IPC_STAT и сравнивая значение поля sem_otime возвращаемой структуры с нулем.

Возврат кода ошибки

73-78 Когда возникает ошибка, мы аккуратно вызываем все последующие функции, чтобы не изменить значение errno.

Выделение памяти под sem_t

79-84 Мы выделяем память под структуру sem_t и помещаем в нее идентификатор семафора System V. Функция возвращает указатель на эту структуру.

Функция sem_close

В листинге 10.39 приведен текст функции sem_close, которая вызывает free для освобождения динамически выделенной под структуру sem_t памяти. 

Листинг 10.39. Функция sem_close

//my_pxsem_svsem/sem_close.с

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_close(mysem_t *sem)

5  {

6   if (sem->sem_magic != SEM_MAGIC) {

7    errno = EINVAL;

8    return(-1);

9   }

10  sem->sem_magic = 0; /* на всякий случай */

11  free(sem);

12  return(0);

13 }

Функция sem_unlink

Функция sem_unlink, текст которой приведен в листинге 10.40, удаляет вспомогательный файл и семафор System V, связанные с указанным ей семафором Posix.

Листинг 10.40. Функция sem_unlink

//my_pxsem_svsem/sem_unlink.с

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_unlink(const char *pathname)

5  {

6   int semid;

7   key_t key;

8   if ((key = ftok(pathname, 0)) == (key_t) –1)

9    return(-1);

10  if (unlink(pathname) == –1)

11   return(-1);

12  if ((semid = semget(key, 1, SVSEM_MODE)) == –1)

13   return(-1);

14  if (semctl(semid, 0, IPC_RMID) == –1)

15   return(-1);

16  return(0);

17 }

Получение ключа System V по полному имени

8-16 Функция ftok преобразует полное имя файла в ключ System V IPC. После этого вспомогательный файл удаляется вызовом unlink (именно в этом месте кода, на тот случай, если одна из последующих функций вернет ошибку). Затем мы открываем семафор System V вызовом semget и удаляем его с помощью команды IPC_RMID для semctl.

Функция sem_post

В листинге 10.41 приведен текст функции sem_post, которая увеличивает значение семафора.

11-16 Мы вызываем semop с операцией, увеличивающей значение семафора на 1.

Листинг 10.41. Функция sem_post

//my_pxsem_svsem/sem_post.с

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_post(mysem_t *sem)

5  {

6   struct sembuf op;

7   if (sem->sem_magic != SEM_MAGIC) {

8    errno * EINVAL;

9    return(-1);

10  }

11  op.sem_num = 0;

12  op.sem_op = 1;

13  op.sem_flg = 0;

14  if (semop(sem->sem_semid, &op, 1) < 0)

15   return(-1);

16  return(0);

17 }

Функция sem_wait

Следующая функция приведена в листинге 10.42; она называется sem_wait и ожидает изменения значения семафора с нулевого на ненулевое, после чего уменьшает значение семафора на 1.

11-16 Мы вызываем semop с операцией, уменьшающей значение семафора на 1.

Листинг 10.42. Функция sem_wait

//my_pxsem_svsem/sem_wait.c

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_wait(mysem_t *sem)

5  {

6   struct sembuf op;

7   if (sem->sem_magic != SEM_MAGIC) {

8    errno = EINVAL;

9    return(-1);

10  }

11  op.sem_num = 0;

12  op.sem_op = –1;

13  op.sem_flg = 0;

14  if (semop(sem->sem_semid, &op, 1) < 0)

15   return(-1);

16  return(0);

17 }

Функция sem_trywait

В листинге 10.43 приведен текст нашей функции sem_trywait, которая представляет собой неблокируемую версию sem_wait.

13 Единственное отличие от функции sem_wait из листинга 10.42 заключается в том, что флагу sem_flg присваивается значение IPC_NOWAIT. Если операция не может быть завершена без блокирования вызвавшего потока, функция semop возвращает ошибку EAGAIN, а это именно тот код, который должен быть возвращен sem_trywait, если операция не может быть завершена без блокирования потока.

Листинг 10.43. Функция sem_trywait

//my_pxsem_svsem/sem_trywait.c

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_trywait(mysem_t *sem)

5  {

6   struct sembuf op;

7   if (sem->sem_magic != SEM_MAGIC) {

8    errno = EINVAL;

9    return(-1);

10  }

11  op.sem_num = 0;

12  op.sem_op = –1;

13  op.sem_flg = IPC_NOWAIT;

14  if (semop(sem->sem_semid, &op, 1) < 0)

15   return(-1);

16  return(0);

17 }

Функция sem_getvalue

Последняя функция приведена в листинге 10.44. Это функция sem_getvalue, возвращающая текущее значение семафора.

11-14 Текущее значение семафора получается отправкой команды GETVAL функции semctl.

Листинг 10.44. Функция sem_getvalue

//my_pxsem_svsem/sem_getvalue.с

1  #include "unpipc.h"

2  #include "semaphore.h"


3  int

4  mysem_getvalue(mysem_t *sem, int *pvalue)

5  {

6   int val;

7   if (sem->sem_magic != SEM_MAGIC) {

8    errno = EINVAL;

9    return(-1);

10  }

11  if ((val = semctl(sem->sem_semid, 0, GETVAL)) < 0)

12   return(-1);

13  *pvalue = val;

14  return(0);

15 }

10.17. Резюме

Семафоры Posix представляют собой семафоры-счетчики, для которых определены три основные операции:

1. Создание семафора.

2. Ожидание изменения значения семафора на ненулевое и последующее уменьшение значения.

3. Увеличение значения семафора на 1 и возобновление выполнения всех процессов, ожидающих его изменения.

Семафоры Posix могут быть именованными или неименованными (размещаемыми в памяти). Именованные семафоры всегда могут использоваться отдельными процессами, тогда как размещаемые в памяти должны для этого изначально планироваться как разделяемые между процессами. Эти типы семафоров также отличаются друг от друга по живучести: именованные семафоры обладают по меньшей мере живучестью ядра, тогда как размещаемые в памяти обладают живучестью процесса.

Задача производителей и потребителей является классическим примером для иллюстрации использования семафоров. В этой главе первое решение состояло из одного потока-производителя и одного потока-потребителя; второе решение имело нескольких производителей и одного потребителя, а последнее решение допускало одновременную работу и нескольких потребителей. Затем мы показали, что классическая задача двойной буферизации является частным случаем задачи производителей и потребителей с одним производителем и одним потребителем.

В этой главе было приведено три примера возможной реализации семафоров Posix. Первый пример был самым простым, в нем использовались каналы FIFO, а большая часть забот по синхронизации ложилась на ядро (функции read и write). Следующая реализация использовала отображение файлов в память (аналогично реализации очередей сообщений Posix из раздела 5.8), а также взаимное исключение и условную переменную (для синхронизации). Последняя реализация была основана на семафорах System V и представляла собой, по сути, удобный интерфейс для работы с ними.

Упражнения

1. Измените функции produce и consume из раздела 10.6 следующим образом. Поменяйте порядок двух вызовов Sem_wait в потребителе, чтобы возникла ситуация зависания (как описано в разделе 10.6). Затем добавьте вызов printf перед каждым Sem_wait, чтобы было ясно, какой из потоков ожидает изменения семафора. Добавьте еще один вызов printf после каждого Sem_wait, чтобы можно было определить, какой поток получил управление. Уменьшите количество буферов до двух, а затем откомпилируйте и выполните эту программу, чтобы убедиться, что она зависнет.

2. Предположим, что запущено четыре экземпляра программы, вызывающей функцию my_lock из листинга 10.10:

% lockpxsem & lockpxsem & lockpxsem & lockpxsem &

Каждый из четырех процессов запускается с значением initflag, равным 0, поэтому при вызове sem_open всегда указывается O_CREAT. Нормально ли это?

3. Что произойдет в предыдущем примере, если одна из четырех программ будет завершена после вызова my_lock, но перед вызовом my_unlock?

4. Что произошло бы с программой в листинге 10.22, если бы мы не инициализировали оба дескриптора значением –1?

5. Почему в листинге 10.22 мы сохраняем значение errno, а затем восстанавливаем его, вместо того чтобы написать просто:

if (sem->fd[0] >= 0) close(sem->fd[0]);

if (sem->fd[1] >= 0) close(sem->fd[1]);

6. Что произойдет, если два процесса вызовут нашу реализацию sem_open через FIFO (листинг 10.22) примерно одновременно, указывая флаг O_CREAT и начальное значение 5? Может ли канал быть инициализирован (неправильно) значением 10?

7. В связи с листингами 10.28 и 10.29 мы описали возможную ситуацию гонок в случае, если два процесса пытаются создать семафор примерно одновременно. Однако решение предыдущей задачи в листинге 10.22 не создавало ситуации гонок. Объясните это.

8. Стандарт Posix.1 указывает дополнительную возможность для функции semwait: она может прерываться перехватываемым сигналом и возвращать код EINTR. Напишите тестовую программу, которая определяла бы, есть ли такая возможность в вашей реализации.

Запустите эту тестовую программу с нашими реализациями, использующими FIFO (раздел 10.14), отображение в память (раздел 10.15) и семафоры System V (раздел 10.16).

9. Какая из трех реализаций sem_post этой главы является функцией типа async-signal-safe (табл. 5.1)?

10. Измените решение задачи о потребителе и производителе в разделе 10.6 так, чтобы для переменной mutex использовался тип pthread_mutex_t, а не семафор. Заметна ли разница в скорости работы программы?

11. Сравните быстродействие именованных семафоров (листинги 10.8 и 10.9) и размещаемых в памяти (листинг 10.11).

ГЛАВА 11