UNIX: взаимодействие процессов — страница 9 из 35

СИНХРОНИЗАЦИЯ

ГЛАВА 7Взаимные исключения и условные переменные

7.1. Введение

Эта глава начинается с обсуждения синхронизации — принципов синхронизации действий нескольких программных потоков или процессов. Обычно это требуется для предоставления нескольким потокам или процессам совместного доступа к данным. Взаимные исключения (mutual exclusion — mutex) и условные переменные (conditional variables) являются основными средствами синхронизации.

Взаимные исключения и условные переменные появились в стандарте Posix.1 для программных потоков, и всегда могут быть использованы для синхронизации отдельных потоков одного процесса. Стандарт Posix также разрешает использовать взаимное исключение или условную переменную и для синхронизации нескольких процессов, если это исключение или переменная хранится в области памяти, совместно используемой процессами.

ПРИМЕЧАНИЕ

Эта возможность является дополнительной согласно Posix, но обязательной в Unix 98 (см. табл. 1.3).

В этой главе мы разберем классическую схему производитель-потребитель, используя взаимные исключения и условные переменные. В примере будут использоваться программные потоки, а не процессы, поскольку предоставить потокам общий буфер данных, предполагаемый в этой задаче, легко, а вот создать буфер данных между процессами можно только с помощью одной из форм разделяемой памяти (которая будет описана только в четвертой части книги). Еще одно решение этой задачи, уже с использованием семафоров, появится в главе 10.

7.2. Взаимные исключения: установка и снятие блокировки

Взаимное исключение (mutex) является простейшей формой синхронизации. Оно используется для защиты критической области (critical region), предотвращая одновременное выполнение участка кода несколькими потоками (если взаимное исключение используется потоками) или процессами (если взаимное исключение используется несколькими процессами). Выглядит это обычно следующим образом:

блокировать_mutex(…);

критическая область

разблокировать_mutex(…);

Поскольку только один поток может заблокировать взаимное исключение в любой момент времени, это гарантирует, что только один поток будет выполнять код, относящийся к критической области.

Взаимные исключения по стандарту Posix объявлены как переменные с типом pthread_mutex_t. Если переменная-исключение выделяется статически, ее можно инициализировать константой PTHREAD_MUTEX_INITIALIZER:

static pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;

При динамическом выделении памяти под взаимное исключение (например, вызовом mallос) или при помещении его в разделяемую память мы должны инициализировать эту переменную во время выполнения, вызвав функцию pthread_ mutex_init, как показано в разделе 7.7.

ПРИМЕЧАНИЕ

Вам может попасться код, в котором взаимные исключения не инициализируются, поскольку в реализации инициализирующая константа имеет значение 0, а статические переменные автоматически инициализируются этим значением. Однако такой код некорректен.

Следующие три функции используются для установки и снятия блокировки взаимного исключения:

#include 

int pthread_mutex_lock(pthread_mutex_t *mptr);

int pthread_mutex_trylock(pthread_mutex_t *mptr);

int pthread_mutex_unlock(pthread_mutex_t *mptr);

/* Все три возвращают 0 в случае успешного завершения, положительное значение Еххх – случае ошибки */

При попытке заблокировать взаимное исключение, которое уже заблокировано другим потоком, функция pthread_mutex_lock будет ожидать его разблокирования, a pthread_mutex_trylock (неблокируемая функция) вернет ошибку с кодом EBUSY.

ПРИМЕЧАНИЕ

Если несколько процессов ожидают освобождения взаимного исключения, какой из них начнет выполняться первым? Одна из возможностей, добавленных стандартом 1003.1b-1993, заключается в установке приоритета потоков. Мы не будем углубляться в эту область, отметим лишь, что разным потокам могут быть присвоены разные значения приоритета и функции синхронизации (работающие с взаимными исключениями, блокировками чтения-записи и семафорами) будут передавать управление заблокированному потоку с наивысшим значением приоритета. Раздел 5.5 книги [3] описывает возможности планирования выполнения в Posix.1 более подробно.

Хотя мы говорим о защите критической области кода программы, на самом деле речь идет о защите данных, с которыми работает эта часть кода. То есть взаимное исключение обычно используется для защиты совместно используемых несколькими потоками или процессами данных.

Взаимные исключения представляют собой блокировку коллективного пользования. Это значит, что если совместно используемые данные представляют собой, например, связный список, то все потоки, работающие с этим списком, должны блокировать взаимное исключение. Ничто не может помешать потоку работать со списком, не заблокировав взаимное исключение. Взаимные исключения предполагают добровольное сотрудничество потоков.

7.3. Схема производитель-потребитель

Одна из классических задач на синхронизацию называется задачей производителя и потребителя. Она также известна как задача ограниченного буфера. Один или несколько производителей (потоков или процессов) создают данные, которые обрабатываются одним или несколькими потребителями. Эти данные передаются между производителями и потребителями с помощью одной из форм IPC.

С этой задачей мы регулярно сталкиваемся при использовании каналов Unix. Команда интерпретатора, использующая канал

grep pattern chapters.* | wc -l

является примером такой задачи. Программа grep выступает как производитель (единственный), a wc — как потребитель (тоже единственный). Канал используется как форма IPC. Требуемая синхронизация между производителем и потребителем обеспечивается ядром, обрабатывающим команды write производителя и read покупателя. Если производитель опережает потребителя (канал переполняется), ядро приостанавливает производителя при вызове write, пока в канале не появится место. Если потребитель опережает производителя (канал опустошается), ядро приостанавливает потребителя при вызове read, пока в канале не появятся данные.

Такой тип синхронизации называется неявным; производитель и потребитель не знают о том, что синхронизация вообще осуществляется. Если бы мы использовали очередь сообщений Posix или System V в качестве средства IPC между производителем и потребителем, ядро снова взяло бы на себя обеспечение синхронизации.

При использовании разделяемой памяти как средства IPC производителя и потребителя, однако, требуется использование какого-либо вида явной синхронизации. Мы продемонстрируем это на использовании взаимного исключения. Схема рассматриваемого примера изображена на рис. 7.1.

В одном процессе у нас имеется несколько потоков-производителей и один поток-потребитель. Целочисленный массив buff содержит производимые и потребляемые данные (данные совместного пользования). Для простоты производители просто устанавливают значение buff[0] в 0, buff [1] в 1 и т.д. Потребитель перебирает элементы массива, проверяя правильность записей.

В этом первом примере мы концентрируем внимание на синхронизации между отдельными потоками-производителями. Поток-потребитель не будет запущен, пока все производители не завершат свою работу. В листинге 7.1 приведена функция main нашего примера.

Рис. 7.1. Производители и потребитель


Листинг 7.1.[1] Функция main

//mutex/prodcons2.с

1  #include "unpipc.h"

2  #define MAXNITEMS 1000000

3  #define MAXNTHREADS 100

4  int nitems; /* только для чтения потребителем и производителем */


5  struct {

6   pthread_mutex_t mutex;

7   int buff[MAXNITEMS];

8   int nput;

9   int nval;

10 } shared = {

11  PTHREAD_MUTEX_INITIALIZER

12 };


13 void *produce(void *), *consume(void *);


14 int

15 main(int argc, char **argv)

16 {

17  int i, nthreads, count[MAXNTHREADS];

18  pthread_t tid_produce[MAXNTHREADS], tid_consume;

19  if (argc != 3)

20   err_quit("usage: prodcons2 <#items><#threads>");

21  nitems = min(atoi(argv[1]), MAXNITEMS);

22  nthreads = min(atoi(argv[2]), MAXNTHREADS);

23  Set_concurrency(nthreads);

24  /* запуск всех потоков-производителей */

25  for (i = 0; i < nthreads; i++) {

26   count[i] = 0;

27   Pthread_create(&tid_produce[i], NULL, produce, &count[i]);

28  }

29  /* ожидание завершения всех производителей */

30  for (i = 0; i < nthreads; i++) {

31   Pthread_join(tid_produce[i], NULL);

32   printf("count[%d] = %d\n", i, count[i]);

33  }

34  /* запуск и ожидание завершения потока-потребителя */

35  Pthread_create(&tid_consume, NULL, consume, NULL);

36  Pthread_join(tid_consume, NULL);

37  exit(0);

38 }

Совместное использование глобальных переменных потоками

4-12 Эти переменные совместно используются потоками. Мы объединяем их в структуру с именем shared вместе с взаимным исключением, чтобы подчеркнуть, что доступ к ним можно получить только вместе с ним. Переменная nput хранит индекс следующего элемента массива buff, подлежащего обработке, a nval содержит следующее значение, которое должно быть в него помещено (0, 1, 2 и т.д.). Мы выделяем память под эту структуру и инициализируем взаимное исключение, используемое для синхронизации потоков-производителей.

ПРИМЕЧАНИЕ

Мы всегда будем стараться размещать совместно используемые данные вместе со средствами синхронизации, к ним относящимися (взаимными исключениями, условными переменными, семафорами), в одной структуре, как мы сделали в этом примере. Это хороший стиль программирования. Однако во многих случаях совместно используемые данные являются динамическими, представляя собой, например, связный список. Мы, наверное, сможем поместить в структуру первый элемент списка вместе со средствами синхронизации (как в структуре mq_hdr в листинге 5.16), но оставшаяся часть списка в структуру не попадет. Следовательно, это решение не всегда является идеальным.

Аргументы командной строки

19-22 Первый аргумент командной строки указывает количество элементов, которые будут произведены производителями, а второй — количество запускаемых потоков-производителей.

Установка уровня параллельности

23 Функция set_concurrency (наша собственная) указывает подсистеме потоков количество одновременно выполняемых потоков. В Solaris 2.6 она просто вызывает thr_setconcurrency, причем ее запуск необходим, если мы хотим, чтобы у нескольких процессов-производителей была возможность начать выполняться. Если мы не сделаем этого вызова в системе Solaris, будет запущен только первый поток. В Digital Unix 4.0B наша функция set_concurrency не делает ничего, поскольку в этой системе по умолчанию все потоки процесса имеют равные права на вычислительные ресурсы.

ПРИМЕЧАНИЕ

Unix 98 требует наличия функции pthread_setconcurrency, выполняющей это же действие. Эта функция требуется для тех реализаций, которые мультиплексируют пользовательские потоки (создаваемые функцией pthread_create) на небольшое множество выполняемых потоков ядра. Такие реализации часто называются «многие-к-немногим» (many-to-few), «двухуровневые» (two-level) или «М-на-N» (M-to-N). В разделе 5.6 книги [3] отношения между пользовательскими потоками и потоками ядра рассматриваются более подробно.

Создание процессов-производителей

24-28 Создаются потоки-производители, каждый из которых вызывает функцию produce. Идентификаторы потоков хранятся в массиве tid_produce. Аргументом каждого потока-производителя является указатель на элемент массива count. Счетчики инициализируются значением 0, и каждый поток увеличивает значение своего счетчика на 1 при помещении очередного элемента в буфер. Содержимое массива счетчиков затем выводится на экран, так что мы можем узнать, сколько элементов было помещено в буфер каждым из потоков.

Ожидание завершения работы производителей, запуск потребителя

29-36 Мы ожидаем завершения работы всех потоков-производителей, выводя содержимое счетчика для каждого потока, а затем запускаем единственный процесс-потребитель. Таким образом (на данный момент) мы исключаем необходимость синхронизации между потребителем и производителями. Мы ждем завершения работы потребителя, а затем завершаем работу процесса. В листинге 7.2 приведен текст функций produce и consume.

Листинг 7.2. Функции produce и consume

//mutex/prodcons2.с

39 void *

40 produce(void *arg)

41 {

42  for (;;) {

43   Pthread_mutex_lock(&shared.mutex);

44   if (shared.nput >= nitems) {

45    Pthread_mutex_unlock(&shared.mutex);

46    return(NULL); /* массив полный, готово */

47   }

48   shared.buff[shared.nput] = shared.nval;

49   shared.nput++;

50   shared.nval++;

51   Pthread_mutex_unlock(&shared.mutex);

52   *((int *) arg) += 1;

53  }

54 }


55 void *

56 consume(void *arg)

57 {

58  int i;

59  for (i = 0; i < nitems; i++) {

60   if (shared.buff[i] != i)

61    printf("buff[%d] = %d\n", i, shared.buff[i]);

62  }

63  return(NULL);

64 }

Формирование данных

42-53 Критическая область кода производителя состоит из проверки на достижение конца массива (завершение работы)

if (shared.nput >= nitems)

и трех строк, помещающих очередное значение в массив:

shared.buff[shared.nput] = shared.nval;

shared.nput++;

shared.nval++;

Мы защищаем эту область с помощью взаимного исключения, не забыв разблокировать его после завершения работы. Обратите внимание, что увеличение элемента count (через указатель arg) не относится к критической области, поскольку у каждого потока счетчик свой (массив count в функции main). Поэтому мы не включаем эту строку в блокируемую взаимным исключением область. Один из принципов хорошего стиля программирования заключается в минимизации объема кода, защищаемого взаимным исключением.

Потребитель проверяет содержимое массива

59-62 Потребитель проверяет правильность значений всех элементов массива и выводит сообщение в случае обнаружения ошибки. Как уже говорилось, эта функция запускается в единственном экземпляре и только после того, как все потоки-производители завершат свою работу, так что надобность в синхронизации отсутствует.

При запуске только что описанной программы с пятью процессами-производителями, которые должны вместе создать один миллион элементов данных, мы получим следующий результат:

solaris % prodcons2 1000000 5

count[0] = 167165

count[1] = 249891

count[2] = 194221

count[3] = 191815

count[4] = 196908

Как мы отмечали ранее, если убрать вызов set_concurrency, в системе Solaris 2.6 значение count[0] будет 1000000, а все остальные счетчики будут нулевыми.

Если убрать из этого примера блокировку с помощью взаимного исключения, он перестанет работать, как и предполагается. Потребитель обнаружит множество элементов buff[i], значения которых будут отличны от i. Также мы можем убедиться, что удаление блокировки ничего не изменит, если будет выполняться только один поток.

7.4. Блокировка и ожидание

Продемонстрируем теперь, что взаимные исключения предназначены для блокирования, но не для ожидания. Изменим наш пример из предыдущего раздела таким образом, чтобы потребитель запускался сразу же после запуска всех производителей. Это даст возможность потребителю обрабатывать данные по мере их формирования производителями в отличие от пpoгрaммы в листинге 7.1, в которой потребитель не запускался до тех пор, пока все производители не завершали свою работу. Теперь нам придется синхронизовать потребителя с производителями, чтобы первый обрабатывал только данные, уже сформированные последними.

В листинге 7.3 приведен текст функции main. Начало кода (до объявления функции main) не претерпело никаких изменений по сравнению с листингом 7.1.

Листинг 7.3. Функция main: запуск потребителя сразу после запуска производителей

//mutex/prodcons3.c

14 int

15 main(int argc, char **argv)

16 {

17  int i, nthreads, count[MAXNTHREADS];

18  pthread_t tid_produce[MAXNTHREADS], tid_consume;

19  if (argc != 3)

20   err_quit("usage: prodcons3 <#items><#threads>");

21  nitems = min(atoi(argv[1]), MAXNITEMS);

22  nthreads = min(atoi(argv[2]), MAXNTHREADS);

23  /* создание всех производителей и одного потребителя */

24  Set_concurrency(nthreads + 1);

25  for (i = 0; i < nthreads; i++) {

26   count[i] = 0;

27   Pthread_create(&tid_produce[i], NULL, produce, &count[i]);

28  }

29  Pthread_create(&tid_consume, NULL, consume, NULL);

30  /* ожидание завершения производителей и потребителя */

31  for (i = 0; i < nthreads; i++) {

32   Pthread_join(tid_produce[i], NULL);

33   printf("count[%d] = %d\n", i, count[i]);

34  }

35  Pthread_join(tid_consume, NULL);

36  exit(0);

37 }

24 Мы увеличиваем уровень параллельного выполнения на единицу, чтобы учесть поток-потребитель, выполняемый параллельно с производителями.

25-29 Поток-потребитель создается сразу же после создания потоков-производителей.

Функция produce по сравнению с листингом 7.2 не изменяется. В листинге 7.4 приведен текст функции consume, вызывающей новую функцию consume_wait. 

Листинг 7.4. Функции consume и consume_wait

//mutex/prodcons3.с

54 void

55 consume wait(int i)

56 {

57  for (;;) {

58   Pthread_mutex_lock(&shared.mutex);

59   if (i < shared.nput) {

60    Pthread_mutex_unlock(&shared.mutex);

61    return; /* элемент готов */

62   }

63   Pthread_mutex_unlock(&shared.mutex);

64  }

65 }


66 void *

67 consume(void *arg)

68 {

69  int i;

70  for (i = 0; i < nitems; i++) {

71   consume_wait(i);

72   if (shared.buff[i] != i)

73    printf("buff[%d] = %d\n", i, shared.buff[i]);

74  }

75  return(NULL);

76 }

Потребитель должен ждать

71 Единственное изменение в функции consume заключается в добавлении вызова consume_wait перед обработкой следующего элемента массива.

Ожидание производителей

57-64 Наша функция consume_wait должна ждать, пока производители не создадут i-й элемент. Для проверки этого условия производится блокировка взаимного исключения и значение i сравнивается с индексом производителя nput. Блокировка необходима, поскольку значение nput может быть изменено одним из производителей в момент его проверки.

Главная проблема — что делать, если нужный элемент еще не готов. Все, что нам остается и что мы делаем в листинге 7.4, — это повторять операции в цикле, устанавливая и снимая блокировку и проверяя значение индекса. Это называется опросом (spinning или polling) и является лишней тратой времени процессора.

Мы могли бы приостановить выполнение процесса на некоторое время, но мы не знаем, на какое. Что нам действительно нужно — это использовать какое-то другое средство синхронизации, позволяющее потоку или процессу приостанавливать работу, пока не произойдет какое-либо событие.

7.5. Условные переменные: ожидание и сигнализация

Взаимное исключение используется для блокирования, а условная переменная — для ожидания. Это два различных средства синхронизации, и оба они нужны. Условная переменная представляет собой переменную типа pthread_cond_t. Для работы с такими переменными предназначены две функции:

#include 

int pthread_cond_wait(pthread_cond_t *cptr, pthread_m_tex_t *mptr);

int pthread_cond_signal(pthread_cond_t *cptr);

/* Обе функции возвращают 0 в случае успешного завершения, положительное значение Еххх – в случае ошибки */

Слово signal в имени второй функции не имеет никакого отношения к сигналам Unix SIGxxx.

Мы определяем условие, уведомления о выполнении которого будем ожидать.

Взаимное исключение всегда связывается с условной переменной. При вызове pthread_cond_wait для ожидания выполнения какого-либо условия мы указываем адрес условной переменной и адрес связанного с ней взаимного исключения.

Мы проиллюстрируем использование условных переменных, переписав пример из предыдущего раздела. В листинге 7.5 объявляются глобальные переменные.

Переменные производителя и взаимное исключение объединяются в структуру

7-13 Две переменные nput и rival ассоциируются с mutex, и мы объединяем их в структуру с именем put. Эта структура используется производителями.

14-20 Другая структура, nready, содержит счетчик, условную переменную и взаимное исключение. Мы инициализируем условную переменную с помощью PTHREAD_ COND_INITIALIZER.

Функция main по сравнению с листингом 7.3 не изменяется.

Листинг 7.5. Глобальные переменные: использование условной переменной

//mutex/prodcons6.c

1  #include "unpipc.h"

2  #define MAXNITEMS 1000000

3  #define MAXNTHREADS 100


4  /* глобальные переменные для всех потоков */

5  int nitems; /* только для чтения потребителем и производителем */

6  int buff[MAXNITEMS];


7  struct {

8   pthread_mutex_t mutex;

9   int nput; /* следующий сохраняемый элемент */

10  int nval; /* следующее сохраняемое значение */

11 } put = {

12  PTHREAD_MUTEX_INITIALIZER

13 };


14 struct {

15  pthread_mutex_t mutex:

16  pthread_cond_t cond;

17  int nready; /* количество готовых для потребителя */

18 } nready = {

19  PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER

20 };

Функции produce и consume претерпевают некоторые изменения. Их текст дан в листинге 7.6.

Листинг 7.6. Функции produce и consume

//mutex/prodcons6.c

46 void *

47 produce(void *arg)

48 {

49  for (;;) {

50   Pthread_mutex_lock(&put.mutex);

51   if (put.nput >= nitems) {

52    Pthread_mutex_unlock(&put.mutex);

53    return(NULL); /* массив заполнен, готово */

54   }

55   buff[put.nput] = put.nval;

56   put.nput++;

57   put.nval++;

58   Pthread_mutex_unlock(&put.mutex);

59   Pthread_mutex_lock(&nready.mutex):

60   if (nready.nready == 0)

61    Pthread_cond_signal(&nready.cond);

62   nready.nready++;

63   Pthread_mutex_unlock(&nready.mutex);

64   *((int *) arg) += 1;

65  }

66 }


67 void*

68 consume(void *arg)

69 {

70  int i;

71  for (i = 0; i < nitems; i++) {

72   Pthread_mutex_lock(&nready.mutex);

73   while (nready.nready == 0)

74    Pthread_cond_wait(&nready.cond, &nready.mutex);

75   nready.nready--;

76   Pthread_mutex_unlock(&nready.mutex);

77   if (buff[i] != i)

78    printf("buff[%d] = *d\n", i, buff[i]);

79  }

80  return(NULL);

81 }

Помещение очередного элемента в массив

50-58 Для блокирования критической области в потоке-производителе теперь используется исключение put.mutex. 

Уведомление потребителя

59-64 Мы увеличиваем счетчик nready.nready, в котором хранится количество элементов, готовых для обработки потребителем. Перед его увеличением мы проверяем, не было ли значение счетчика нулевым, и если да, то вызывается функция pthread_cond_signal, позволяющая возобновить выполнение всех потоков (в данном случае потребителя), ожидающих установки ненулевого значения этой переменной. Теперь мы видим, как взаимодействуют взаимное исключение и связанная с ним условная переменная. Счетчик используется совместно потребителем и производителями, поэтому доступ к нему осуществляется с блокировкой соответствующего взаимного исключения (nready.mutex). Условная переменная используется для ожидания и передачи сигнала.

Потребитель ждет, пока значение nready.nready не станет отличным от нуля

72-76 Потребитель просто ждет, пока значение счетчика nready. nready не станет отличным от нуля. Поскольку этот счетчик используется совместно с производителями, его значение можно проверять только при блокировке соответствующего взаимного исключения. Если при проверке значение оказывается нулевым, мы вызываем pthread_cond_wait для приостановки процесса. При этом выполняются два атомарных действия:

1. Разблокируется nready.mutex.

2. Выполнение потока приостанавливается, пока какой-нибудь другой поток не вызовет pthread_cond_signal для этой условной переменной.

Перед возвращением управления потоку функция pthread_cond_wait блокирует nready.mutex. Таким образом, если после возвращения из функции мы обнаруживаем, что счетчик имеет ненулевое значение, мы уменьшаем этот счетчик (зная, что взаимное исключение заблокировано) и разблокируем взаимное исключение. Обратите внимание, что после возвращения из pthread_cond_wait мы всегда заново проверяем условие, поскольку может произойти ложное пробуждение при отсутствии выполнения условия. Различные реализации стремятся уменьшить количество ложных пробуждений, но они все равно происходят.

Код, передающий сигнал условной переменной, выглядит следующим образом:

struct {

 pthread_mutex_t mutex;

 pthread_cond_t cond;

 переменные, для которых устанавливается условие

} var = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, … };


Pthread_mutex_lock(&var.mutex);

установка истинного значения условия

Pthread_cond_signal(&var.cond);

Pthread_mutex_unlock(&var.mutex);

В нашем примере переменная, для которой устанавливалось условие, представляла собой целочисленный счетчик, а установка условия означала просто увеличение счетчика. Мы оптимизировали программу, посылая сигнал только при изменении значения счетчика с 0 на 1.

Код, проверяющий условие и приостанавливающий процесс, если оно не выполняется, обычно выглядит следующим образом:

Pthread_mutex_lock(&var.mutex);

while (условие ложно)

 Pthread_cond_wait(&var.cond, &var.mutex);

изменение условия

Pthread_mutex_unlock(&var.mutex);

Исключение конфликтов блокировок

В приведенном выше фрагменте кода, как и в листинге 7.6, функция pthread_cond_signal вызывалась потоком, блокировавшим взаимное исключение, относящееся к условной переменной, для которой отправлялся сигнал. Мы можем представить себе, что в худшем варианте система немедленно передаст управление потоку, которому направляется сигнал, и он начнет выполняться и немедленно остановится, поскольку не сможет заблокировать взаимное исключение. Альтернативный код, помогающий этого избежать, для листинга 7.6 будет иметь следующий вид:

int dosignal;

Pthread_mutex_lock(nready.mutex);

dosignal = (nready.nready == 0);

nready.nready++;

Pthread_mutex_unlock(&nready.mutex);

if (dosignal)

 Pthread_cond_signal(&nready.cond);

Здесь мы отправляем сигнал условной переменной только после разблокирования взаимного исключения. Это разрешено стандартом Posix: поток, вызывающий pthread_cond_signal, не обязательно должен в этот момент блокировать связанное с переменной взаимное исключение. Однако Posix говорит, что если требуется предсказуемое поведение при одновременном выполнении потоков, это взаимное исключение должно быть заблокировано процессом, вызывающим pthread_cond_signal.

7.6. Условные переменные: время ожидания и широковещательная передача

В обычной ситуации pthread_cond_signal запускает выполнение одного потока, ожидающего сигнал по соответствующей условной переменной. В некоторых случаях поток знает, что требуется пробудить несколько других процессов. Тогда можно воспользоваться функцией pthread_cond_broadcast для пробуждения всех процессов, заблокированных в ожидании сигнала данной условной переменной.

ПРИМЕЧАНИЕ

Пример ситуации, в которой требуется возобновление выполнения нескольких процессов, появится в главе 8, когда мы будем обсуждать задачу с несколькими считывающими и записывающими процессами. Когда записывающий процесс снимает блокировку, он должен разбудить все ждущие считывающие процессы, поскольку одновременное считывание в данном случае разрешено.

Альтернативным (и более безопасным) методом является использование широковещательной передачи во всех тех случаях, когда требуется использование сигналов. Сигнал является оптимизацией для тех случаев, когда известно, что все ожидающие процессы правильно написаны и требуется разбудить только один из них, и какой именно будет разбужен, значения не имеет. Во всех других ситуациях следует использовать широковещательную передачу. 

#include 

int pthread_cond_broadcast(pthread_cond_t *cptr);

int pthread_cond_timedwait(pthread_cond_t *cptr, pthread_mutex_t *mptr, const struct timespec *abstime);

/* Функции возвращают 0 в случае успешного завершения, положительный код Еххх — в случае ошибки */

Функция pthread_cond_timedwait позволяет установить ограничение на время блокирования процесса. Аргумент abstime представляет собой структуру timespec:

struct timespec {

 time_t tv_sec; /* секунды */

 long tv_nsec; /* наносекунды */

};

Эта структура задает конкретный момент системного времени, в который происходит возврат из функции, даже если сигнал по условной переменной еще не будет получен. В этом случае возвращается ошибка с кодом ETIMEDOUT.

Эта величина представляет собой абсолютное значение времени, а не промежуток. Аргумент abstime задает таким образом количество секунд и наносекунд с 1 января 1970 UTC до того момента времени, в который должен произойти возврат из функции. Это отличает функцию от select, pselect и poll (глава 6 [24]), которые в качестве аргумента принимают некоторое количество долей секунды, спустя которое должен произойти возврат. (Функция select принимает количество микросекунд, pselect — наносекунд, a poll — миллисекунд.) Преимущество использования абсолютного времени заключается в том, что если функция возвратится до ожидаемого момента (например, при перехвате сигнала), ее можно будет вызвать еще раз, не изменяя содержимого структуры timespec.

7.7. Атрибуты взаимных исключений и условных переменных

В наших примерах в этой главе мы хранили взаимные исключения и условные переменные как глобальные данные всего процесса, поскольку они использовались для синхронизации потоков внутри него. Инициализировали мы их с помощью двух констант: PTHREAD_MUTEX_INITIALIZER и PTHREAD_COND_INTIALIZER. Инициализируемые таким образом исключения и условные переменные приобретали значения атрибутов по умолчанию, но мы можем инициализировать их и с другими значениями атрибутов.

Прежде всего инициализировать и удалять взаимное исключение и условную переменную можно с помощью функций

#include 

int pthread_mutex_imt(pthread_mutex_t *mptr, const pthread_mutexattr_t *attr);

int pthread_mutex_destroy(pthread_mutex_t *mptr);

int pthread_cond_init(pthread_cond_t *cрtr, const pthread_condattr_t *attr);

int pthread_cond_destroy(pthread_cond_t *cptr);

/* Все четыре функции возвращают 0 в случае успешного завершения работы, положительное значение Еххх – в случае ошибки */

Рассмотрим, например, взаимное исключение. Аргумент mptr должен указывать на переменную типа pthread_mutex_t, для которой должна быть уже выделена память, и тогда функция pthread_mutex_init инициализирует это взаимное исключение. Значение типа pthread_mutexattr_t, на которое указывает второй аргумент функции pthread_mutex_init(attr ), задает атрибуты этого исключения. Если этот аргумент представляет собой нулевой указатель, используются значения атрибутов по умолчанию.

Атрибуты взаимного исключения имеют тип pthread_mutexattr_t, а условной переменной — pthread_condattr_t, и инициализируются и уничтожаются с помощью следующих функций:

#include 

int pthread_mutexattr_init(pthread_mutexattr_t *attr);

int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);

int pthread_condattr_init(pthread_condattr_t *attr);

int pthread_condattr_destroy(pthread_condattr_t *attr);

/* Все четыре функции возвращают 0 в случае успешного завершения, положительное значение Еххх – в случае ошибки */

После инициализации атрибутов взаимного исключения или условной переменной для включения или выключения отдельных атрибутов используются отдельные функции. Например, один из атрибутов позволяет использовать данное взаимное исключение или условную переменную нескольким процессам (а не потокам одного процесса). Этот атрибут мы будем использовать в последующих главах. Его значение можно узнать и изменить с помощью следующих функций:

#include 

int pthread_mutexattr_getpshared(const pthread_mutexattr_t *attr, int *valptr);

int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int value);

int pthread_condattr_getpshared(const pthread_condattr_t *attr, int *valptr);

int pthread_condattr_setpshared(pthread_condattr_t *attr, int value);

/* Все четыре функции возвращают 0 в случае успешного завершения, положительное значение Еххх – в случае ошибки */

Две функции get возвращают текущее значение атрибута через целое, на которое указывает valptr, а две функции set устанавливают значение атрибута равным значению value. Значение value может быть либо PTHREAD_PROCESS_PRIVATE, либо PTHREAD_PROCESS_SHARED. Последнее также называется атрибутом совместного использования процессами. 

ПРИМЕЧАНИЕ

Эта возможность поддерживается только в том случае, если константа _POSIX_THREAD_PROCESS_SHARED определена в заголовочном файле . Она является дополнительной согласно Posix.1 и обязательной по Unix 98 (табл. 1.3). 

Нижеследующий фрагмент кода показывает, как нужно инициализировать взаимное исключение, чтобы его можно было совместно использовать нескольким процессам:

pthread_mutex_t *mptr; /* указатель на взаимное исключение, находящееся в разделяемой памяти */

pthread_mutexattr_t mattr; /* атрибуты взаимного исключения */

mptr = /* некоторое значение, указывающее на разделяемую память */

Pthread_mutexattr_init(&mattr);

#ifdef _POSIX_THREAD_PROCESS_SHARED

 Pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);

#else

 # error Эта реализация не поддерживает _POSIX_THREAD_PROCESS_SHARED

#endif

Pthread_mutex_init(mptr, &mattr);

Мы объявляем переменную mattr типа pthread_mutexattr_t, инициализируем ее значениями атрибутов по умолчанию, а затем устанавливаем атрибут PTHREAD_PROCESS_SHARED, позволяющий совместно использовать взаимное исключение нескольким процессам. Затем pthread_mutex_init инициализирует само исключение с соответствующими атрибутами. Количество разделяемой памяти, которое следует выделить под взаимное исключение, равно sizeof(pthread_mutex_t).

Практически такая же последовательность команд (с заменой mutex на cond) позволяет установить атрибут PTHREAD_PROCESS_SHARED для условной переменной, хранящейся в разделяемой несколькими процессами памяти.

Пример совместно используемых несколькими процессами взаимных исключений и условных переменных был приведен в листинге 5.18.

Завершение процесса, заблокировавшего ресурс

Когда взаимное исключение используется совместно несколькими процессами, всегда существует возможность, что процесс будет завершен (возможно, принудительно) во время работы с заблокированным им ресурсом. Не существует способа заставить систему автоматически снимать блокировку во время завершения процесса. Мы увидим, что это свойственно и блокировкам чтения-записи, и семафорам Posix. Единственный тип блокировок, автоматически снимаемых системой при завершении процесса, — блокировки записей fcntl (глава 9). При использовании семафоров System V можно специально указать ядру, следует ли автоматически снимать блокировки при завершении работы процесса (функция SEM_UNDO, о которой будет говориться в разделе 11.3).

Поток также может быть завершен в момент работы с заблокированным ресурсом, если его выполнение отменит (cancel) другой поток или он сам вызовет pthread_exit. Последнему варианту не следует уделять много внимания, поскольку поток должен сам знать, блокирует ли он взаимное исключение в данный момент или нет, и в зависимости от этого вызывать pthread_exit. На случай отмены другим потоком можно предусмотреть обработчик сигнала, вызываемый при отмене потока, что продемонстрировано в разделе 8.5. Если же для потока возникают фатальные условия, это обычно приводит к завершению работы всего процесса. Например, если поток делает некорректную операцию с указателем, что приводит к отправке сигнала SIGSEGV, это приводит к остановке всего процесса, если сигнал не перехватывается, и мы возвращаемся к предыдущей ситуации с гибелью процесса, заблокировавшего ресурс.

Даже если бы система автоматически разблокировала ресурсы после завершения процесса, это не всегда решало бы проблему. Блокировка защищала критическую область, в которой, возможно, изменялись какие-то данные. Если процесс был завершен посреди этой области, что стало с данными? Велика вероятность того, что возникнут несоответствия, если, например, новый элемент будет не полностью добавлен в связный список. Если бы ядро просто разблокировало взаимное исключение при завершении процесса, следующий процесс, обратившийся к списку, обнаружил бы, что тот поврежден.

В некоторых случаях автоматическое снятие блокировки (или счетчика — для семафора) при завершении процесса не вызывает проблем. Например, сервер может использовать семафор System V (с функцией SEM_UNDO) для подсчета количества одновременно обслуживаемых клиентов. Каждый раз при порождении процесса вызовом fork он увеличивает значение семафора на единицу, уменьшая его при завершении работы дочернего процесса. Если дочерний процесс завершит работу досрочно, ядро само уменьшит значение семафора. Пример, в котором автоматическое снятие блокировки ядром (а не уменьшение счетчика, как в вышеописанной ситуации) также не вызывает проблем, приведен в разделе 9.7. Демон блокирует один из файлов данных при записи в него и не снимает эту блокировку до завершения работы. Если кто-то попробует запустить копию демона, она завершит работу досрочно, когда обнаружит наличие блокировки на запись. Это гарантирует работу единственного экземпляра демона. Если же демон досрочно завершит работу, ядро само снимет блокировку, что позволит запустить копию демона.

7.8. Резюме

Взаимные исключения (mutual exclusion — mutex) используются для защиты критических областей кода, запрещая его одновременное выполнение несколькими потоками. В некоторых случаях потоку, заблокировавшему взаимное исключение, требуется дождаться выполнения какого-либо условия для выполнения последующих действий. В этом случае используется ожидание сигнала по условной переменной. Условная переменная всегда связывается с каким-либо взаимным исключением. Функция pthread_cond_wait, приостанавливающая работу процесса, разблокирует взаимное исключение перед остановкой работы и заново блокирует его при возобновлении работы процесса спустя некоторое время. Сигнал по условной переменной передается каким-либо другим потоком, и этот поток может разбудить либо только один произвольный поток из множества ожидающих (pthread_cond_signal), либо все их одновременно (pthread_cond_broadcast).

Взаимные исключения и условные переменные могут быть статическими. В этом случае они инициализируются также статически. Они могут быть и динамическими, что требует динамической инициализации. Динамическая инициализация дает возможность указать атрибуты, в частности атрибут совместного использования несколькими процессами, что действенно, если взаимное исключение или условная переменная находится в разделяемой этими процессами памяти.

Упражнения

1. Удалите взаимное исключение из листинга 7.2 и убедитесь, что программа работает неправильно, если одновременно запущено более одного производителя.

2. Что произойдет с листингом 7.1, если убрать вызов Pthread_join для потока-потребителя?

3. Напишите пpoгрaммy, вызывающую pthread_mutexatt_init и pthread_condattr_init в бесконечном цикле. Следите за используемой этим процессом памятью с помощью какой-нибудь программы, например ps. Что происходит? Теперь добавьте вызовы pthread_mutexattr_destroy и pthread_condattr_destroy и убедитесь, что утечки памяти нет.

4. В программе из листинга 7.6 производитель вызывает pthread_cond_signal только при изменении nready.nready с 0 на 1. Чтобы убедиться в эффективности этой оптимизации, вызывайте pthread_cond_signal каждый раз, когда nready.nready увеличивается на 1, и выведите его значение в главном потоке после завершения работы потребителя. 

ГЛАВА 8