UNIX: взаимодействие процессов — страница 22 из 35

В этом разделе приведены тексты трех программ, измеряющих полосу пропускания каналов, очередей сообщений Posix и System V. Результаты работы этих программ приведены в табл. А.2 и А.З.

Измерение полосы пропускания канала

На рис. А.7 приведена схема описываемой программы.

Рис. А.7. Схема программы измерения полосы пропускания канала


В листинге А.1 приведен текст первой половины программы bw_pipe, измеряющей полосу пропускания канала.

Листинг А.1. Функция main, измеряющая полосу пропускания канала

//bench/bw_pipe.c

1  #include "unpipc.h"

2  void reader(int, int, int);

3  void writer(int, int);

4  void *buf;

5  int totalnbytes, xfersize;


6  int

7  main(int argc, char **argv)

8  {

9   int i, nLoop, contpipe[2], datapipe[2];

10  pid_t childpid;

11  if (argc != 4)

12   err_quit("usage: bw_pipe <#loops><#mbytes><#bytes/write>");

13  nloop = atoi(argv[1]);

14  totalnbytes = atoi(argv[2]) * 1024 * 1024;

15  xfersize = atoi(argv[3]);

16  buf = Valloc(xfersize);

17  Touch(buf, xfersize);

18  Pipe(contpipe);

19  Pipe(datapipe);

20  if ((childpid = Fork()) == 0) {

21   writer(contpipe[0], datapipe[1]); /* child */

22   exit(0);

23  }

24  /* 4parent */

25  Start_time();

26  for (i = 0; i < nloop; i++)

27   reader(contpipe[1], datapipe[0], totalnbytes);

28  printf("bandwidth: %.3f MB/sec\n",

29   totalnbytes / Stop_time() * nloop);

30  kill(childpid, SIGTERM);

31  exit(0);

32 }

Аргументы командной строки

11-15 Аргументы командной строки задают количество повторов (обычно 5), количество передаваемых мегабайтов (если указать 10, будет передано 10×1024×1024 байт) и количество байтов для каждой операции read и write (которое может принимать значения от 1024 до 65536 в наших измерениях).

Выделение буфера и помещение начального значения

16-17 Вызов valloc аналогичен malloc, но выделяемая память начинается с границы страницы памяти. Функция touch (листинг А.3) помещает 1 байт данных в каждую страницу буфера, заставляя ядро считать в память все страницы данного буфера. Мы всегда выполняем это перед проведением измерений.

ПРИМЕЧАНИЕ

Функция valloc не входит в стандарт Posix.1 и названа устаревшей в Unix 98. Она требовалась в ранних версиях спецификаций Х/Open, но уже не является необходимой. Обертка Valloc вызывает функцию malloc, если valloc недоступна.

Создание двух каналов

18-19 Создаются два канала: contpipe[0] и contpipe[1] используются для синхронизации процессов перед началом передачи, a datapipe[0] и datapipe[1] используются для передачи самих данных.

Вызов fork

20-31 Создается дочерний процесс, вызывающий функцию writer, а родительский процесс в это время вызывает функцию reader. Функция reader вызывается nlоор раз. Функция start_time вызывается непосредственно перед началом цикла, a stop_time — сразу после его окончания. Эти функции даны в листинге А.З. Полоса пропускания представляет собой количество байтов, переданных за все проходы цикла, поделенное на время, затраченное на передачу (stop_time возвращает количество микросекунд, прошедшее с момент запуска start_time). Затем дочерний процесс завершается сигналом SIGTERM и программа завершает свою работу. Вторая половина программы приведена в листинге А.2. Она состоит из функций reader и writer.

Листинг А.2. Функции reader и writer

//bench/bw_pipe.cvoid

33 void

34 writer(int contfd, int datafd)

35 {

36  int ntowrite;

37  for(;;) {

38   Read(contfd, &ntowrite, sizeof(ntowrite));

39   while (ntowrite > 0) {

40    Write(datafd, buf, xfersize);

41    ntowrite –= xfersize;

42   }

43  }

44 }


45 void

46 reader(int contfd, int datafd, int nbytes)

47 {

48  ssize_t n;

49  Write(contfd, &nbytes, sizeof(nbytes));

50  while ((nbytes > 0) &&

51   ((n = Read(datafd, buf, xfersize)) > 0)) {

52   nbytes –= n;

53  }

54 }

Функция writer

33-44 Функция writer представляет собой бесконечный цикл, вызываемый дочерним процессом. Он ожидает сообщения родительского процесса о готовности к приему данных, считывая целое число из управляющего канала. Это целое число определяет количество байтов, которое будет записано в канал данных. При получении этого числа дочерний процесс записывает данные в канал, отправляя их родителю. За один вызов write записывается xfersize байтов.

Функция reader

45-54 Эта функция вызывается родительским процессом в цикле. Каждый раз при вызове функции в управляющий канал записывается целое число, указывающее дочернему процессу на необходимость помещения соответствующего количества данных в канал данных. Затем функция вызывает read в цикле до тех пор, пока не будут приняты все данные.

Текст функций start_time, stop_time и touch приведен в листинге А.З.

Листинг А.З. Функции start_sime, stop_time и touch

//lib/timing.с

1  #include "unpipc.h"

2  static struct timeval tv_start, tv_stop;


3  int

4  start_time(void)

5  {

6   return(gettimeofday(&tv_start, NULL));

7  }


8  double

9  stop_time(void)

10 {

11  double clockus;

12  if (gettimeofday(&tv_stop, NULL) == –1)

13   return(0.0);

14  tv_sub(&tv_stop, &tv_start);

15  clockus = tv_stop.tv_sec * 1000000.0 + tv_stop.tv_usec;

16  return(clockus);

17 }


18 int

19 touch(void *vptr, int nbytes)

20 {

21  char *cptr;

22  static int pagesize = 0;

23  if (pagesize == 0) {

24   errno = 0;

25 #ifdef _SC_PAGESIZE

26   if ((pagesize = sysconf(_SC_PAGESIZE)) == –1)

27    return(-1);

28 #else

29   pagesize = getpagesize(); /* BSD */

30 #endif

31  }

32  cptr = vptr;

33  while (nbytes > 0) {

34   *cptr = 1;

35   cptr += pagesize;

36   nbytes –= pagesize;

37  }

38  return(0);

39 }

Текст функции tv_sub приведен в листинге А.4. Она осуществляет вычитание двух структур timeval, сохраняя результат в первой структуре.

Листинг А.4. Функция tv_sub: вычитание двух структур timeval

//lib/tv_sub.c

1  #include "unpipc.h"


2  void

3  tv_sub(struct timeval *out, struct timeval *in)

4  {

5   if ((out->tv_usec –= in->tv_usec) < 0) { /* out –= in */

6    --out->tv_sec;

7    out->tv_usec += 1000000;

8   }

9   out->tv_sec –= in->tv_sec;

10 }

На компьютере Sparc под управлением Solaris 2.6 при выполнении программы пять раз подряд получим следующий результат:

solaris % bw_pipe 5 10 65536

bandwidth: 13.722 MB/sec

solaris % bw_pipe 5 10 65536

bandwidth: 13.781 MB/sec

solaris % bw_pipe 5 10 65536

bandwidth: 13.685 MB/sec

solaris % bw_pipe 5 10 65536

bandwidth: 13.665 MB/sec

solaris % bw_pipe 5 10 65536

bandwidth: 13.584 MB/sec

Каждый раз мы задаем пять циклов, 10 Мбайт за цикл и 65536 байт за один вызов write или read. Среднее от этих пяти результатов даст величину 13,7 Мбайт в секунду, приведенную в табл. А.2.

Измерение полосы пропускания очереди сообщений Posix

В листинге А.5 приведена функция main программы, измеряющей полосу пропускания очереди сообщений Posix. Листинг А.6 содержит функции reader и writer. Эта программа устроена аналогично предыдущей, измерявшей полосу пропускания канала.

ПРИМЕЧАНИЕ

Обратите внимание, что в программе приходится указывать максимальное количество сообщений в очереди при ее создании. Мы указываем значение 4. Размер канала IPC может влиять на производительность, потому что записывающий процесс может отправить это количество сообщений, прежде чем будет заблокирован в вызове mq_send, что приведет к переключению контекста на считывающий процесс. Следовательно, производительность программы зависит от этого магического числа. Изменение его с 4 на 8 в Solaris 2.6 никак не влияет на величины, приведенные в табл. А.2, но в Digital Unix 4.0B производительность уменьшается на 12%. Мы могли ожидать, что производительность возрастет с увеличением количества сообщений в очереди, поскольку требуется в два раза меньше переключений контекста. Однако если используется отображение файла в память, это увеличивает размер отображаемого файла в два раза, как и требуемое количество памяти.

Листинг А.5. Функция main для измерения полосы пропускания очереди сообщений Posix

//bench/bw_pxmsg.c

1  #include "unpipc.h"

2  #define NAME "bw_pxmsg"

3  void reader(int, mqd_t, int);

4  void writer(int, mqd_t);

5  void *buf;

6  int totalnbytes, xfersize;


7  int

8  main(int argc, char **argv)

9  {

10  int i, nloop, contpipe[2];

11  mqd_t mq;

12  pid_t childpid;

13  struct mq_attr attr;

14  if (argc != 4)

15   err_quit("usage: bw_pxmsg <#loops><#mbytes><#bytes/write>");

16  nloop = atoi(argv[1]);

17  totalnbytes = atoi(argv[2]) * 1024 * 1024;

18  xfersize = atoi(argv[3]);

19  buf = Valloc(xfersize);

20  Touch(buf, xfersize);

21  Pipe(contpipe);

22  mq_unlink(Px_ipc_name(NAME)); /* error OK */

23  attr.mq_maxmsg = 4;

24  attr.mq_msgsize = xfersize;

25  mq = Mq_open(Px_ipc_name(NAME), O_RDWR | O_CREAT, FILE_MODE, &attr);

26  if ((childpid = Fork()) == 0) {

27   writer(contpipe[0], mq); /* child */

28   exit(0);

29  }

30  /* 4parent */

31  Start_time();

32  for (i = 0; i < nloop; i++)

33   reader(contpipe[1], mq, totalnbytes);

34  printf("bandwidth: %.3f MB/sec\n",

35  totalnbytes / Stop_time() * nloop);

36  kill(childpid, SIGTERM);

37  Mq_close(mq);

38  Mq_unlink(Px_ipc_name(NAME));

39  exit(0);

40 }

Листинг А.6. Функции reader и writer

//bench/bw_pxmsg.c

41 void

42 writer(int contfd, mqd_t mqsend)

43 {

44  int ntowrite;

45  for(;;) {

46   Read(contfd, &ntowrite, sizeof(ntowrite));

47   while (ntowrite > 0) {

48    Mq_send(mqsend, buf, xfersize, 0);

49    ntowrite –= xfersize;

50   }

51  }

52 }


53 void

54 reader(int contfd, mqd_t mqrecv, int nbytes)

55 {

56  ssize_t n;

57  Write(contfd, &nbytes, sizeof(nbytes));

58  while ((nbytes > 0) &&

59   ((n = Mq_receive(mqrecv, buf, xfersize, NULL)) > 0)) {

60   nbytes –= n;

61  }

62 }

Программа измерения полосы пропускания очереди System V

В листинге А.7 приведен текст функции main, измеряющей полосу пропускания очередей сообщений System V, а в листинге А.8 текст функций reader и writer.

Листинг А.7. Функция main для измерения полосы пропускания очереди сообщений System V

//bench/bw_svmsg.c

1  #include "unpipc.h"

2  void reader(int, int, int);

3  void writer(int, int);

4  struct msgbuf *buf;

5  int totalnbytes, xfersize;


6  int

7  main(int argc, char **argv)

8  {

9   int i, nloop, contpipe[2], msqid;

10  pid_t childpid;

11  if (argc != 4)

12   err_quit("usage: bw_svmsg <#loops><#mbytes><#bytes/write>");

13  nloop = atoi(argv[1]);

14  totalnbytes = atoi(argv[2]) * 1024 * 1024;

15  xfersize = atoi(argv[3]);

16  buf = Valloc(xfersize);

17  Touch(buf, xfersize);

18  buf->mtype = 1;

19  Pipe(contpipe);

20  msqid = Msgget(IPC_PRIVATE, IPC_CREAT | SVMSG_MODE);

21  if ((childpid = Fork()) == 0) {

22   writer(contpipe[0], msqid); /* дочерний процесс */

23   exit(0);

24  }

25  Start_time();

26  for (i = 0; i < nloop; i++)

27   reader(contpipe[1], msqid, totalnbytes);

28  printf("bandwidth: %.3f MB/sec\n",

29  totalnbytes / Stop_time() * nloop);

30  kill(childpid, SIGTERM);

31  Msgctl(msqid, IPC_RMID, NULL);

32  exit(0);

33 }

Листинг А.8. Функции reader и writer

//bench/bw_svmsg.c

34 void

35 writer(int contfd, int msqid)

36 {

37  int ntowrite;

38  for (;;) {

39   Read(contfd, &ntowrite, sizeof(ntowrite));

40   while (ntowrite > 0) {

41    Msgsnd(msqid, buf, xfersize – sizeof(long), 0);

42    ntowrite –= xfersize;

43   }

44  }

45 }


46 void

47 reader(int contfd, int msqid, int nbytes)

48 {

49  ssize_t n;

50  Write(contfd, &nbytes, sizeof(nbytes));

51  while ((nbytes > 0) &&

52   ((n = Msgrcv(msqid, buf, xfersize – sizeof(long), 0, 0)) > 0)) {

53   nbytes –= n + sizeof(long);

54  }

55 }

Программа измерения полосы пропускания дверей

Программа измерения полосы пропускания интерфейса дверей сложнее, чем предыдущие, поскольку нам нужно вызвать fork перед созданием двери. Родительский процесс создает дверь и с помощью канала оповещает дочерний процесс о том, что ее можно открывать.

Другое изменение заключается в том, что в отличие от рис. А.7 функция reader не принимает данные. Данные принимаются функцией server, которая является процедурой сервера для данной двери. На рис. А.8 изображена схема программы. 

Рис. А.8. Схема программы измерения полосы пропускания дверей


Поскольку двери поддерживаются только в Solaris, мы упростим программу, предполагая наличие двустороннего канала (раздел 4.4).

Еще одно изменение вызвано фундаментальным различием между передачей сообщений и вызовом процедуры. В программе, работавшей с очередью сообщений Posix, например, записывающий процесс просто помещал сообщения в очередь в цикле, что осуществляется асинхронно. В какой-то момент очередь будет заполнена или записывающий процесс будет просто приостановлен, и тогда считывающий процесс получит сообщения. Если, например, в очередь помещается 8 сообщений и записывающий процесс помещал в нее 8 сообщений каждый раз, когда получал управление, а считывающий процесс считывал 8 сообщений, отправка N сообщений требовала N/4 переключения контекста. Интерфейс дверей является синхронным: вызывающий процесс блокируется каждый раз при вызове door_call и не может возобновиться до тех пор, пока сервер не завершит работу. Передача N сообщений в этом случае требует N/2 переключений контекста. С той же проблемой мы столкнемся при измерении полосы пропускания вызовов RPC. Несмотря на увеличившееся количество переключений контекста, из рис. А.1 следует, что двери обладают наибольшей полосой пропускания при размере сообщений не более 25 Кбайт.

В листинге А.9 приведен текст функции main нашей программы. Функции writer, server и reader приведены в листинге А.10.

Листинг А.9. Функция main измерения полосы пропускания интерфейса дверей

//bench/bw_door.c

1  #include "unpipc.h"

2  void reader(int, int);

3  void writer(int);

4  void server(void *, char *, size_t, door_desc_t *, size_t);

5  void *buf;

6  int totalnbytes, xfersize, contpipe[2];


7  int

8  main(int argc, char **argv)

9  {

10  int i, nloop, doorfd;

11  char c;

12  pid_t childpid;

13  ssize_t n;

14  if (argc != 5)

15   err_quit("usage: bw_door <#loops><#mbytes><#bytes/write>");

16  nloop = atoi(argv[2]);

17  totalnbytes = atoi(argv[3]) * 1024 * 1024;

18  xfersize = atoi(argv[4]);

19  buf = Valloc(xfersize);

20  Touch(buf, xfersize);

21  unlink(argv[1]);

22  Close(Open(argv[1], O_CREAT | O_EXCL | O_RDWR, FILE_MODE));

23  Pipe(contpipe); /* предполагается наличие двустороннего канала SVR4 */

24  if ((childpid = Fork()) == 0) {

25   /* дочерний процесс = клиент */

26   if ((n = Read(contpipe[0], &c, 1)) != 1)

27    err_quit("child: pipe read returned %d", n);

28   doorfd = Open(argv[1], O_RDWR);

29   writer(doorfd);

30   exit(0);

31  }

32  /* родительский процесс = сервер */

33  doorfd = Door_create(server, NULL, 0);

34  Fattach(doorfd, argv[1]);

35  Write(contpipe[1], &c, 1); /* уведомление о готовности двери */

36  Start_time();

37  for (i = 0; i < nloop; i++)

38   reader(doorfd, totalnbytes);

39  printf("bandwidth: %.3f MB/sec\n",

40  totalnbytes / Stop_time() * nloop);

41  kill(childpid, SIGTERM);

42  unlink(argv[1]);

43  exit(0);

44 }

Листинг A.10. Функции writer, server, reader для интерфейса дверей

//bench/bw_door.c

45 void

46 writer(int doorfd)

47 {

48  int ntowrite;

49  door_arg_t arg;

50  arg.desc_ptr = NULL; /* дескрипторы не передаются */

51  arg.desc_num = 0;

52  arg.rbuf = NULL; /* значения не возвращаются */

53  arg.rsize = 0;

54  for(;;) {

55   Read(contpipe[0], &ntowrite, sizeof(ntowrite));

56   while (ntowrite > 0) {

57    arg.data_ptr = buf;

58    arg.data_size = xfersize;

59    Door_call(doorfd, &arg);

60    ntowrite –= xfersize;

61   }

62  }

63 }


64 static int ntoread, nread;


65 void

66 server(void *cookie, char *argp, size_t arg_size,

67  door_desc_t *dp, size_t n_descriptors)

68 {

69  char c;

70  nread += arg_size;

71  if (nread >= ntoread)

72   Write(contpipe[0], &c, 1); /* запись закончена */

73  Door_return(NULL, 0, NULL, 0);

74 }


75 void

76 reader(int doorfd, int nbytes)

77 {

78  char c;

79  ssize_t n;

80  ntoread = nbytes; /* глобальные переменные процедуры сервера */

81  nread = 0;

82  Write(contpipe[1], &nbytes, sizeof(nbytes));

83  if ((n = Read(contpipe[1], &c, 1)) != 1)

84   err_quit("reader: pipe read returned %d", n);

85 }

Программа определения полосы пропускания Sun RPC

Поскольку вызовы процедур в Sun RPC являются синхронными, для них действует то же ограничение, что и для дверей (см. выше). В данном случае проще создать две программы (клиент и сервер), поскольку они создаются автоматически программой rpcgen. В листинге А.11 приведен файл спецификации RPC. Мы объявляем единственную процедуру, принимающую скрытые данные переменной длины в качестве входного аргумента и ничего не возвращающую.

В листинге А.12 приведен текст программы-клиента, а в листинге А.13 — процедура сервера. Мы указываем протокол в качестве аргумента командной строки при вызове клиента, что позволяет нам измерить скорость работы обоих протоколов.

Листинг А.11. Спецификация RPC для измерения полосы пропускания RPC

//bench/bw_sunrpc.х

1 %#define DEBUG /* сервер выполняется в приоритетном режиме */


2 struct data_in {

3  opaque data<>; /* скрытые данные переменной длины */

4 };


5 program BW_SUNRPC_PROG {

6  version BW_SUNRPC_VERS {

7   void BW_SUNRPC(data_in) = 1;

8  } = 1;

9 } = 0x31230001;

Листинг A.12. Клиент RPC для измерения полосы пропускания

//bench/bw_sunrpc_client.с

1  #include "unpipc.h"

2  #include "bw_sunrpc.h"


3  void *buf;

4  int totalnbytes, xfersize;


5  int

6  main(int argc, char **argv)

7  {

8   int i, nloop, ntowrite;

9   CLIENT *cl;

10  data_in in;

11  if (argc != 6)

12   err_quit("usage: bw_sunrpc_client <#loops>"

13   " <#mbytes><#bytes/write>");

14  nloop = atoi(argv[2]);

15  totalnbytes = atoi(argv[3]) * 1024 * 1024;

16  xfersize = atoi(argv[4]);

17  buf = Valloc(xfersize);

18  Touch(buf, xfersize);

19  cl = Clnt_create(argv[1], BW_SUNRPC_PROG, BW_SUNRPC_VERS, argv[5]);

20  Start_time();

21  for (i = 0; i < nloop; i++) {

22   ntowrite = totalnbytes;

23   while (ntowrite > 0) {

24    in.data.data_len = xfersize;

25    in.data.data_val = buf;

26    if (bw_sunrpc_1(&in, cl) == NULL)

27     err_quit("%s", clnt_sperror(cl, argv[1]));

28    ntowrite –= xfersize;

29   }

30  }

31  printf("bandwidth: %.3f MB/sec\n",

32   totalnbytes / Stop_time() * nloop);

33  exit(0);

34 }

Листинг A.13. Процедура сервера для измерения полосы пропускания RPC

//bench/bw_sunrpc_server.c

1  #include "unpipc.h"

2  #include "bw_sunrpc.h"

3  #ifndef RPCGEN_ANSIC

4  #define bw_sunrpc_1_svc bw_sunrpc_1

5  #endif


6  void *

7  bw_sunrpc_1_svc(data_in *inp, struct svc_req *rqstp)

8  {

9   static int nbytes;

10  nbytes = inp->data.data_len;

11  return(&nbytes); /* должен быть ненулевым, но xdr_void игнорирует */

12 }

А.4. Измерение задержки передачи сообщений: программы