UNIX: разработка сетевых приложений — страница 28 из 42

Программные потоки

26.1. Введение

Согласно традиционной модели Unix, когда процессу требуется, чтобы некое действие было выполнено каким-либо другим объектом, он порождает дочерний процесс, используя функцию

fork
, и этим порожденным процессом выполняется необходимое действие. Большинство сетевых серверов под Unix устроены именно таким образом, как мы видели при рассмотрении примера параллельного (concurrent) сервера: родительский процесс осуществляет соединение с помощью функции
accept
и порождает дочерний процесс, используя функцию
fork
, а затем дочерний процесс занимается обработкой клиентского запроса.

Хотя эта концепция с успехом использовалась на протяжении многих лет, с функцией

fork
связаны определенные неудобства.

■ Стоимость функции

fork
довольно высока, так как при ее использовании требуется скопировать все содержимое памяти из родительского процесса в дочерний, продублировать все дескрипторы и т.д. Текущие реализации используют технологию, называемую копированием при записи (copy-on-write), при которой копирование пространства данных из родительского процесса в дочерний происходит лишь тогда, когда дочернему процессу требуется своя собственная копия. Но несмотря на эту оптимизацию, стоимость функции
fork
остается высокой.

■ Для передачи данных между родительским и дочерним процессами после вызова функции

fork
требуется использовать средства взаимодействия процессов (IPC). Передача информации перед вызовом
fork
не вызывает затруднений, так как при запуске дочерний процесс получает от родительского копию пространства данных и копии всех родительских дескрипторов. Но возвращение информации из дочернего процесса в родительский требует большей работы.

Обе проблемы могут быть разрешены путем использования программных потоков (threads). Программные потоки иногда называются облегченными процессами (lightweight processes), так как поток проще, чем процесс. В частности, создание потока требует в 10–100 раз меньше времени, чем создание процесса.

Все потоки одного процесса совместно используют его глобальные переменные, поэтому им легко обмениваться информацией, но это приводит к необходимости синхронизации.

Однако общими становятся не только глобальные переменные. Все потоки одного процесса разделяют:

■ инструкции процесса;

■ большую часть данных;

■ открытые файлы (например, дескрипторы);

■ обработчики сигналов и вообще настройки для работы с сигналами (действие сигнала);

■ текущий рабочий каталог;

■ идентификаторы пользователя и группы пользователей.

У каждого потока имеются собственные:

■ идентификатор потока;

■ набор регистров, включая счетчик команд и указатель стека;

■ стек (для локальных переменных и адресов возврата);

■ переменная

errno
;

■ маска сигналов;

■ приоритет.

ПРИМЕЧАНИЕ

Как сказано в разделе 11.18, можно рассматривать обработчик сигнала как некую разновидность потока. В традиционной модели Unix у нас имеется основной поток выполнения и обработчик сигнала (другой поток). Если в основном потоке в момент возникновения сигнала происходит корректировка связного списка и обработчик сигнала также пытается изменить связный список, обычно начинается путаница. Основной поток и обработчик сигнала совместно используют одни и те же глобальные переменные, но у каждого из них имеется свой собственный стек.

В этой книге мы рассматриваем потоки POSIX, которые также называются Pthreads (POSIX threads). Они были стандартизованы в 1995 году как часть POSIX.1c и будут поддерживаться большинством версий Unix. Мы увидим, что все названия функций Pthreads начинаются с символов

pthread_
. Эта глава является введением в концепцию потоков, необходимым для того, чтобы в дальнейшем мы могли использовать потоки в наших сетевых приложениях. Более подробную информацию вы можете найти в [15].

26.2. Основные функции для работы с потоками: создание и завершение потоков

В этом разделе мы рассматриваем пять основных функций для работы с потоками, а в следующих двух разделах мы используем эти функции для написания потоковой модификации клиента и сервера TCP.

Функция pthread_create

Когда программа запускается с помощью функции

exec
, создается один поток, называемый начальным (initial) или главным (main). Дополнительные потоки создаются функцией
pthread_create
.

#include 


int pthread_create(pthread_t* tid, const pthread_attr_t *attr,

 void *(*func)(void*), void *arg);

Возвращает: 0 в случае успешного выполнения, положительное значение Exxx в случае ошибки

Каждый поток процесса обладает собственным идентификатором потока (thread ID), относящимся к типу данных

pthread_t
(как правило, это
unsigned int
). При успешном создании нового потока его идентификатор возвращается через указатель
tid
.

У каждого потока имеется несколько атрибутов: его приоритет, исходный размер стека, указание на то, должен ли этот поток являться демоном или нет, и т.д. При создании потока мы можем задать эти атрибуты, инициализируя переменную типа

pthread_attr_t
, что позволяет заменить значение, заданное по умолчанию. Обычно мы используем значение по умолчанию, в этом случае мы задаем аргумент
attr
равным пустому указателю.

Наконец, при создании потока мы должны указать, какую функцию будет выполнять этот поток. Выполнение потока начинается с вызова заданной функции, а завершается либо явно (вызовом

pthread_exit
), либо неявно (когда вызванная функция возвращает управление). Адрес функции задается аргументом
func
, и она вызывается с единственным аргументом-указателем
arg
. Если этой функции необходимо передать несколько аргументов, следует поместить их в некоторую структуру и передать адрес этой структуры как единственный аргумент функции.

Обратите внимание на объявления

func
и
arg
. Функции передается один аргумент — универсальный указатель
void*
. Это позволяет нам передавать потоку с помощью единственного указателя все, что требуется, и точно так же поток возвращает любые данные, используя этот указатель.

Возвращаемое значение функций Pthreads — это обычно 0 в случае успешного выполнения или ненулевая величина в случае ошибки. Но в отличие от функций сокетов и большинства системных вызовов, для которых в случае ошибки возвращается -1 и переменной

errno
присваивается некоторое положительное значение (код ошибки), функции Pthreads возвращают сам код ошибки. Например, если функция
pthread_create
не может создать новый поток, так как мы превысили допустимый системный предел количества потоков, функция возвратит значение
EAGAIN
. Функции Pthreads не присваивают переменной
errno
никаких значений. Соглашение о том, что 0 является индикатором успешного выполнения, а ненулевое значение — индикатором ошибки, не приводит к противоречию, так как все значения
Exxx
, определенные в заголовочном файле
, являются положительными. Ни одному из имен ошибок Exxx не сопоставлено нулевое значение.

Функция pthread_join

Мы можем приостановить выполнение текущего потока и ждать завершения выполнения какого-либо другого потока, используя функцию

pthread_join
. Сравнивая потоки и процессы Unix, можно сказать, что функция
pthread_create
аналогична функции
fork
, а функция
pthread_join
— функции
waitpid
.

#include 


int pthread_join(pthread_t tid, void **status);

Возвращает: 0 в случае успешного выполнения, положительное значение Exxx в случае ошибки

Следует указать идентификатор

tid
того потока, завершения которого мы ждем. К сожалению, нет способа указать, что мы ждем завершения любого потока данного процесса (тогда как при работе с процессами мы могли с помощью функции
waitpid
ждать завершения любого процесса, задав аргумент идентификатора процесса, равный -1). Мы вернемся к этой проблеме при обсуждении листинга 26.11.

Если указатель

status
непустой, то значение, возвращаемое потоком (указатель на некоторый объект), хранится в ячейке памяти, на которую указывает
status
.

Функция pthread_self

Каждый поток снабжен идентификатором, уникальным в пределах данного процесса. Идентификатор потока возвращается функцией

pthread_create
и, как мы видели, используется функцией
pthread_join
. Поток может узнать свой собственный идентификатор с помощью вызова
pthread_self
.

#include 


pthread_t pthread_self(void);

Возвращает: идентификатор вызывающего потока

Сравнивая потоки и процессы Unix, можно отметить, что функция

pthread_self
аналогична функции
getpid
.

Функция pthread_detach

Поток может быть либо присоединяемым (joinable), каким он является по умолчанию, либо отсоединенным (detached). Когда присоединяемый поток завершает свое выполнение, его статус завершения и идентификатор сохраняются, пока другой поток данного процесса не вызовет функцию

pthread_join
. В свою очередь, отсоединенный поток напоминает процесс-демон: когда он завершается, все занимаемые им ресурсы освобождаются и мы не можем отслеживать его завершение. Если один поток должен знать, когда завершится выполнение другого потока, нам следует оставить последний присоединяемым.

Функция

pthread_detach
изменяет состояние потока, превращая его из присоединяемого в отсоединенный.

#include 


int pthread_detach(pthread_t tid);

Возвращает: 0 в случае успешного выполнения, положительное значение Exxx в случае ошибки

Эта функция обычно вызывается потоком при необходимости изменить собственный статус в следующем формате:

pthread_detach(pthread_self());

Функция pthread_exit

Одним из способов завершения потока является вызов функции

pthread_exit
.

#include 


void pthread_exit(void *status);

Ничего не возвращает вызвавшему потоку

Если поток не является отсоединенным, идентификатор потока и статус завершения сохраняются до того момента, пока какой-либо другой поток данного процесса не вызовет функцию

pthread_join
.

Указатель

status
не должен указывать на объект, локальный по отношению к вызывающему потоку, так как этот объект будет уничтожен при завершении потока.

Существуют и другие способы завершения потока.

■ Функция, которая была вызвана потоком (третий аргумент функции

pthread_create
), может возвратить управление в вызывающий процесс. Поскольку, согласно своему объявлению, эта функция возвращает указатель
void
, возвращаемое ею значение играет роль статуса завершения данного потока.

■ Если функция

main
данного процесса возвращает управление или любой поток вызывает функцию
exit
, процесс завершается вместе со всеми своими потоками.

26.3. Использование потоков в функции str_cli

В качестве первого примера использования потоков мы перепишем нашу функцию

str_cli
. В листинге 16.6 была представлена версия этой функции, в которой использовалась функция
fork
. Напомним, что были также представлены и некоторые другие версии этой функции: изначально в листинге 5.4 функция блокировалась в ожидании ответа и была, как мы показали, далека от оптимальной в случае пакетного ввода; в листинге 6.2 применяется блокируемый ввод-вывод и функция
select
; версии, показанные в листинге 16.1 и далее, используют неблокируемый ввод-вывод.

На рис. 26.1 показана структура очередной версии функции str_cli, на этот раз использующей потоки, а в листинге 26.1[1] представлен код этой функции.

Рис. 26.1. Измененная функция str_cli, использующая потоки

Листинг 26.1. Функция str_cli, использующая потоки

//threads/strclithread.c

 1 #include "unpthread.h"


 2 void *copyto(void*);


 3 static int sockfd; /* глобальная переменная, доступная обоим потокам */

 4 static FILE *fp;


 5 void

 6 str_cli(FILE *fp_arg, int sockfd_arg)

 7 {

 8  char recvline[MAXLINE];

 9  pthread_t tid;


10  sockfd = sockfd_arg; /* копирование аргументов во внешние переменные */

11  fp = fp_arg;


12  Pthread_create(&tid, NULL, copyto, NULL);


13  while (Readline(sockfd, recvline. MAXLINE) > 0)

14   Fputs(recvline, stdout);

15 }


16 void*

17 copyto(void *arg)

18 {

19  char sendline[MAXLINE];


20  while (Fgets(sendline, MAXLINE, fp) != NULL)

21   Writen(sockfd, sendline, strlen(sendline));


22  Shutdown(sockfd, SHUT_WR); /* признак конца файла в стандартном

                                  потоке ввода, отправка сегмента FIN */

23  return (NULL);

24  /* завершение потока происходит, когда в стандартном потоке ввода

       встречается признак конца файла */

25 }

Заголовочный файл unpthread.h

1
Мы впервые встречаемся с заголовочным файлом
unpthread.h
. Он включает наш обычный заголовочный файл
unp.h
, затем — заголовочный файл POSIX
, и далее определяет прототипы наших потоковых функций-оберток для
pthread_XXX
(см. раздел 1.4), название каждой из которых начинается с
Pthread_
.

Сохранение аргументов во внешних переменных

10-11
 Для потока, который мы собираемся создать, требуются значения двух аргументов функции
str_cli
:
fp
 — указатель на структуру
FILE
для входного файла, и
sockfd
— сокет TCP, связанный с сервером. Для простоты мы храним эти два значения во внешних переменных. Альтернативой является запись этих двух значений в структуру, указатель на которую затем передается в качестве аргумента создаваемому потоку.

Создание нового потока

12
 Создается поток, и значение нового идентификатора потока сохраняется в
tid
. Функция, выполняемая новым потоком, — это
copyto
. Никакие аргументы потоку не передаются.

Главный цикл потока: копирование из сокета в стандартный поток вывода

13-14
 В основном цикле вызываются функции
readline
и
fputs
, которые осуществляют копирование из сокета в стандартный поток вывода.

Завершение

15
 Когда функция
str_cli
возвращает управление, функция main завершается при помощи вызова функции
exit
(см. раздел 5.4). При этом завершаются все потоки данного процесса. В обычном сценарии второй поток уже должен завершиться в результате считывания признака конца файла из стандартного потока ввода. Но в случае, когда сервер преждевременно завершил свою работу (см. раздел 5.12), при вызове функции
exit
завершается также и второй поток, чего мы и добиваемся.

Поток copyto

16-25
 Этот поток осуществляет копирование из стандартного потока ввода в сокет. Когда он считывает признак конца файла из стандартного потока ввода, на сокете вызывается функция
shutdown
и отсылается сегмент FIN, после чего поток возвращает управление. При выполнении оператора
return
(то есть когда функция, запустившая поток, возвращает управление) поток также завершается.

В конце раздела 16.2 мы привели результаты измерений времени выполнения для пяти различных реализаций функции

str_cli
. Мы отметили, что многопоточная версия выполняется всего 8,5 с — немногим быстрее, чем версия, использующая функцию
fork
(как мы и ожидали), но медленнее, чем версия с неблокируемым вводом-выводом. Тем не менее, сравнивая устройство версии с неблокируемым вводом-выводом (см. раздел 16.2) и версии с использованием потоков, мы заметили, что первая гораздо сложнее. Поэтому мы рекомендуем использовать именно версию с потоками, а не с неблокируемым вводом-выводом.

26.4. Использование потоков в эхо-сервере TCP

Теперь мы перепишем эхо-сервер TCP, приведенный в листинге 5.1, используя для каждого клиента по одному потоку вместо одного процесса. Кроме того, с помощью нашей функции

tcp_listen
мы сделаем эту версию не зависящей от протокола. В листинге 26.2 показан код сервера.

Листинг 26.2. Эхо-сервер TCP, использующий потоки

//threads/tcpserv01.с

 1 #include "unpthread.h"


 2 static void *doit(void*); /* каждый поток выполняет эту функцию */


 3 int

 4 main(int argc, char **argv)

 5 {

 6  int listenfd, connfd;

 7  pthread_t tid;

 8  socklen_t addrlen, len;

 9  struct sockaddr *cliaddr;


10  if (argc == 2)

11   listenfd = Tcp_listen(NULL, argv[1], &addrlen);

12  else if (argc == 3)

13   listenfd = Tcp_listen(argv[1], argv[2], &addrlen);

14  else

15   err_quit("usage: tcpserv01 [  ] ");


16  cliaddr = Malloc(addrlen);


17  for (;;) {

18   len = addrlen;

19   connfd = Accept(listenfd, cliaddr, &len);

20   Pthread_create(&tid, NULL, &doit, (void*)connfd);

21  }

22 }


23 static void*

24 doit(void *arg)

25 {

26  Pthread_detach(pthread_self());

27  str_echo((int)arg); /* та же функция, что и раньше */

28  Close((int)arg); /* мы закончили с присоединенным сокетом */

29  return (NULL);

30 }

Создание потока

17-21
 Когда функция
accept
возвращает управление, мы вызываем функцию
pthread_create
вместо функции
fork
. Мы передаем функции
doit
единственный аргумент — дескриптор присоединенного сокета
connfd
.

ПРИМЕЧАНИЕ

Мы преобразуем целочисленный дескриптор сокета к универсальному указателю (void). В ANSI С не гарантируется, что такое преобразование будет выполнено корректно, — мы можем быть уверены лишь в том, что оно сработает в тех системах, в которых размер целого числа не превышает размера указателя. К счастью, большинство реализаций Unix обладают этим свойством (см. табл. 1.5). Далее мы поговорим об этом подробнее.

Функция потока

23-30
doit
— это функция, выполняемая потоком. Поток отделяет себя с помощью функции
pthread_detach
, так как нет причины, по которой главному потоку имело бы смысл ждать завершения каждого созданного им потока. Функция
str_echo
не изменилась и осталась такой же, как в листинге 5.2. Когда эта функция завершается, следует вызвать функцию
close
для того, чтобы закрыть присоединенный сокет, поскольку этот поток использует все дескрипторы совместно с главным потоком. При использовании функции
fork
дочерний процесс не должен специально закрывать присоединенный сокет, так как при завершении дочернего процесса все открытые дескрипторы закрываются (см. упражнение 26.2).

Обратите также внимание на то, что главный поток не закрывает присоединенный сокет, что всегда происходило, когда параллельный сервер вызывал функцию

fork
. Это объясняется тем, что все потоки внутри процесса совместно используют все дескрипторы, поэтому если главному потоку потребуется вызвать функцию
close
, это приведет к закрытию соединения. Создание нового потока не влияет на счетчики ссылок для открытых дескрипторов, в отличие от того, что происходит при вызове функции
fork
.

В этой программе имеется одна неявная ошибка, о которой рассказывается в разделе 26.5. Можете ли вы ее обнаружить? (См. упражнение 26.5.)

Передача аргументов новым потокам

Мы уже упомянули, что в листинге 26.2 мы преобразуем целочисленную переменную

connfd
к указателю на неопределенный тип (
void
), но этот способ не работает в некоторых системах. Для корректной обработки данной ситуации требуются дополнительные усилия.

В первую очередь, заметим, что мы не можем просто передать адрес

connfd
нового потока, то есть следующий код не будет работать:

int main(int argc, char **argv) {

 int listenfd, connfd;

 ...


 for (;;) {

  len = addrlen;

  connfd = Accept(listenfd, cliaddr, &len);


  Pthread_create(&tid, NULL, &doit, &connfd);

 }

}


static void* doit(void *arg) {

 int connfd;


 connfd = *((int*)arg);

 Pthread_detach(pthread_self());

 str_echo(connfd); /* та же функция, что и прежде */

 Close(connfd);    /* мы закончили с присоединенным сокетом */

 return(NULL);

}

С точки зрения ANSI С здесь все в порядке: мы гарантированно можем преобразовать целочисленный указатель к типу

void*
и затем обратно преобразовать получившийся указатель на неопределенный тип к целочисленному указателю. Проблема заключается в другом — на что именно он будет указывать?

В главном потоке имеется одна целочисленная переменная

connfd
, и при каждом вызове функции
accept
значение этой переменной меняется на новое (в соответствии с новым присоединенным сокетом). Может сложиться следующая ситуация:

■ Функция

accept
возвращает управление, записывается новое значение переменной
connfd
(допустим, новый дескриптор равен 5) и в главном потоке вызывается функция
pthread_create
. Указатель на
connfd
(а не фактическое его значение!) является последним аргументом функции
pthread_create
.

■ Создается новый поток, и начинает выполняться функция

doit
.

■ Готово другое соединение, и главный поток снова начинает выполняться (прежде, чем начнется выполнение вновь созданного потока). Завершается функция

accept
, записывается новое значение переменной
connfd
(например, значение нового дескриптора равно 6) и главный поток вновь вызывает функцию
pthread_create
.

Хотя созданы два новых потока, оба они будут работать с одним и тем же последним значением переменной

connfd
, которое, согласно нашему предположению, равно 6. Проблема заключается в том, что несколько потоков получают доступ к совместно используемой переменной (целочисленному значению, хранящемуся в
connfd
) при отсутствии синхронизации. В листинге 26.2 мы решаем эту проблему, передавая значение переменной
connfd
функции
pthread_create
, вместо того чтобы передавать указатель на это значение. Этот метод работает благодаря тому способу, которым целочисленные значения в С передаются вызываемой функции (копия значения помещается в стек вызванной функции).

В листинге 26.3 показано более удачное решение описанной проблемы.

Листинг 26.3. Эхо-сервер TCP, использующий потоки с более переносимой передачей аргументов

//threads/tcpserv02.c

 1 #include "unpthread.h"


 2 static void *doit(void*); /* каждый поток выполняет эту функцию */


 3 int

 4 main(int argc, char **argv)

 5 {

 6  int listenfd, *iptr;

 7  thread_t tid;

 8  socklen_t addrlen, len;

 9  struct sockaddr *cliaddr;


10  if (argc == 2)

11   listenfd = Tcp_listen(NULL, argv[1], &addrlen);

12  else if (argc == 3)

13   listenfd = Tcp_listen(argv[1], argv[2], &addrlen);

14  else

15   err_quit("usage: tcpserv01 [  ] ");


16  cliaddr = Malloc(addrlen);


17  for (;;) {

18   len = addrlen;

19   iptr = Malloc(sizeof(int));

20   *iptr = Accept(listenfd, cliaddr, &len);

21   Pthread_create(&tid, NULL, &doit, iptr);

22  }

23 }


24 static void*

25 doit(void *arg)

26 {

27  int connfd;


28  connfd = *((int*)arg);

29  free(arg);


30  Pthread_detach(pthread_self());

31  str_echo(connfd); /* та же функция, что и раньше */

32  Close(connfd); /* мы закончили с присоединенным сокетом */

33  return (NULL);

34 }

17-22
 Каждый раз перед вызовом функции
accept
мы вызываем функцию
malloc
и выделяем в памяти пространство для целочисленной переменной (дескриптора присоединенного сокета). Таким образом каждый поток получает свою собственную копию этого дескриптора.

28-29
 Поток получает значение дескриптора присоединенного сокета, а затем освобождает занимаемую им память с помощью функции
free
.

Исторически функции

malloc
и
free
не допускали повторного вхождения. Это означает, что при вызове той или иной функции из обработчика сигнала в то время, когда главный поток выполняет одну из них, возникает большая путаница, так как эти функции оперируют статическими структурами данных. Как же мы можем вызывать эти две функции в листинге 26.3? Дело в том, что в POSIX требуется, чтобы эти две функции, так же как и многие другие, были безопасными в многопоточной среде (thread-safe). Обычно это достигается с помощью некоторой разновидности синхронизации, осуществляемой внутри библиотечных функций и являющейся для нас прозрачной (то есть незаметной).

Функции, безопасные в многопоточной среде

Стандарт POSIX.1 требует, чтобы все определенные в нем функции, а также функции, определенные в стандарте ANSI С, были безопасными в многопоточной среде. Исключения из этого правила приведены в табл. 26.1.

К сожалению, в POSIX.1 ничего не сказано о безопасности в многопоточной среде по отношению к функциям сетевого API. Последние пять строк в этой таблице появились благодаря Unix 98. В разделе 11.18 мы говорили о том, что функции

gethostbyname
и
gethostbyaddr
не допускают повторного вхождения. Как уже отмечалось, некоторые производители определяют версии этих функций, обладающие свойством безопасности в многопоточной среде (их названия заканчиваются на
_r
), но поскольку они не стандартизованы, лучше от них отказаться. Все функции
getXXX
, не допускающие повторного вхождения, были приведены в табл. 11.5.


Таблица 26.1. Функции, безопасные в многопоточной среде

Могут не быть безопасными в многопоточной средеДолжны быть безопасными в многопоточной средеКомментарии
Asctimeasctime_rБезопасна в многопоточной среде только в случае непустого аргумента
ctermid
Ctimectime_r
getc_unlocked
getchar_unlocked
Getgridgetgrid_r
Getgrnamgetgrnam_r
Getlogingetlogin_r
Getpwnamgetpwnam_r
Getpwuidgetpwuid_r
Gmtimegmtime_r
Localtimelocaltime_r
putc_unlocked
putchar_unlocked
Randrand_r
Readdirreaddir_r
Strtockstrtock_r
tmpnamБезопасна в многопоточной среде только в случае непустого аргумента
Ttynamettyname_r
GethostXXX
GetnetXXX
GetprotoXXX
GetservXXX
inet_ntoa

Приведенная таблица позволяет заключить, что общим способом сделать функцию допускающей повторное вхождение является определение новой функции с названием, оканчивающимся на

_r
. Обе функции будут безопасными в многопоточной среде, только если вызывающий процесс выделяет в памяти место для результата и передает соответствующий указатель как аргумент функции.

26.5. Собственные данные потоков

При преобразовании существующих функций для использования в многопоточной среде часто возникают проблемы, связанные со статическими переменными. Функция, сохраняющая состояние в собственном буфере или возвращающая результат в виде указателя на статический буфер, не является безопасной в многопоточной среде, поскольку несколько потоков не могут использовать один и тот же буфер для хранения разных данных. Такая проблема имеет несколько решений.

1. Использование собственных данных потоков (thread-specific data). Это нетривиальная задача, и функция при этом преобразуется к такому виду, что может использоваться только в системах, поддерживающих потоки. Преимущество этого подхода заключается в том, что не меняется вызывающая последовательность, и все изменения связаны с библиотечной функцией, а не с приложениями, которые вызывают эту функцию. Позже в этом разделе мы покажем безопасную в многопоточной среде версию функции

readline
, созданную с применением собственных данных потоков.

2. Изменение вызывающей последовательности таким образом, чтобы вызывающий процесс упаковывал все аргументы в некую структуру, а также записывал в нее статические переменные из листинга 3.12. Это также было сделано, и в листинге 26.4 показана новая структура и новые прототипы функций.

Листинг 26.4. Структура данных и прототип функции для версии функции readline, допускающей повторное вхождение

typedef struct {

 int    read_fd;    /* дескриптор, указывающий, откуда считываются данные */

 char   *read_ptr;   /* буфер, куда передаются данные */

 size_t read_maxlen; /* максимальное количество байтов, которое может быть считано */

 /* следующие три элемента для внутреннего использования функцией */

 int    rl_cnt;      /* инициализируется нулем */

 char   *rl_bufptr;  /* инициализируется значением rl_buf */

 char   rl_buf[MAXLINE];

} Rline;


void readline_rinit(int, void*, size_t, Rline*);

ssize_t readline_r(Rline*);

ssize_t Readline_r(Rline*);

Эти новые функции могут использоваться как в системах с поддержкой потоков, так и в тех, где потоки не поддерживаются, но все приложения, вызывающие функцию

readline
, должны быть изменены.

3. Реструктуризация интерфейса для исключения статических переменных и обеспечения безопасности функции в многопоточной среде. Для

readline
это будет означать отказ от увеличения быстродействия, достигнутого в листинге 3.12, и возвращение к более старой версии, представленной в листинге 3.11. Поскольку мы назвали старую версию «ужасно медленной», это решение не всегда пригодно на практике.

Использование собственных данных потоков — это распространенный способ сделать существующую функцию безопасной в многопоточной среде. Прежде чем описывать функции Pthread, работающие с такими данными, мы опишем саму концепцию и возможный способ реализации, так как эти функции кажутся более сложными, чем являются на самом деле.

Частично осложнения возникают по той причине, что во всех книгах, где идет речь о потоках, описание собственных данных потоков дается по образцу стандарта Pthreads. Пары ключ-значение и ключи рассматриваются в них как непрозрачные объекты. Мы описываем собственные данные потоков в терминах индексов и указателей, так как обычно в реализациях в качестве ключей используются небольшие положительные целые числа (индексы), а значение, ассоциированное с ключом, — это просто указатель на область памяти, выделяемую потоку с помощью функции

malloc
.

В каждой системе поддерживается ограниченное количество объектов собственных данных потоков. В POSIX требуется, чтобы этот предел не превышал 128 (на каждый процесс), и в следующем примере мы используем именно это значение. Система (вероятно, библиотека потоков) поддерживает один массив структур (которые мы называем структурами

Key
) для каждого процесса, как показано на рис. 26.2.

Рис. 26.2. Возможная реализация собственных данных потока

Флаг в структуре

Key
указывает, используется ли в настоящий момент данный элемент массива. Все флаги инициализируются как указывающие на то, что элемент не используется. Когда поток вызывает функцию
pthread_key_create
для создания нового элемента собственных данных потока, система отыскивает в массиве структур
Key
первую структуру, не используемую в настоящий момент. Индекс этой структуры, который может иметь значение от 0 до 127, называется ключом и возвращается вызывающему потоку как результат выполнения функции. О втором элементе структуры
Key
, так называемом указателе-деструкторе, мы поговорим чуть позже.

В дополнение к массиву структур

Key
, общему для всего процесса, система хранит набор сведений о каждом потоке процесса в структуре
Pthread
. Частью этой структуры является массив указателей, состоящий из 128 элементов, который мы называем
pkey
. Это показано на рис. 26.3.

Рис. 26.3. Информация, хранящаяся в системе для каждого потока

Все элементы массива

pkey
инициализируются пустыми указателями. Эти 128 указателей являются «значениями», ассоциированными с каждым из 128 «ключей» процесса.

Когда мы с помощью функции

pthread_key_create
создаем ключ, система сообщает нам фактическое значение ключа (индекс). Затем каждый поток может сохранить значение (указатель), связанное с этим ключом, и, как правило, каждый поток получает этот указатель в виде возвращаемого значения функции
malloc
. Частично путаница с собственными данными потока обусловлена тем, что указатель в паре ключ-значение играет роль значения, но сами собственные данные потока — это то, на что указывает данный указатель.

Теперь мы перейдем к примеру применения собственных данных потока, предполагая, что наша функция

readline
использует их для хранения информации о состоянии каждого потока при последовательных обращениях к ней. Вскоре мы покажем код, выполняющий эту задачу, в котором функция
readline
модифицирована так, чтобы реализовать представленную далее последовательность шагов.

1. Запускается процесс, и создается несколько потоков.

2. Один из потоков вызовет функцию

readline
первой, а та, в свою очередь, вызовет функцию
phtread_key_create
. Система отыщет первую неиспользуемую структуру
Key
(см. рис. 26.2) и возвратит вызывающему процессу ее индекс. В данном примере мы предполагаем, что индекс равен 1.

Мы будем использовать функцию

pthread_once
, чтобы гарантировать, что функция
pthread_key_create
вызывается только первым потоком, вызвавшим функцию
readline
.

3. Функция

readline
вызывает функцию
pthread_getspecific
, чтобы получить значение
pkey[1]
(«указатель» на рис. 26.3 для ключа, имеющего значение 1) для данного потока, но эта функция возвращает пустой указатель. Тогда функция
readline
вызывает функцию
malloc
для выделения памяти, которая необходима для хранения информации о каждом потоке при последовательных вызовах функции
readline
. Функция
readline
инициализирует эти области памяти по мере надобности и вызывает функцию
pthread_setspecific
, чтобы установить указатель собственных данных потока (
pkey[1]
), соответствующий данному ключу, на только что выделенную область памяти. Мы показываем этот процесс на рис. 26.4, предполагая, что вызывающий поток — это поток с номером 0 в данном процессе.

Рис. 26.4. Соответствие между областью памяти, выделенной функцией malloc, и указателем собственных данных потока

На этом рисунке мы отмечаем, что структура Pthread поддерживается системой (вероятно, библиотекой потоков), но фактически собственные данные потока, которые мы размещаем в памяти с помощью функции

malloc
, поддерживаются нашей функцией (в данном случае
readline
). Все, что делает функция
pthread_setspecific
, — это установка указателя для данного ключа в структуре Pthread на выделенную область памяти. Аналогично, действие функции
pthread_getspecific
сводится к возвращению этого указателя.

4. Другой поток, например поток с номером

n
, вызывает функцию
readline
, возможно, в тот момент, когда поток с номером 0 все еще находится в стадии выполнения функции
readline
.

Функция

readline
вызывает функцию
pthread_once
, чтобы инициализировать ключ этого элемента собственных данных, но так как эта функция уже была однажды вызвана, то больше она не выполняется.

5. Функция

readline
вызывает функцию
pthread_getspecific
для получения значения указателя
pkey[1]
для данного потока, но возвращается пустой указатель. Тогда поток вызывает функцию
malloc
и функцию
pthread_setspecific
, как и в случае с потоком номер 0, инициализируя элемент собственных данных потока, соответствующий этому ключу (1). Этот процесс иллюстрирует рис. 26.5.

Рис. 26.5. Структуры данных после того, как поток n инициализировал свои собственные данные

6. Поток номер n продолжает выполнять функцию

readline
, используя и модифицируя свои собственные данные.

Один вопрос, который мы пока не рассмотрели, заключается в следующем: что происходит, когда поток завершает свое выполнение? Если поток вызвал функцию

readline
, эта функция выделила в памяти область, которая должна быть освобождена по завершении выполнения потока. Для этого используется указатель-деструктор, показанный на рис. 26.2. Когда поток, создающий элемент собственных данных, вызывает функцию
pthread_key_create
, одним из аргументов этой функции является указатель на функцию-деструктор. Когда выполнение потока завершается, система перебирает массив
pkey
для данного потока, вызывая соответствующую функцию-деструктор для каждого непустого указателя
pkey
. Под «соответствующим деструктором» мы понимаем указатель на функцию, хранящийся в массиве
Key
с рис. 26.2. Таким образом осуществляется освобождение памяти, занимаемой собственными данными потока, когда выполнение потока завершается.

Первые две функции, которые обычно вызываются при работе с собственными данными потока, — это

pthread_once
и
pthread_key_create
.

#include 


int pthread_once(pthread_once_t *onceptr, void (*init)(void));

int pthread_key_create(pthread_key_t *keyptr, void (*destructor)(void *value));

Обе функции возвращают: 0 в случае успешного выполнения, положительное значение Exxx в случае ошибки

Функция

pthread_once
обычно вызывается при вызове функции, манипулирующей собственными данными потока, но
pthread_once
использует значение переменной, на которую указывает
onceptr
, чтобы гарантировать, что функция
init
вызывается для каждого процесса только один раз.

Функция

pthread_key_create
должна вызываться только один раз для данного ключа в пределах одного процесса. Значение ключа возвращается с помощью указателя
keyptr
, а функция-деструктор (если аргумент является непустым указателем) будет вызываться каждым потоком по завершении его выполнения, если этот поток записывал какое-либо значение, соответствующее этому ключу.

Обычно эти две функции используются следующим образом (если игнорировать возвращение ошибок):

pthread_key_t rl_key;

pthread_once_t rl_once = PTHREAD_ONCE_INIT;


void readline_destructor(void *ptr) {

 free(ptr);

}


void readline_once(void) {

 pthread_key_create(&rl_key, readline_destructor);

}


ssize_t readline(...) {

 ...


 pthread_once(&rl_once, readline_once);


 if ((ptr = pthread_getspecific(rl_key)) == NULL) {

  ptr = Malloc(...);

  pthread_setspecifiс(rl_key, ptr);

  /* инициализация области памяти, на которую указывает ptr */

 }

 ...

 /* используются значения, на которые указывает ptr */

}

Каждый раз, когда вызывается функция

readline
, она вызывает функцию
pthread_once
. Эта функция использует значение, на которое указывает ее аргумент-указатель
onceptr
(содержащийся в переменной
rl_once
), чтобы удостовериться, что функция
init
вызывается только один раз. Функция инициализации
readline_once
создает ключ для собственных данных потока, который хранится в
rl_key
и который функция
readline
затем использует в вызовах функций
pthread_getspecific
и
pthread_setspecific
.

Функции

pthread_getspecific
и
pthread_setspecific
используются для того, чтобы получать и задавать значение, ассоциированное с данным ключом. Это значение представляет собой тот указатель, который показан на рис. 26.3. На что указывает этот указатель — зависит от приложения, но обычно он указывает на динамически выделяемый участок памяти.

#include 


void *pthread_getspecific(pthread_key_t key);

Возвращает: указатель на собственные данные потока (возможно, пустой указатель)


int pthread_setspecific(pthread_key_t key, const void *value);

Возвращает: 0 в случае успешного выполнения, положительное значение Exxx в случае ошибки

Обратите внимание на то, что аргументом функции

pthread_key_create
является указатель на ключ (поскольку эта функция хранит значение, присвоенное ключу), в то время как аргументами функций
get
и
set
являются сами ключи (которые, скорее всего, представляют собой небольшие целые числа, как уже говорилось).

Пример: функция readline, использующая собственные данные потока

В этом разделе мы приводим полный пример использования собственных данных потока, преобразуя оптимизированную версию функции

readline
из листинга 3.12 к виду, безопасному в многопоточной среде, не изменяя последовательность вызовов.

В листинге 26.5 показана первая часть функции: переменные

pthread_key_t
и
pthread_once_t
, функции
readline_destructor
и
readline_once
и наша структура
Rline
, которая содержит всю информацию, нужную нам для каждого потока.

Листинг 26.5. Первая часть функции readline, безопасной в многопоточной среде

//threads/readline.c

 1 #include "unpthread.h"


 2 static pthread_key_t rl_key;

 3 static pthread_once_t rl_once = PTHREAD_ONCE_INIT;


 4 static void

 5 readline_destructor(void *ptr)

 6 {

 7  free(ptr);

 8 }


9 static void

10 readline_once(void)

11 {

12  Pthread_key_create(&rl_key, readline_destructor);

13 }


14 typedef struct {

15  int rl_cnt;      /* инициализируется нулем */

16  char *rl_bufptr; /* инициализируется значением rl_buf */

17  char rl_buf[MAXLINE];

18 } Rline;

Деструктор

4-8
 Наша функция-деструктор просто освобождает всю память, которая была выделена для данного потока.

«Одноразовая» функция

9-13
 Мы увидим, что наша «одноразовая» (то есть вызываемая только один раз) функция вызывается однократно из функции
pthread_once
и создает ключ, который затем используется в функции
readline
.

Структура Rline

14-18
 Наша структура
Rline
содержит три переменные, которые, будучи объявленными как статические (
static
) в листинге 3.12, привели к возникновению описанных далее проблем. Такая структура динамически выделяется в памяти для каждого потока, а по завершении выполнения этого потока она освобождается функцией-деструктором.

В листинге 26.6 показана сама функция

readline
, а также функция
my_read
, которую она вызывает. Этот листинг является модификацией листинга 3.12.

Листинг 26.6. Вторая часть функции readline, безопасной в многопоточной среде

//threads/readline.c

19 static ssize_t

20 my_read(Rline *tsd, int fd, char *ptr)

21 {

22  if (tsd->rl_cnt <= 0) {

23 again:

24   if ((tsd->rl_cnt = read(fd, tsd->rl_buf, MAXLINE)) < 0) {

25    if (errno == EINTR)

26     goto again;

27    return (-1);

28   } else if (tsd->rl_cnt == 0)

29    return (0);

30   tsd->rl_bufptr = tsd->rl_buf;

31  }

32  tsd->rl_cnt--;

33  *ptr = *tsd->rl_bufptr++;

34  return (1);

35 }


36 ssize_t

37 readline(int fd, void *vptr, size_t maxlen)

38 {

39  int n, rc;

40  char c, *ptr;

41  Rline *tsd;


42  Pthread_once(&rl_once, readline_once);

43  if ((tsd = pthread_getspecific(rl_key)) == NULL) {

44   tsd = Calloc(1, sizeof(Rline)); /* инициализируется нулем */

45   Pthread_setspecifiс(rl_key, tsd);

46  }


47  ptr = vptr;

48  for (n = 1; n < maxlen; n++) {

49   if ((rc = my_read(tsd, fd, &c)) == 1) {

50    *ptr++ = c;

51    if (c == '\n')

52     break;

53   } else if (rc == 0) {

54    *ptr = 0;

55    return (n-1); /* EOF, данные не были считаны */

56   } else

57    return (-1); /* ошибка, errno устанавливается функцией read() */

58   }


59  *ptr = 0;

60  return (n);

61 }

Функция my_read

19-35
 Первым аргументом функции теперь является указатель на структуру
Rline
, которая была размещена в памяти для данного потока (и содержит собственные данные этого потока).

Размещение собственных данных потока в памяти

42
 Сначала мы вызываем функцию
pthread_once
, так чтобы первый поток, вызывающий функцию
readline
в этом процессе, вызвал бы функцию
readline_once
для создания ключа собственных данных потока.

Получение указателя на собственные данные потока

43-46
 Функция
pthread_getspecific
возвращает указатель на структуру
Rline
для данного потока. Но если это первый вызов функции
readline
данным потоком, то возвращаемым значением будет пустой указатель. В таком случае мы выделяем в памяти место для структуры
Rline
, а элемент
rl_cnt
этой структуры инициализируется нулем с помощью функции
calloc
. Затем мы записываем этот указатель для данного потока, вызывая функцию
pthread_setspecific
. Когда этот поток вызовет функцию
readline
в следующий раз, функция
pthread_getspecific
возвратит этот указатель, который был только что записан.

26.6. Веб-клиент и одновременное соединение (продолжение)

Вернемся к нашему примеру с веб-клиентом из раздела 16.5 и перепишем его с использованием потоков вместо неблокируемой функции

connect
. Мы можем оставить сокеты в их заданном по умолчанию виде — блокируемыми, и создать один поток на каждое соединение. Каждый поток может блокироваться в вызове функции
connect
, так как ядро будет просто выполнять какой-либо другой поток, готовый к работе.

В листинге 26.7 показана первая часть нашей программы, глобальные переменные и начало функции

main
.

Листинг 26.7. Глобальные переменные и начало функции main

//threads/web01.c

 1 #include "unpthread.h"

 2 #include  /* потоки Solaris */


 3 #define MAXFILES 20

 4 #define SERV    "80" /* номер порта или имя службы */


 5 struct file {

 6  char      *f_name; /* имя файла */

 7  char      *f_host; /* имя узла или IP-адрес */

 8  int       f_fd;    /* дескриптор */

 9  int       f_flags; /* F_xxx ниже */

10  pthread_t f_tid;   /* идентификатор потока */

11 } file[MAXFILES];

12 #define F_CONNECTING 1 /* функция connect () в процессе

                             выполнения */

13 #define F_READING 2    /* функция connect() завершена;

                             выполняется считывание */

14 #define F_DONE 4       /* все сделано */


15 #define GET_CMD "GET %s HTTP/1.0\r\n\r\n"


16 int nconn, nfiles, nlefttoconn, nlefttoread;


17 void *do_get_read(void*);

18 void home_page(const char*, const char*);

19 void write_get_cmd(struct file*);


20 int

21 main(int argc, char **argv)

22 {

23  int i, n, maxnconn;

24  pthread_t tid;

25  struct file *fptr;


26  if (argc < 5)

27   err_quit("usage: web <#conns> file1 ...");

28  maxnconn = atoi(argv[1]);


29  nfiles = min(argc - 4, MAXFILES);

30  for (i = 0; i < nfiles; i++) {

31   file[i].f_name = argv[i + 4];

32   file[i].f_host = argv[2];

33   file[i].f_flags = 0;

34  }

35  printf("nfiles = %d\n", nfiles);


36  home_page(argv[2], argv[3]);


37  nlefttoread = nlefttoconn = nfiles;

38  nconn = 0;

Глобальные переменные

1-16
 Мы подключаем заголовочный файл
вдобавок к обычному
, так как нам требуется использовать потоки Solaris в дополнение к потокам Pthreads, как мы вскоре покажем.

10
 Мы добавили к структуре
file
один элемент — идентификатор потока f
_tid
. Остальная часть этого кода аналогична коду в листинге 16.9. В этой версии нам не нужно использовать функцию
select
, а следовательно, не нужны наборы дескрипторов и переменная
maxfd
.

36
 Функция
home_page
не изменилась относительно листинга 16.10. В листинге 26.8 показан основной рабочий цикл потока main.

Листинг 26.8. Основной рабочий цикл потока main

//threads/web01.c

39  while (nlefttoread > 0) {

40   while (nconn < maxnconn && nlefttoconn > 0) {

41    /* находим файл для считывания */

42    for (i = 0; i < nfiles; i++)

43     if (file[i].f_flags == 0)

44      break;

45    if (i == nfiles)

46     err_quit("nlefttoconn = %d but nothing found", nlefttoconn);


47    file[i].f_flags = F_CONNECTING;

48    Pthread_create(&tid, NULL, &do_get_read, &file[i]);

49    file[i].f_tid = tid;

50    nconn++;

51    nlefttoconn--;

52   }


53   if ((n = thr_join(0, &tid, (void**)&fptr)) != 0)

54    errno = n, err_sys("thr_join error");


55   nconn--;

56   nlefttoread--;

57   printf("thread id %d for %s done\n", tid, fptr->f_name);

58  }


59  exit(0);

60 }

По возможности создаем другой поток

40-52
 Если имеется возможность создать другой поток (
nconn
меньше, чем
maxconn
), мы так и делаем. Функция, которую выполняет каждый новый поток, — это
do_get_read
, а ее аргументом является указатель на структуру
file
.

Ждем, когда завершится выполнение какого-либо потока

53-54
 Мы вызываем функцию потоков
thr_join
Solaris с нулевым первым аргументом, чтобы дождаться завершения выполнения какого-либо из наших потоков. К сожалению, в Pthreads не предусмотрен способ, с помощью которого мы могли бы ждать завершения выполнения любого потока, и функция
pthread_join
требует, чтобы мы точно указали, завершения какого потока мы ждем. В разделе 26.9 мы увидим, что решение этой проблемы в случае применения технологии Pthreads оказывается сложнее и требует использования условной переменной для сообщения главному потоку о завершении выполнения дополнительного потока.

ПРИМЕЧАНИЕ

Показанное здесь решение, в котором используется функция потоков thr_join Solaris, не является, вообще говоря, совместимым со всеми системами. Тем не менее мы приводим здесь эту версию веб-клиента, использующую потоки, чтобы не осложнять обсуждение рассмотрением условных переменных и взаимных исключений (mutex). К счастью, в Solaris допустимо смешивать потоки Pthreads и потоки Solaris.

В листинге 26.9 показана функция

do_get_read
, которая выполняется каждым потоком. Эта функция устанавливает соединение TCP, посылает серверу команду HTTP
GET
и считывает ответ сервера.

Листинг 26.9. Функция do_get_read

//threads/web01.c

61 void*

62 do_get_read(void *vptr)

63 {

64  int fd, n;

65  char line[MAXLINE];

66  struct file *fptr;


67  fptr = (struct file*)vptr;


68  fd = Tcp_connect(fptr->f_host, SERV);

69  fptr->f_fd = fd;

70  printf("do_get_read for %s, fd %d, thread %d\n",

71   fptr->f_name, fd, fptr->f_tid);

72  write_get_cmd(fptr);


73  /* Чтение ответа сервера */

74  for (;;) {

75   if ((n = Read(fd, line, MAXLINE)) == 0)

76    break; /* сервер закрывает соединение */

77   printf ("read %d bytes from %s\n", n, fptr->f_name);

78  }

79  printf("end-of-file on %s\n\", fptr->f_name);

80  Close(fd);

81  fptr->f_flags = F_DONE; /* сбрасываем F_READING */


82  return (fptr); /* завершение потока */

83 }

Создание сокета TCP, установление соединения

68-71
 Создается сокет TCP, и с помощью функции
tcp_connect
устанавливается соединение. В данном случае используется обычный блокируемый сокет, поэтому поток будет блокирован при вызове функции
connect
, пока не будет установлено соединение.

Отправка запроса серверу

72
 Функция
write_get_cmd
формирует команду HTTP
GET
и отсылает ее серверу. Мы не показываем эту функцию заново, так как единственным отличием от листинга 16.12 является то, что в версии, использующей потоки, не вызывается макрос
FD_SET
и не используется
maxfd
.

Чтение ответа сервера

73-82
 Затем считывается ответ сервера. Когда соединение закрывается сервером, устанавливается флаг
F_DONE
и функция возвращает управление, завершая выполнение потока.

Мы также не показываем функцию

home_page
, так как она полностью повторяет версию, приведенную в листинге 16.10.

Мы вернемся к этому примеру, заменив функцию Solaris

thr_join
на более переносимую функцию семейства Pthreads, но сначала нам необходимо обсудить взаимные исключения и условные переменные.

26.7. Взаимные исключения

Обратите внимание на то, что в листинге 26.8 при завершении выполнения очередного потока в главном цикле уменьшаются на единицу и

nconn
, и
nlefttoread
. Мы могли бы поместить оба эти оператора уменьшения в одну функцию
do_get_read
, что позволило бы каждому потоку уменьшать эти счетчики непосредственно перед тем, как выполнение потока завершается. Но это привело бы к возникновению трудноуловимой серьезной ошибки параллельного программирования.

Проблема, возникающая при помещении определенного кода в функцию, которая выполняется каждым потоком, заключается в том, что обе эти переменные являются глобальными, а не собственными переменными потока. Если один поток в данный момент уменьшает значение переменной и это действие приостанавливается, чтобы выполнился другой поток, который также станет уменьшать на единицу эту переменную, может произойти ошибка. Предположим, например, что компилятор С осуществляет уменьшение переменной на единицу в три этапа: загружает информацию из памяти в регистр, уменьшает значение регистра, а затем сохраняет значение регистра в памяти. Рассмотрим возможный сценарий.

1. Выполняется поток А, который загружает в регистр значение переменной

nconn
(равное 3).

2. Система переключается с выполнения потока А на выполнение потока В. Регистры потока А сохранены, регистры потока В восстановлены.

3. Поток В выполняет три действия, составляющие оператор декремента в языке С (

nconn--
), сохраняя новое значение переменной
nconn
, равное 2.

4. Впоследствии в некоторый момент времени система переключается на выполнение потока А. Восстанавливаются регистры потока А, и он продолжает выполняться с того места, на котором остановился, а именно начиная со второго этапа из трех, составляющих оператор декремента. Значение регистра уменьшается с 3 до 2, и значение 2 записывается в переменную

nconn
.

Окончательный результат таков: значение

nconn
равно 2, в то время как оно должно быть равным 1. Это ошибка.

Подобные ошибки параллельного программирования трудно обнаружить по многим причинам. Во-первых, они возникают нечасто. Тем не менее это ошибки, которые по закону Мэрфи вызывают сбои в работе программ. Во-вторых, ошибки такого типа возникают не систематически, так как зависят от недетерминированного совпадения нескольких событий. Наконец, в некоторых системах аппаратные команды могут быть атомарными. Это значит, что имеется аппаратная команда уменьшения значения целого числа на единицу (вместо трехступенчатой последовательности, которую мы предположили выше), а аппаратная команда не может быть прервана до окончания своего выполнения. Но это не гарантировано для всех систем, так что код может работать в одной системе и не работать в другой.

Программирование с использованием потоков является параллельным (parallel), или одновременным (concurrent), программированием, так как несколько потоков могут выполняться параллельно (одновременно), получая доступ к одним и тем же переменным. Хотя ошибочный сценарий, рассмотренный нами далее, предполагает систему с одним центральным процессором, вероятность ошибки также присутствует, если потоки А и В выполняются в одно и то же время на разных процессорах в многопроцессорной системе. В обычном программировании под Unix мы не сталкиваемся с подобными ошибками, так как при использовании функции

fork
родительский и дочерний процессы не используют совместно ничего, кроме дескрипторов. Тем не менее мы столкнемся с ошибками этого типа при обсуждении совместного использовании памяти несколькими процессами.

Эту проблему можно с легкостью продемонстрировать на примере потоков. В листинге 26.11 показана программа, которая создает два потока, после чего каждый поток увеличивает некоторую глобальную переменную 5000 раз.

Мы повысили вероятность ошибки за счет того, что потребовали от программы получить текущее значение переменной

counter
, вывести это значение и записать его. Если мы запустим эту программу, то получим результат, представленный в листинге 26.10.

Листинг 26.10. Результат выполнения программы, приведенной в листинге 26.11

4: 1

4: 2

4: 3

4: 4

 продолжение выполнения потока номер 4

4: 517

4: 518

5: 518 теперь выполняется поток номер 5

5: 519

5: 520

 продолжение выполнения потока номер 5

5: 926

5: 927

4: 519 теперь выполняется поток номер 4, записывая неверные значения

4: 520

Листинг 26.11. Два потока, которые неверно увеличивают значение глобальной переменной

//threads/example01.c

 1 #include "unpthread.h"


 2 #define NLOOP 5000


 3 int counter; /* потоки должны увеличивать значение этой переменной */


 4 void *doit(void*);


 5 int

 6 main(int argc, char **argv)

 7 {

 8  pthread_t tidA, tidB;


 9  Pthread_create(&tidA, NULL, &doit, NULL);

10  Pthread_create(&tidB, NULL, &doit, NULL);


11  /* ожидание завершения обоих потоков */

12  Pthread_join(tidA, NULL);

13  Pthread_join(tidB, NULL);


14  exit(0);

15 }


16 void*

17 doit(void *vptr)

18 {

19  int i, val;


20  /* Каждый поток получает, выводит и увеличивает на

21   * единицу переменную counter NLOOP раз. Значение

22   * переменной должно увеличиваться монотонно.

23   */


24  for (i = 0; i < NLOOP; i++) {

25   val = counter;

26   printf("%d: %d\n", pthread_self(), val + 1);

27   counter = val + 1;

28  }


29  return (NULL);

30 }

Обратите внимание на то, что в первый раз ошибка происходит при переключении системы с выполнения потока номер 4 на выполнение потока номер 5: каждый поток в итоге записывает значение 518. Это происходит множество раз на протяжении 10 000 строк вывода.

Недетерминированная природа ошибок такого типа также будет очевидна, если мы запустим программу несколько раз: каждый раз результат выполнения программы будет отличаться от предыдущего. Также, если мы переадресуем вывод результатов в файл на диске, эта ошибка иногда не будет возникать, так как программа станет работать быстрее, что приведет к уменьшению вероятности переключения системы между потоками. Наибольшее количество ошибок возникнет в случае, если программа будет работать интерактивно, записывая результат на медленный терминал, но при этом также сохраняя результат в файл при помощи программы Unix

script
(которая описана в главе 19 книги [110]).

Только что описанная проблема, возникающая, когда несколько потоков изменяют значение одной переменной, является самой простой из проблем параллельного программирования. Для решения этой проблемы используются так называемые взаимные исключения (mutex — mutual exclusion), с помощью которых контролируется доступ к переменной. В терминах Pthreads взаимное исключение — это переменная типа

pthread_mutex_t
, которая может быть заблокирована и разблокирована с помощью следующих двух функций:

#include 


int pthread_mutex_lock(pthread_mutex_t *mptr);

int pthread_mutex_unlock(pthread_mutex_t *mptr);

Обе функции возвращают: 0 в случае успешного выполнения, положительное значение Exxx в случае ошибки

Если некоторый поток попытается блокировать взаимное исключение, которое уже блокировано каким-либо другим потоком (то есть принадлежит ему в данный момент времени), этот поток окажется заблокированным до освобождения взаимного исключения.

Если переменная-исключение размещена в памяти статически, следует инициализировать ее константой

PTHREAD_MUTEX_INITIALIZER
. В разделе 30.8 мы увидим, что если мы размещаем исключение в совместно используемой (разделяемой) памяти, мы должны инициализировать его во время выполнения программы путем вызова функции
pthread_mutex_init
.

ПРИМЕЧАНИЕ

Некоторые системы (например, Solaris) определяют константу PTHREAD_MUTEX_INITIALIZER как 0. Если данная инициализация будет опущена, это ни на что не повлияет, так как статически размещаемые переменные все равно автоматически инициализируются нулем. Но для других систем такой гарантии дать нельзя — например, в Digital Unix константа инициализации ненулевая.

В листинге 26.12 приведена исправленная версия листинга 26.11, в которой используется одно взаимное исключение для блокирования счетчика при работе с двумя потоками.

Листинг 26.12. Исправленная версия листинга 26.11, использующая взаимное исключение для защиты совместно используемой переменной

//threads/examplе01.с

 1 #include "unpthread.h"


 2 #define NLOOP 5000


 3 int counter; /* увеличивается потоками */

 4 pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;


 5 void *doit(void*);


 6 int

 7 main(int argc, char **argv)

 8 {

 9  pthread_t tidA, tidB;


10  Pthread_create(&tidA, NULL, &doit, NULL);

11  Pthread_create(&tidB, NULL, &doit, NULL);


12  /* ожидание завершения обоих потоков */

13  Pthread_join(tidA, NULL);

14  Pthread_join(tidB, NULL);


15  exit(0);

16 }


17 void*

18 doit(void *vptr)

19 {

20  int i, val;


21  /*

22   * Каждый поток считывает, выводит и увеличивает счетчик NLOOP раз.

23   * Значение счетчика должно возрастать монотонно.

24   */


25  for (i = 0; i < NLOOP; i++) {

26   Pthread_mutex_lock(&counter_mutex);


27   val = counter;

28   printf(%d: %d\n", pthread_self(), val + 1);

29   counter = val + 1;


30   Pthread_mutex_unlock(&counter_mutex);

31  }


32  return(NULL);

33 }

Мы объявляем взаимное исключение с именем

counter_mutex
. Это исключение должно быть заблокировано потоком на то время, когда он манипулирует переменной counter. Когда мы запускали эту программу, результат всегда был правильным: значение переменной увеличивалось монотонно, а ее окончательное значение всегда оказывалось равным 10 000.

Насколько серьезной является дополнительная нагрузка, связанная с использованием взаимных исключений? Мы изменили программы, приведенные в листингах 26.11 и 26.12, заменив значение

NLOOP
на 50 000 (вместо исходного значения 5000), и засекли время, направив вывод на устройство
/dev/null
. Время работы центрального процессора в случае корректной версии, использующей взаимное исключение, увеличилось относительно времени работы некорректной версии без взаимного исключения на 10 %. Это означает, что использование взаимного исключения не связано со значительными издержками.

26.8. Условные переменные

Взаимное исключение позволяет предотвратить одновременный доступ к совместно используемой (разделяемой) переменной, но для того чтобы перевести поток в состояние ожидания (спящее состояние) до момента выполнения некоторого условия, необходим другой механизм. Продемонстрируем сказанное на следующем примере. Вернемся к нашему веб-клиенту из раздела 26.6 и заменим функцию Solaris

thr_join
на
pthread_join
. Но мы не можем вызвать функцию
pthread_join
до тех пор, пока не будем знать, что выполнение потока завершилось. Сначала мы объявляем глобальную переменную, которая служит счетчиком количества завершившихся потоков, и организуем управление доступом к ней с помощью взаимного исключения.

int ndone; /* количество потоков, завершивших выполнение */

pthread_mutex_t ndone_mutex = PTHREAD_MUTEX_INITIALIZER;

Затем мы требуем, чтобы каждый поток по завершении своего выполнения увеличивал этот счетчик на единицу, используя соответствующее взаимное исключение.

void* do_get_read(void *vptr) {

 ...


 Pthread_mutex_lock(&ndone_mutex);

 ndone++;

 Pthread_mutex_unlock(&ndone_mutex);


 return(fptr); /* завершение выполнения потока */

}

Но каким при этом получается основной цикл? Взаимное исключение должно быть постоянно блокировано основным циклом, который проверяет, какие потоки завершили свое выполнение.

while (nlefttoread > 0) {

 while (nconn < maxnconn && nlefttoconn > 0) {

  /* находим файл для чтения */

  ...

 }

 /* Проверяем, не завершен ли поток */

 Pthread_mutex_lock(&ndone_mutex);

 if (ndone > 0) {

  for (i =0; i < nfiles; i++) {

   if (file[i].f_flags & F_DONE) {

    Pthread_join(file[i].f_tid, (void**)&fptr);

    /* обновляем file[i] для завершенного потока */

    ...

   }

  }

 }

 Pthread_mutex_unlock(&ndone_mutex);

}

Это означает, что главный поток никогда не переходит в спящее состояние, а просто входит в цикл, проверяя каждый раз значение переменной

ndone
. Этот процесс называется опросом (polling) и рассматривается как пустая трата времени центрального процессора.

Нам нужен метод, с помощью которого главный цикл мог бы входить в состояние ожидания, пока один из потоков не оповестит его о том, что какая-либо задача выполнена. Эта возможность обеспечивается использованием условной переменной (conditional variable) вместе со взаимным исключением. Взаимное исключение используется для реализации блокирования, а условная переменная обеспечивает сигнальный механизм.

В терминах Pthreads условная переменная — это переменная типа

pthread_cond_t
. Такие переменные используются в следующих двух функциях:

#include 


int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);

int pthread_cond_signal(pthread_cond_t *cptr);

Обе функции возвращают: 0 в случае успешного выполнения, положительное значение Exxx в случае ошибки

Слово

signal
в названии второй функции не имеет отношения к сигналам Unix
SIGxxx
.

Проще всего объяснить действие этих функций на примере. Вернемся к нашему примеру веб-клиента. Счетчик

ndone
теперь ассоциируется и с условной переменной, и с взаимным исключением:

int ndone;

pthread_mutex_t ndone_mutex = PTHREAD_MUTEX_INITIALIZER;

pthread_cond_t ndone_cond = PTHREAD_COND_INITIALIZER;

Поток оповещает главный цикл о своем завершении, увеличивая значение счетчика, пока взаимное исключение принадлежит данному потоку (блокировано им), и используя условную переменную для сигнализации.

Pthread_mutex_lock(&ndone_mutex);

ndone++;

Pthread_cond_signal(&ndone_cond);

Pthread_mutex_unlock(&ndone_mutex);

Затем основной цикл блокируется в вызове функции

pthread_cond_wait
, ожидая оповещения о завершении выполнения потока:

while (nlefttoread > 0) {

 while (nconn < maxnconn && nlefttoconn > 0) {

  /* находим файл для чтения */

  ...

 }


 /* Ждем завершения выполнения какого-либо потока */

 Pthread_mutex_lock(&ndone_mutex);

 while (ndone == 0)

  Pthread_cond_wait(&ndone_cond, &ndone_mutex);


 for (i = 0; i < nfiles; i++) {

  if (file[i].f_flags & F_DONE) {

   Pthread_join(file[i].f_tid, (void**)&fptr);


   /* обновляем file[i] для завершенного потока */

   ...

  }

 }

 Pthread_mutex_unlock(&ndone_mutex);

}

Обратите внимание на то, что переменная

ndone
по-прежнему проверяется, только если потоку принадлежит взаимное исключение. Тогда, если не требуется выполнять какое-либо действие, вызывается функция
pthread_cond_wait
. Таким образом, вызывающий поток переходит в состояние ожидания, и разблокируется взаимное исключение, которое принадлежало этому потоку. Кроме того, когда управление возвращается потоку функцией
pthread_cond_wait
(после того как поступил сигнал от какого-либо другого потока), он снова блокирует взаимное исключение.

Почему взаимное исключение всегда связано с условной переменной? «Условие» обычно представляет собой значение некоторой переменной, используемой совместно несколькими потоками. Взаимное исключение требуется для того, чтобы различные потоки могли задавать и проверять значение условной переменной. Например, если в примере кода, приведенном ранее, отсутствовало бы взаимное исключение, то проверка в главном цикле выглядела бы следующим образом:

/* Ждем завершения выполнения одного или нескольких потоков */

while (ndone == 0)

 Pthread_cond_wait(&ndone_cond, &ndone_mutex);

Но при этом существует вероятность, что последний поток увеличивает значение переменной

ndone
после проверки главным потоком условия
ndone == 0
, но перед вызовом функции
pthread_cond_wait
. Если это происходит, то последний «сигнал» теряется, и основной цикл оказывается заблокированным навсегда, так как он будет ждать события, которое никогда не произойдет.

По этой же причине при вызове функции

pthread_cond_wait
поток должен блокировать соответствующее взаимное исключение, после чего эта функция разблокирует взаимное исключение и помещает вызывающий поток в состояние ожидания, выполняя эти действия как одну атомарную операцию. Если бы эта функция не разблокировала взаимное исключение и не заблокировала его снова после своего завершения, то выполнять эти операции пришлось бы потоку, как показано в следующем фрагменте кода:

/* Ждем завершения выполнения одного или нескольких потоков */

Pthread_mutex_lock(&ndone_mutex);

while (ndone == 0) {

 Pthread_mutex_unlock(&ndone_mutex);

 Pthread_cond_wait(&ndone_cond, &ndone_mutex);

 Pthread_mutex_lock(&ndone_mutex);

}

Существует вероятность того, что по завершении выполнения поток увеличит на единицу значение переменной

ndone
и это произойдет между вызовом функций
pthread_mutex_unlock
и
pthread_cond_wait
.

Обычно функция

pthread_cond_signal
выводит из состояния ожидания один поток, на который указывает условная переменная. Существуют ситуации, когда некоторый поток знает, что из состояния ожидания должны быть выведены несколько потоков. В таком случае используется функция
pthread_cond_broadcast
, выводящая из состояния ожидания все потоки, которые блокированы условной переменной.

#include 


int pthread_cond_broadcast(pthread_cond_t *cptr);

int pthread_cond_timedwait(pthread_cond_t *cptr, pthread_mutex_t *mptr,

 const struct timespec *abstime);

Обе функции возвращают: 0 в случае успешного выполнения, положительное значение Exxx в случае ошибки

Функция

pthread_cond_timedwait
позволяет потоку задать предельное время блокирования. Аргумент
abstime
представляет собой структуру
timespec
(определенную в разделе 6.9 при рассмотрении функции
pselect
), которая задает системное время для момента, когда функция должна возвратить управление, даже если к этому моменту условная переменная не подала сигнал. Если возникает такая ситуация, возвращается ошибка
ETIME
.

В данном случае значение времени является абсолютным значением времени, в отличие от относительного значения разницы во времени (time delta) между некоторыми событиями. Иными словами,

abstime
— это системное время, то есть количество секунд и наносекунд, прошедших с 1 января 1970 года (UTC) до того момента, когда эта функция должна вернуть управление. Здесь имеется различие как с функцией
pselect
, так и с функцией
select
, задающими количество секунд (и наносекунд в случае
pselect
) до некоторого момента в будущем, когда функция должна вернуть управление. Обычно для этого вызывается функция
gettimeofday
, которая выдает текущее время (в виде структуры
timeval
), а затем оно копируется в структуру
timespec
и к нему добавляется требуемое значение:

struct timeval tv;

struct timespec ts;


if (gettimeofday(&tv, NULL) < 0)

 err_sys("gettimeofday error");

ts.tv_sec = tv.tv_sec + 5; /* 5 с в будущем */

ts.tv_nsec = tv.tv_usec * 1000; /* микросекунды переводим в наносекунды */


pthread_cond_timedwait( , &ts);

Преимущество использования абсолютного времени (в противоположность относительному) заключается в том, что функция может завершиться раньше (возможно, из-за перехваченного сигнала). Тогда функцию можно вызвать снова, не меняя содержимое структуры

timespec
. Недостаток этого способа заключается в необходимости вызывать дополнительно функцию
gettimeofday
перед тем, как в первый раз вызывать функцию
pthread_cond_timedwait
.

ПРИМЕЧАНИЕ

В POSIX определена новая функция clock_gettime, возвращающая текущее время в виде структуры timespec.

26.9. Веб-клиент и одновременный доступ

Изменим код нашего веб-клиента из раздела 26.6: уберем вызов функции Solaris

thr_join
и заменим его вызовом функции
pthread_join
. Как сказано в разделе 26.6, теперь нам нужно точно указать, завершения какого потока мы ждем. Для этого мы используем условную переменную, описанную в разделе 26.8.

Единственным изменением в отношении глобальных переменных (см. листинг 26.7) является добавление нового флага и условной переменной:

#define F_JOINED 8 /* количество потоков */


int ndone; /* количество завершившихся потоков */

pthread_mutex_t ndone_mutex = PTHREAD_MUTEX_INITIALIZER;

pthread_cond_t ndone_cond = PTHREAD_COND_IINITIALIZER;

Единственным изменением функции

do_get_read
(см. листинг 26.9) будет увеличение на единицу значения переменной
ndone
и оповещение главного цикла о завершении выполнения потока:

 printf("end-of-file on %s\n", fptr->f_name);

 Close(fd);


 Pthread_mutex_lock(&ndone_mutex);

 fptr->f_flags = F_DONE; /* сбрасывает флаг F_READING */

 ndone++;

 Pthread_cond_signal(&ndone_cond);

 Pthread_mutex_unlock(&ndone_mutex);


 return(fptr); /* завершение выполнения потока */

}

Большинство изменений касается главного цикла, представленного в листинге 26.8. Новая версия показана в листинге 26.13.

Листинг 26.13. Основной рабочий цикл функции main

//threads/web03.c

43  while (nlefttoread > 0) {

44   while (nconn < maxnconn && nlefttoconn > 0) {

45    /* находим файл для считывания */

46    for (i = 0; i < nfiles; i++)

47     if (file[i].f_flags == 0)

48      break;

49    if (i == nfiles)

50     err_quit("nlefttoconn = %d but nothing found", nlefttoconn);


51    file[i].f_flags = F_CONNECTING;

52    Pthread_create(&tid, NULL, &do_get_read, &file[i]);

53    file[i].f_tid = tid;

54    nconn++;

55    nlefttoconn--;

56   }


57   /* Ждем завершения выполнения одного из потоков */

58   Pthread_mutex_lock(&ndone_mutex);

59   while (ndone == 0)

60    Pthread_cond_wait(&ndone_cond, &ndone_mutex);


61   for (i = 0; i < nfiles; i++) {

62    if (file[i].f_flags & F_DONE) {

63     Pthread_join(file[i].f_tid, (void**)&fptr);


64     if (&file[i] != fptr)

65      err_quit("file[i] != fptr");

66     fptr->f_flags = F_JOINED; /* clears F_DONE */

67     ndone--;

68     nconn--;

69     nlefttoread--;

70     printf("thread %d for %s done\n", fptr->f_tid, fptr->f_name);

71    }

72   }

73   Pthread_mutex_unlock(&ndone_mutex);

74  }


75  exit(0);

76 }

По возможности создаем новый поток

44-56
 Эта часть кода не изменилась.

Ждем завершения выполнения потока

57-60
 Мы ждем завершения выполнения потоков, отслеживая, когда значение
ndone
станет равно нулю. Как сказано в разделе 26.8, эта проверка должна быть проведена перед тем, как взаимное исключение будет блокировано, а переход потока в состояние ожидания осуществляется функцией
pthread_cond_wait
.

Обработка завершенного потока

61-73
 Когда выполнение потока завершилось, мы перебираем все структуры
file
, отыскивая соответствующий поток, вызываем
pthread_join
, а затем устанавливаем новый флаг
F_JOINED
.

В табл. 16.1 показано, сколько времени требует выполнение этой версии веб-клиента, а также версии, использующей неблокируемую функцию

connect
.

26.10. Резюме

Создание нового потока обычно требует меньше времени, чем порождение нового процесса с помощью функции

fork
. Одно это уже является большим преимуществом использования потоков на активно работающих сетевых серверах. Многопоточное программирование, однако, представляет собой отдельную технологию, требующую большей аккуратности при использовании.

Все потоки одного процесса совместно используют глобальные переменные и дескрипторы, тем самым эта информация становится доступной всем потокам процесса. Но совместное использование информации вносит проблемы, связанные с синхронизацией доступа к разделяемым переменным, и поэтому нам следует использовать примитивы синхронизации технологии Pthreads — взаимные исключения и условные переменные. Синхронизация доступа к совместно используемым данным — необходимое условие почти для любого приложения, работающего с потоками.

При разработке функций, которые могут быть вызваны таким приложением, нужно учитывать требование безопасности в многопоточной среде. Это требование выполнимо при использовании собственных данных потоков (thread-specific data), пример которых мы показали при рассмотрении функции

readline
в этой главе.

К модели потоков мы вернемся в главе 30, где сервер при запуске создает пул потоков. Для обслуживания очередного клиентского запроса используется любой свободный поток.

Упражнения

1. Сравните использование дескриптора в случае, когда в коде сервера применяется функция

fork
, и в случае, когда используются потоки. Предполагается, что одновременно обслуживается 100 клиентов.

2. Что произойдет в листинге 26.2, если поток при завершении функции

str_echo
не вызовет функцию
close
для закрытия сокета?

3. В листингах 5.4 и 6.2 мы выводили сообщение

Server terminated prematurely
(Сервер завершил работу преждевременно), когда мы ждали от сервера прибытия отраженной строки, а вместо этого получали признак конца файла (см. раздел 5.12). Модифицируйте листинг 26.1 таким образом, чтобы в соответствующих случаях также выдавалось аналогичное сообщение.

4. Модифицируйте листинги 26.5 и 26.6 таким образом, чтобы программы можно было компилировать в системах, не поддерживающих потоки.

5. Чтобы увидеть ошибку в функции

readline
, приведенной в листинге 26.2, запустите эту программу на стороне сервера. Затем измените эхо-клиент TCP из листинга 6.2, корректно работающий в пакетном режиме. Возьмите какой- либо большой текстовый файл в своей системе и трижды запустите клиент в пакетном режиме, чтобы он считывал текст из этого файла и записывал результат во временный файл. Если есть возможность, запустите клиенты на другом узле (не на том, на котором запущен сервер). Если все три клиента выполнят работу правильно (часто они зависают), посмотрите на файлы с результатом и сравните их с исходным файлом.

Теперь создайте версию сервера, используя корректную версию функции

readline
из раздела 26.5. Повторите тест, используя три эхо-клиента. Теперь все три клиента должны работать исправно. Также поместите функцию
printf
в функции
readline_destructor
,
readline_once
и в вызов функции
malloc
в
readline
. Это даст вам возможность увидеть, что ключ создается только один раз, но для каждого потока выделяется область памяти и вызывается функция-деструктор.

Глава 27