Параллельное программирование на С++ в действии — страница 15 из 53

Использование описанных выше средств синхронизации в качестве строительных блоков позволяет сосредоточиться на самих нуждающихся в синхронизации операциях, а не на механизмах реализации. В частности, код можно упростить, применяя более функциональный (в смысле функционального программирования) подход к программированию параллелизма. Вместо того чтобы напрямую разделять данные между потоками, мы можем снабдить каждый поток необходимыми ему данными, а результаты вычисления предоставить другим потокам, которые в них заинтересованы, с помощью будущих результатов.

4.4.1. Функциональное программирование с применением будущих результатов

Термином функциональное программирование (ФП) называют такой стиль программирования, при котором результат функции зависит только от ее параметров, но не от внешнего состояния. Это напрямую соотносится с понятием функции в математике и означает, что если два раза вызвать функцию с одними и теми же параметрами, то получатся одинаковые результаты. Таким свойством обладают многие математические функции в стандартной библиотеке С++, например

sin
,
cos
и
sqrt
, а также простые операции над примитивными типами, например
3+3
,
6*9
или
1.3/4.7
. Чистая функция не модифицирует никакое внешнее состояние, она воздействует только на возвращаемое значение.

При таком подходе становится проще рассуждать о функциях, особенно в присутствии параллелизма, поскольку многие связанные с разделяемой памятью проблемы, обсуждавшиеся в главе 3, просто не возникают. Если разделяемые данные не модифицируются, то не может быть никакой гонки и, стало быть, не нужно защищать данные с помощью мьютексов. Это упрощение настолько существенно, что в программировании параллельных систем все более популярны становятся такие языки, как Haskell[9], где все функции чистые по умолчанию. В таком окружении нечистые функции, которые все же модифицируют разделяемые данные, отчетливо выделяются, поэтому становится проще рассуждать о том, как они укладываются в общую структуру приложения.

Но достоинства функционального программирования проявляются не только в языках, где эта парадигма применяется по умолчанию. С++ — мультипарадигменный язык, и на нем, безусловно, можно писать программы в стиле ФП. С появлением в С++11 лямбда-функций (см. приложение А, раздел А.6), включением шаблона

std::bind
из Boost и TR1 и добавлением автоматического выведения типа переменных (см. приложение А, раздел А.7) это стало даже проще, чем в С++98. Будущие результаты — это последний элемент из тех, что позволяют реализовать на С++ параллелизм в стиле ФП; благодаря передаче будущих результатов результат одного вычисления можно сделать зависящим от результата другого без явного доступа к разделяемым данным.

Быстрая сортировка в духе ФП

Чтобы продемонстрировать использование будущих результатов при написании параллельных программ в духе ФП, рассмотрим простую реализацию алгоритма быстрой сортировки Quicksort. Основная идея алгоритма проста: имея список значений, выбрать некий опорный элемент и разбить список на две части — в одну войдут элементы, меньшие опорного, в другую — большие или равные опорному. Отсортированный список получается путем сортировки обоих частей и объединения трех списков: отсортированного множества элементов, меньших опорного элемента, самого опорного элемента и отсортированного множества элементов, больших или равных опорному элементу. На рис. 4.2 показано, как этот алгоритм сортирует список из 10 целых чисел. В листинге ниже приведена последовательная реализация алгоритма в духе ФП; в ней список принимается и возвращается по значению, а не сортируется по месту в

std::sort()
.

Рис. 4.2. Рекурсивная сортировка в духе ФП


Листинг 4.12. Последовательная реализация Quicksort в духе ФП

template

std::list sequential_quick_sort(std::list input) {

 if (input.empty()) {

  return input;

 }

 std::list result;

 result.splice(result.begin(), input, input.begin());←
(1)


 T const& pivot = *result.begin();                   ←
(2)


 auto divide_point = std::partition(input.begin(), input.end(),

  [&](T const& t) { return t < pivot; });←
(3)


 std::list lower_part;

 lower_part.splice(

  lower_part.end(), input, input.begin(), divide_point); ←
(4)


 auto new_lower(

  sequential_quick_sort(std::move(lower_part)));         ←
(5)

 auto new_higher(

  sequential_quick_sort(std::move(input)));              ←
(6)


 result.splice(result.end(), new_higher);  ←
(7)

 result.splice(result.begin(), new_lower); ←
(8)


 return result;

}

Хотя интерфейс выдержан в духе ФП, прямое применение ФП привело бы к неоправданно большому числу операций копирования, поэтому внутри мы используем «обычный» императивный стиль. В качестве опорного мы выбираем первый элемент и отрезаем его от списка с помощью функции

splice()
(1). Потенциально это может привести к неоптимальной сортировке (в терминах количества операций сравнения и обмена), но любой другой подход при работе с
std::list
может существенно увеличить время за счет обхода списка. Мы знаем, что этот элемент должен войти в результат, поэтому можем сразу поместить его в список, где результат будет храниться. Далее мы хотим использовать этот элемент для сравнения, поэтому берем ссылку на него, чтобы избежать копирования (2). Теперь можно с помощью алгоритма
std::partition
разбить последовательность на две части: меньшие опорного элемента и не меньшие опорного элемента (3). Критерий разбиения проще всего задать с помощью лямбда-функции; мы запоминаем ссылку в замыкании, чтобы не копировать значение
pivot
(подробнее о лямбда-функциях см. в разделе А.5 приложения А).

Алгоритм

std::partition()
переупорядочивает список на месте и возвращает итератор, указывающий на первый элемент, который не меньше опорного значения. Полный тип итератора довольно длинный, поэтому мы используем спецификатор типа
auto
, чтобы компилятор вывел его самостоятельно (см. приложение А, раздел А.7).

Раз уж мы выбрали интерфейс в духе ФП, то для рекурсивной сортировки обеих «половин» нужно создать два списка. Для этого мы снова используем функцию

splice()
, чтобы переместить значения из списка
input
до
divide_point
включительно в новый список
lower_part
(4). После этого
input
будет со держать только оставшиеся значения. Далее оба списка можно отсортировать путем рекурсивных вызовов (5), (6). Применяя
std::move()
для передачи списков, мы избегаем копирования — результат в любом случае неявно перемещается. Наконец, мы еще раз вызываем
splice()
, чтобы собрать result в правильном порядке. Значения из списка
new_higher
попадают в конец списка (7), после опорного элемента, а значения из списка
new_lower
— в начало списка, до опорного элемента (8).

Параллельная реализация Quicksort в духе ФП

Раз уж мы все равно применили функциональный стиль программирования, можно без труда распараллелить этот код с помощью будущих результатов, как показано в листинге ниже. Набор операций тот же, что и раньше, только некоторые из них выполняются параллельно.


Листинг 4.13. Параллельная реализация Quicksort с применением будущих результатов

template

std::list parallel_quick_sort(std::list input) {

 if (input.empty()) {

  return input;

 }


 std::list result;

 result.splice(result.begin(), input, input.begin());

 T const& pivot = *result.begin();


 auto divide_point = std::partition(input.begin(), input.end(),

  [&](T const& t) {return t

 std::list lower_part;

 lower_part.splice(

  lower_part.end(), input, input.begin(), divide_point);


 std::future> new_lower( ←
(1)

  std::async(¶llel_quick_sort, std::move(lower_part)));


 auto new_higher(

  parallel_quick_sort(std::move(input))); ←
(2)


 result.splice(result.end(), new_higher); ←
(3)


 result.splice(result.begin(), new_lower.get()); ←
(4)

 return result;

}

Существенное изменение здесь заключается в том, что сортировка нижней части списка производится не в текущем, а в отдельном потоке — с помощью

std::async()
(1). Верхняя часть списка сортируется путем прямой рекурсии, как и раньше (2). Рекурсивно вызывая
parallel_quick_sort()
, мы можем задействовать доступный аппаратный параллелизм. Если
std::async()
создает новый поток при каждом обращении, то после трех уровней рекурсии мы получим восемь работающих потоков, а после 10 уровней (когда в списке примерно 1000 элементов) будет работать 1024 потока, если оборудование позволяет. Если библиотека решит, что запущено слишком много задач (быть может, потому что количество задач превысило уровень аппаратного параллелизма), то может перейти в режим синхронного запуска новых задач. Тогда новая задача будет работать в том же потоке, который обратился к
get()
, а не в новом, так что мы не будем нести издержки на передачу задачи новому потоку, если это не увеличивает производительность. Стоит отметить, что в соответствии со стандартом реализация
std::async
вправе как создавать новый поток для каждой задачи (даже при значительном превышении лимита), если явно не задан флаг
std::launch::deferred
, так и запускать все задачи синхронно, если явно не задан флаг
std::launch::async
. Рассчитывая, что библиотека сама позаботится об автоматическом масштабировании, изучите, что говорится на эту тему в документации, поставляемой вместе с библиотекой.

Можно не использовать

std::async()
, а написать свою функцию
spawn_task()
, которая будет служить оберткой вокруг
std::packaged_task
и
std::thread
, как показано в листинге 4.14; нужно создать объект
std::packaged_task
для хранения результата вызова функции, получить из него будущий результат, запустить задачу в отдельном потоке и вернуть будущий результат. Само по себе это не дает большого преимущества (и, скорее всего, приведёт к значительному превышению лимита), но пролагает дорогу к переходу на более хитроумную реализацию, которая помещает задачу в очередь, обслуживаемую пулом потоков. Рассмотрение пулов потоков мы отложим до главы 9. Но идти по такому пути вместо использования
std::async
имеет смысл только в том случае, когда вы точно знаете, что делаете, и хотите полностью контролировать, как пул потоков строится и выполняет задачи.

Но вернемся к функции

parallel_quick_sort
. Поскольку для получения
new_higher
мы применяли прямую рекурсию, то и срастить (splice) его можно на месте, как и раньше (3). Но
new_lower
теперь представляет собой не список, а объект
std::future>
, поэтому сначала нужно извлечь значение с помощью
get()
, а только потом вызывать
splice()
(4). Таким образом, мы дождемся завершения фоновой задачи, а затем переместим результат в параметр
splice()
; функция
get()
возвращает ссылку на r-значение — хранимый результат, следовательно, его можно переместить (подробнее о ссылках на r-значения и семантике перемещения см. в разделе А.1.1 приложения А).

Даже в предположении, что

std::async()
оптимально использует доступный аппаратный параллелизм, приведённая реализация Quicksort все равно не идеальна. Основная проблема в том, что
std::partition
делает много работы и остается последовательной операцией, но пока остановимся на этом. Если вас интересует максимально быстрая параллельная реализация, обратитесь к научной литературе.


Листинг 4.14. Простая реализация функции

spawn_task

template

std::future::type>

spawn_task(F&& f, A&& a) {

 typedef std::result_of::type result_type;

 std::packaged_task

 task(std::move(f)));

 std::future res(task.get_future());

 std::thread t(std::move(task), std::move(a));

 t.detach();

 return res;

}

Функциональное программирование — не единственная парадигма параллельного программирования, позволяющая избежать модификации разделяемых данных. Альтернативой является парадигма CSP (Communicating Sequential Processes — взаимодействующие последовательные процессы)[10], в которой потоки концептуально рассматриваются как полностью независимые сущности, без каких бы то ни было разделяемых данных, но соединенные коммуникационными каналами, по которым передаются сообщения. Эта парадигма положена в основу языка программирования Erlang (http://www.erlang.org/) и среды MPI (Message Passing Interface) (http://www.mpi-forum.org/), широко используемой для высокопроизводительных вычислений на С и С++. Уверен, что теперь вы не удивитесь, узнав, что и эту парадигму можно поддержать на С++, если соблюдать определенную дисциплину; в следующем разделе показано, как это можно сделать.

4.4.2. Синхронизация операций с помощью передачи сообщений

Идея CSP проста: если никаких разделяемых данных нет, то каждый поток можно рассматривать независимо от остальных, учитывая лишь его поведение в ответ на получаемые сообщения. Таким образом, поток по существу является конечным автоматом: получив сообщение, он как-то изменяет свое состояние, возможно, посылает одно или несколько сообщений другим потокам и выполняет то или иное вычисление, зависящее от начального состояния. Один из способов такого способа программирования потоков — формализовать это описание и реализовать модель конечного автомата, но этот путь не единственный — конечный автомат может неявно присутствовать в самой структуре приложения. Какой метод будет работать лучше в конкретном случае, зависит от требований к поведению приложения и от опыта разработчиков. Но каким бы образом ни был реализован поток, у разбиения на независимые процессы есть несомненное преимущество — потенциальное устранение многих сложностей, связанных с параллельным доступом к разделяемым данным, и, следовательно, упрощение программирования и снижение количества ошибок.

У настоящих последовательных взаимодействующих процессов вообще нет разделяемых данных, а весь обмен информацией производится через очереди сообщений. Но, поскольку в С++ потоки имеют общее адресное пространство, то обеспечить строгое соблюдение этого требования невозможно. Тут-то и приходит на выручку дисциплина: следить за тем, чтобы никакие данные не разделялись между потоками, — обязанность автора приложения или библиотеки. Разумеется, сами очереди сообщений должны разделяться, иначе потоки не смогут взаимодействовать, но детали этого механизма можно вынести в библиотеку. Представьте, что вам нужно написать программу для банкомата. Она должна поддерживать взаимодействие с человеком, который хочет снять деньги, с соответствующим банком, а также управлять оборудованием, которое принимает платёжную карту, выводит на экран сообщения, обрабатывает нажатия клавиш, выдает деньги и возвращает карту.

Чтобы воплотить все это в жизнь, можно было бы разбить код на три независимых потока: один будет управлять оборудованием, второй — реализовывать логику работы банкомата, а третий — обмениваться информацией с банком. Эти потоки могут взаимодействовать между собой посредством передачи сообщений, а не за счет разделения данных. Например, поток, управляющий оборудованием, будет посылать сообщение потоку логики банкомата о том, что человек вставил карту или нажал кнопку. Поток логики будет посылать потоку, управляющему оборудованием, сообщение о том, сколько денег выдать. И так далее.

Смоделировать логику банкомата можно, например, с помощью конечного автомата. В каждом состоянии поток ждет сообщение, которое затем обрабатывает. Это может привести к переходу в новое состояние, после чего цикл продолжится. На рис. 4.3 показаны состояния, присутствующие в простой реализации программы. Здесь система ждет, пока будет вставлена карта. Когда это произойдёт, система ждет, что пользователь введет свой ПИН-код, по одной цифре за раз. Последнюю введенную цифру пользователь может удалить. После того как будет введено нужное количество цифр, система проверяет ПИН-код. Если он введен неправильно, больше делать нечего — клиенту нужно вернуть карту и ждать, пока будет вставлена следующая карта. Если ПИН-код правильный, то система ждет либо отмены транзакции, либо выбора снимаемой суммы. Если пользователь отменил операцию, ему нужно вернуть карту и закончить работу. Если он выбрал сумму, то система ждет подтверждения от банка, а затем либо выдает наличные и возвращает карту, либо выводит сообщение «недостаточно средств на счете» и тоже возвращает карту. Понятно, что реальный банкомат гораздо сложнее, но и этого достаточно для иллюстрации идеи.

Рис. 4.3. Модель простого конечного автомата для банкомата

Спроектировав конечный автомат для реализации логики банкомата, мы можем оформить его в виде класса, в котором каждому состоянию соответствует функция-член. Каждая такая функция ждет поступления одного из допустимых сообщений, обрабатывает его и, возможно, инициирует переход в новое состояние. типы сообщений представлены структурами

struct
. В листинге 4.15 приведена часть простой реализации логики банкомата в такой системе — главный цикл и код первого состояния, в котором программа ожидает вставки карты.

Как видите, вся синхронизация, необходимая для передачи сообщений, целиком скрыта в библиотеке (ее простая реализация приведена в приложении С вместе с полным кодом этого примера).


Листинг 4.15. Простая реализация класса, описывающего логику работы банкомата

struct card_inserted {

 std::string account;

};


class atm {

 messaging::receiver incoming;

 messaging::sender bank;

 messaging::sender interface_hardware;

 void (atm::*state)();


 std::string account;

 std::string pin;

 void waiting_for_card() {                      ←
(1)

  interface_hardware.send(display_enter_card());←
(2)

  incoming.wait()                               ←
(3)

   .handle(

    [&](card_inserted const& msg) {             ←
(4)

    account = msg.account;

    pin = "";

    interface_hardware.send(display_enter_pin());

    state = &atm::getting_pin;

   }

  );

 }


 void getting_pin();


public:

 void run() {                     ←
(5)

  state = &atm::waiting_for_card; ←
(6)

  try {

   for(;;) {

    (this->*state)();             ←
(7)

   }

  }

  catch(messaging::close_queue const&) {}

 }

};

Мы уже говорили, что эта реализация неизмеримо проще логики работы реального банкомата, но она все же дает представление о программировании на основе передачи сообщений. Не нужно думать о проблемах параллельности и синхронизации, наша основная забота — понять, какие входные сообщения допустимы в данной точке и какие сообщения посылать в ответ. Конечный автомат, реализующий логику банкомата, работает в одном потоке, а прочие части системы, например интерфейс с банком и с терминалом, — в других потоках. Такой принцип проектирования программ называется моделью акторов — в системе есть несколько акторов (каждый работает в своем потоке), которые посылают друг другу сообщения с просьбой выполнить определённое задание, и никакого разделяемого состояния, помимо передаваемого в составе сообщений, не существует.

Выполнение начинается в функции-члене

run()
(5), которая устанавливает начальное состояние
waiting_for_card
(6), а затем в цикле вызывает функции-члены, представляющие текущее состояние (каким бы оно ни было) (7). Функции состояния — это просто функции-члены класса
atm
. Функция
waiting_for_card
(1) тоже не представляет сложности: она посылает сообщение интерфейсу с просьбой вывести сообщение «Вставьте карту» (2), а затем ожидает сообщения, которое могла бы обработать (3). Единственное допустимое в этой точке сообщение —
card_inserted
; оно обрабатывается лямбда-функцией (4). Функции
handle
можно передать любую функцию или объект-функцию, но в таком простом случае лямбда-функции вполне достаточно. Отметим, что вызов функции
handle()
сцеплен с вызовом
wait()
; если получено сообщение недопустимого типа, оно отбрасывается, и поток ждет, пока не придёт подходящее сообщение.

Сама лямбда-функция просто запоминает номер карточного счета в переменной-члене, очищает текущий ПИН-код и переходит в состояние «получение ПИН». По завершении обработчика сообщений функция состояния возвращает управление главному циклу, который вызывает функцию следующего состояния (7).

Функция состояния

getting_pin
несколько сложнее, потому что может обрабатывать сообщения разных типов, как следует из рис. 4.3. Ниже приведён ее код.


Листинг 4.16. Функция состояния

getting_pin
для простой реализации банкомата

void atm::getting_pin() {

 incoming.wait()

 .handle(     ←
(1)

  [&](digit_pressed const& msg) {

   unsigned const pin_length = 4;

   pin += msg.digit;

   if (pin.length() == pin_length) {

    bank.send(verify_pin(account, pin, incoming));

    state = &atm::verifying_pin;

   }

  }

 )

 .handle(←
(2)

  [&](clear_last_pressed const& msg) {

   if (!pin.empty()) {

    pin.resize(pin.length() - 1);

   }

  }

 )

 .handle(    ←
(3)

  [&](cancel_pressed const& msg) {

   state = &atm::done_processing;

  }

 );

}

Поскольку теперь допустимы сообщения трех типов, то с функцией

wait()
сцеплены три вызова функции
handle()
(1), (2), (3). В каждом вызове
handle()
в качестве параметра шаблона указан тип сообщения, а в качестве параметра самой функции — лямбда-функция, которая принимает сообщение этого типа. Поскольку вызовы сцеплены, функция
wait()
знает, что может ожидать сообщений
digit_pressed
,
clear_last_pressed
или
cancel_pressed
. Сообщения всех прочих типов игнорируются.

Как видим, теперь состояние изменяется не всегда. Например, при получении сообщения

digit_pressed
мы просто дописываем цифру в конец
pin
, если эта цифра не последняя. Затем главный цикл ((7) в листинге 4.15) снова вызовет функцию
getting_pin()
, чтобы ждать следующую цифру (или команду очистки либо отмены).

Это соответствует поведению, изображенному на рис. 4.3. Каждое состояние реализовано отдельной функцией-членом, которая ждет сообщений определенных типов и при необходимости обновляет состояние.

Как видите, такой стиль программирования может заметно упростить проектирование параллельной системы, поскольку все потоки рассматриваются как абсолютно независимые. Таким образом, мы имеем пример использования нескольких потоков для разделения обязанностей, а, значит, от нас требуется явно решить, как распределять между ними задачи.

4.5. Резюме