Параллельное программирование на С++ в действии — страница 29 из 53

В какой мере следует учитывать описанные выше факторы при проектировании, зависит от конкретной задачи. Для демонстрации мы рассмотрим реализацию параллельных версий трех функций из стандартной библиотеки С++. При этом у нас будет знакомая платформа, на которой можно изучать новые вещи. Попутно мы получим работоспособные версии функций, которые можно будет применить при распараллеливании более крупной программы.

Я ставил себе задачей продемонстрировать определенные приёмы, а не написать самый оптимальный код. Реализации, в которых лучше используется имеющееся оборудование, можно найти в академической литературе по параллельным алгоритмам или в специализированных многопоточных библиотеках типа Intel Threading Building Blocks[20].

Концептуально простейшим параллельным алгоритмом является параллельная версия

std::for_each
, с которой я и начну.

8.5.1. Параллельная реализация
std::for_each

Идея

std::for_each
проста — этот алгоритм вызывает предоставленную пользователем функцию для каждого элемента диапазона. Различие между параллельной и последовательной реализацией
std::for_each
заключается, прежде всего, в порядке вызовов функции. Стандартная версия
std::for_each
вызывает функцию сначала для первого элемента диапазона, затем для второго и так далее, тогда как параллельная версия не дает гарантий относительно порядка обработки элементов, они даже могут (и хочется надеяться, будут) обрабатываться параллельно.

Для реализации параллельной версии нужно всего лишь разбить диапазон на участки, которые будут обрабатываться каждым потоком. Количество элементов известно заранее, поэтому такое разбиение можно произвести до начала работы (см. раздел 8.1.1). Мы будем предполагать, что это единственная исполняемая параллельная задача, поэтому вычислить количество требуемых потоков можно с помощью функции

std::thread::hardware_concurrency()
. Мы также знаем, что элементы можно обрабатывать абсолютно независимо, поэтому для предотвращения ложного разделения (см. раздел 8.2.3) имеет смысл использовать соседние блоки.

По своей структуре этот алгоритм похож на параллельную версию

std::accumulate
, описанную в разделе 8.4.1, только вместо вычисления суммы элементов он применяет к ним заданную функцию. На первый взгляд, это должно бы существенно упростить код, потому что не нужно возвращать никакой результат. Но если мы собираемся передавать исключения вызывающей программе, то все равно придется воспользоваться механизмами
std::packaged_task
и
std::future
, чтобы передавать исключения из одного потока в другой. Ниже приведен пример реализации.


Листинг 8.7. Параллельная реализация

std::for_each

template

void parallel_for_each(Iterator first, Iterator last, Func f) {

 unsigned long const length = std::distance(first, last);


 if (!length)

  return;


 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread - 1) / min_per_thread;


 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();


 unsigned long const num_threads =

  std::min(

   hardware_threads != 0 ? hardware_threads : 2, max_threads);


 unsigned long const block_size = length / num_threads;


 std::vector> futures(num_threads - 1); ←
(1)

 std::vector threads(num_threads – 1);

 join_threads joiner(threads);


 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  std::packaged_task task( ←
(2)

   [=]() {

    std::for_each(block_start, block_end, f);

   });

  futures[i] = task.get_future();

  threads[i] = std::thread(std::move(task)); ←
(3)

  block_start = block_end;

 }

 std::for_each(block_start, last, f);

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  futures[i].get(); ←
(4)

 }

}

Структурно код ничем не отличается от приведенного в листинге 8.4, что и неудивительно. Основное различие состоит в том, что в векторе futures хранятся объекты

std::future
(1), потому что рабочие потоки не возвращают значение, а в качестве задачи мы используем простую лямбда-функцию, которая вызывает функцию
f
для элементов из диапазона от
block_start
до
block_end
(2). Это позволяет не передавать конструктору потока (3) диапазон. Поскольку рабочие потоки ничего не возвращают, обращения к
futures[i].get()
(4) служат только для получения исключений, возникших в рабочих потоках; если мы не хотим передавать исключения, то эти обращения можно вообще опустить.

Реализацию

parallel_for_each
можно упростить, воспользовавшись
std::async
, — точно так же, как мы делали при распараллеливании
std::accumulate
.


Листинг 8.8. Параллельная реализация

std::for_each
с применением
std::async

template

void parallel_for_each(Iterator first, Iterator last, Func f) {

 unsigned long const length = std::distance(first, last);


 if (!length)

  return;


 unsigned long const min_per_thread = 25;


 if (length < (2 * min_per_thread)) {

  std::for_each(first, last, f); ←
(1)

 } else {

  Iterator const mid_point = first + length / 2;

  std::future first_half = ←
(2)

   std::async(¶llel_for_each,

   first, mid_point, f);

  parallel_for_each(mid_point, last, f); ←
(3)

  first_half.get();                      ←
(4)

 }

}

Как и в случае реализации

parallel_accumulate
с помощью
std::async
в листинге 8.5, мы разбиваем данные рекурсивно в процессе выполнения, а не заранее, потому что не знаем, сколько потоков задействует библиотека. На каждом шаге данные делятся пополам, пока их не останется слишком мало для дальнейшего деления. При этом одна половина обрабатывается асинхронно (2), а вторая — непосредственно (3). Когда дальнейшее деление становится нецелесообразным, вызывается
std::for_each
(1). И снова использование
std::async
и функции-члена
get()
объекта
std::future
(4) обеспечивает семантику распространения исключения.

Теперь перейдем от алгоритмов, которые выполняют одну и ту же операцию над каждым элементом (к их числу относятся также

std::count
и
std::replace
), к чуть более сложному случаю —
std::find
.

8.5.2. Параллельная реализация
std::find

Далее будет полезно рассмотреть алгоритм

std::find
, потому что это один из нескольких алгоритмов, которые могут завершаться еще до того, как обработаны все элементы. Например, если уже первый элемент в диапазоне отвечает условию поиска, то рассматривать остальные не имеет смысла. Как мы скоро увидим, это свойство существенно для повышения производительности, и от него напрямую зависит структура параллельной реализации. На этом примере мы продемонстрируем, как порядок доступа к данным может оказать влияние на проектирование программы (см. раздел 8.3.2). К той же категории относятся алгоритмы
std::equal
и
std::any_of
.

Если вы вместе с женой или другом ищете какую-нибудь старую фотографию в сваленных на чердаке альбомах, то вряд ли захотите, чтобы они продолжали перелистывать страницы, когда вы уже нашли то, что нужно. Наверное, вы просто сообщите им, что искомое найдено (быть может, крикнув «Есть!»), чтобы они могли прекратить поиски и заняться чем-нибудь другим. Но многие алгоритмы по природе своей должны обработать каждый элемент и, стало быть, не имеют эквивалента восклицанию «Есть!». Для алгоритмов типа

std::find
умение «досрочно» прекращать работу — важное свойство, которое нельзя игнорировать. И, следовательно, его нужно учитывать при проектировании кода — закладывать возможность прерывания других задач, когда ответ уже известен, чтобы программа не ждала, пока прочие рабочие потоки обработают оставшиеся элементы.

Без прерывания других потоков последовательная версия может работать быстрее параллельной, потому что прекратит поиск, как только будет найден нужный элемент. Если, например, система поддерживает четыре параллельных потока, то каждый из них должен будет просмотреть четверть полного диапазона, поэтому при наивном распараллеливании каждый поток потратит на просмотр своих элементов четверть всего времени. Если искомый элемент окажется в первой четверти диапазона, то последовательный алгоритм завершится раньше, так как не должен будет просматривать оставшиеся элементы.

Один из способов прервать другие потоки — воспользоваться атомарной переменной в качестве флага и проверять его после обработки каждого элемента. Если флаг поднят, то один из потоков уже нашел нужный элемент, поэтому можно прекращать обработку. При таком способе прерывания мы не обязаны обрабатывать все элементы, и, значит, параллельная версия чаще будет обгонять последовательную. Недостаток состоит в том, что загрузка атомарной переменной — медленная операция, которая тормозит работу каждого потока.

Мы уже знаем о двух способах возврата значений и распространения исключений. Можно использовать массив будущих результатов и объекты

std::packaged_task
для передачи значений и исключений, после чего обработать частичные результаты в главном потоке. Или с помощью
std::promise
устанавливать окончательный результат прямо в рабочем потоке. Все зависит от того, как мы хотим обрабатывать исключения, возникающие в рабочих потоках. Если требуется остановиться при первом возникновении исключения (несмотря на то, что обработаны не все элементы), то можно использовать
std::promise
для передачи значения и исключения. С другой стороны, если мы хотим, чтобы рабочие потоки продолжали поиск, то используем
std::packaged_task
, сохраняем все исключения, а затем повторно возбуждаем одно из них, если искомый элемент не найден.

В данном случае я остановился на

std::promise
, потому что такое поведение больше походит на поведение
std::find
. Надо только не забыть о случае, когда искомого элемента в указанном диапазоне нет. Поэтому необходимо дождаться завершения всех потоков перед тем, как получать значение из будущего результата. Если просто блокировать исполнение при обращении к
get()
, то при условии, что искомого элемента нет, мы будем ждать вечно. Получившийся код приведён ниже.


Листинг 8.9. Параллельная реализация алгоритма

find()

template

Iterator parallel_find(Iterator first, Iterator last,

 MatchType match) {

 struct find_element { ←
(1)

  void operator()(Iterator begin, Iterator end,

   MatchType match,

   std::promise* result,

   std::atomic* done_flag) {

   try {

    for(; (begin != end) && !done_flag->load(); ++begin) {←
(2)

     if (*begin == match) {

      result->set_value(begin);←
(3)

      done_flag->store(true);  ←
(4)

      return;

     }

    }

   } catch (...) { ←
(5)

    try {

     result->set_exception(std::current_exception());←
(6)

     done_flag->store(true);

    } catch (...) ←
(7)

    {}

   }

  }

 };


 unsigned long const length = std::distance(first, last);


 if (!length)

  return last;


 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length + min_per_thread — 1) / min_per_thread;


 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();


 unsigned long const num_threads =

  std::min(

   hardware_threads != 0 ? hardware_threads : 2, max_threads);


 unsigned long const block_size = length / num_threads;


 std::promise result;     ←
(8)

 std::atomic done_flag(false);←
(9)

 std::vector threads(num_threads — 1); {←
(10)

  join_threads joiner(threads);


 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  threads[i] = std::thread(find_element(), ←
(11)

   block_start, block_end, match,

&result, &done_flag);

  block_start = block_end;

 }

 find_element()(

  block_start, last, match, &result, &done_flag);←
(12)

 if (!done_flag.load()) { ←
(13)

  return last;

 }

 return result.get_future().get() 
(14)

}

В основе своей код в листинге 8.9 похож на предыдущие примеры. На этот раз вся работа производится в операторе вызова, определенном в локальном классе

find_element
(1). Здесь мы в цикле обходим элементы из назначенного потоку блока, проверяя флаг на каждой итерации (2). Если искомый элемент найден, то мы записываем окончательный результат в объект-обещание (3) и перед возвратом устанавливаем флаг
done_flag
(4).

Если было возбуждено исключение, то его перехватит универсальный обработчик (5) и попытается сохранить исключение в обещании (6) перед установкой

done_flag
. Но установка значения объекта-обещания может возбудить исключение, если значение уже установлено, поэтому мы перехватываем и игнорируем любые возникающие здесь исключения (7).

Это означает, что если поток, вызвавший

find_element
, найдет искомый элемент или возбудит исключение, то все остальные потоки увидят поднятый флаг
done_flag
и прекратят работу. Если несколько потоков одновременно найдут искомое или возбудят исключение, то возникнет гонка за установку результата в обещании. Но это безобидная гонка: победивший поток считается «первым», и установленный им результат приемлем.

В самой функции

parallel_find
мы определяем обещание (8) и флаг прекращения поиска (9), которые передаем новым потокам вместе с диапазоном для просмотра (11). Кроме того, главный поток пользуется классом
find_element
для поиска среди оставшихся элементов (12). Как уже отмечалось, мы должны дождаться завершения всех потоков перед тем, как проверять результат, потому что искомого элемента может вообще не оказаться. Для этого мы заключаем код запуска и присоединения потоков в блок (10), так что к моменту проверки флага (13) все потоки гарантировано присоединены. Если элемент был найден, то, обратившись к функции
get()
объекта
std::future
, мы либо получим результат из обещания, либо возбудим сохраненное исключение.

Как и раньше, в этой реализации предполагается, что мы собираемся использовать все доступные аппаратные потоки или располагаем каким-то механизмом, который позволит заранее определить количество потоков для предварительного разделения между ними работы. И снова мы можем упростить код, воспользовавшись функцией

std::async
и рекурсивным разбиением данных, если готовы принять автоматический механизм масштабирования, скрытый в стандартной библиотеке С++. Реализация
parallel_find
с применением
std::async
приведена в листинге ниже.


Листинг 8.10. Параллельная реализация алгоритма

find()
с применением
std::async

template
(1)

Iterator parallel_find_impl(Iterator first, Iterator last,

 MatchType match,

 std::atomic& done) {

 try {

  unsigned long const length = std::distance(first, last);

  unsigned long const min_per_thread = 25;          ←
(2)

  if (length < (2 * min_per_thread)) {              ←
(3)

   for(; (first != last) && !done.load(); ++first) {←
(4)

    if (*first == match) {

     done = true;                                   ←
(5)

     return first;

    }

   }

   return last; ←
(6)

  } else {

   Iterator const mid_point = first + (length / 2);    ←
(7)

   std::future async_result =

    std::async(¶llel_find_impl,←
(8)

   mid_point, last, match, std::ref(done));

   Iterator const direct_result =

    parallel_find_impl(first, mid_point, match, done);  ←
(9)

   return (direct_result == mid_point) ?

    async_result.get() : direct_result; ←
(10)

  }

 } catch (...) {

  done = true; ←
(11)

  throw;

 }

}


template

Iterator parallel_find(

 Iterator first, Iterator last, MatchType match) {

 std::atomic done(false);

 return parallel_find_impl(first, last, match, done); ←
(12)

}

Желание закончить поиск досрочно при обнаружении совпадения заставило нас ввести флаг, разделяемый между всеми потоками. Этот флаг, следовательно, нужно передавать во все рекурсивные вызовы. Проще всего сделать это, делегировав работу отдельной функции (1), которая принимает дополнительный параметр — ссылку на флаг

done
, передаваемый из главной точки входа (12).

Основная же ветвь кода не таит никаких неожиданностей. Как и во многих предыдущих реализациях, мы задаем минимальное количество элементов, обрабатываемых в одном потоке (2); если размер обеих половин диапазона меньше этой величины, то весь диапазон обрабатывается в текущем потоке (3). Собственно алгоритм сводится к простому циклу — он продолжается, пока не будет достигнут конец заданного диапазона или не установлен флаг

done
(4). При обнаружении совпадения мы устанавливаем флаг
done
и выходим из функции (5). Если мы дошли до конца списка или вышли из цикла, потому что другой поток установил флаг
done
, то возвращаем значение
last
, означающее, что совпадение не найдено (6).

Если диапазон можно разбивать, то мы сначала находим среднюю точку (7), а потом через

std::async
запускаем поиск во второй половине диапазона (8), не забыв передать ссылку на флаг
done
с помощью
std::ref
. Одновременно мы просматриваем первую половину диапазона, рекурсивно вызывая себя же (9). И асинхронный, и рекурсивный вызов могут разбивать диапазон и дальше, если он достаточно велик.

Если прямой рекурсивный вызов вернул

mid_point
, значит, он не нашел совпадения, поэтому нужно получить результат асинхронного поиска. Если и в той половине ничего не было найдено, то мы получим
last
(10). Если «асинхронный» вызов на самом деле был не асинхронным, а отложенным, то выполняться он начнет именно при обращении к
get()
; в таком случае поиск во второй половине списке вообще не будет производиться, если поиск в первой оказался успешным. Если же асинхронный поиск действительно выполнялся в другом потоке, то деструктор переменной
async_result
будет ждать завершения этого потока, поэтому утечки потоков не произойдет.

Как и раньше, применение

std::async
гарантирует безопасность относительно исключений и распространения исключений вверх по стеку вызовов. Если прямой рекурсивный вызов возбудит исключение, то деструктор будущего результата позаботится о том, чтобы поток, в котором работал асинхронный поиск, завершился до возврата из функции. Если исключение возбудит асинхронный вызов, то оно распространится вверх при вызове
get()
(10). Внешний блок
try/catch
нужен только для того, чтобы установить флаг
done
и обеспечить тем самым быстрое завершение всех потоков в случае исключения (11). Программа правильно работала бы и без этого, по продолжала бы сравнивать элементы до естественного завершения всех потоков.

Существенной особенностью обеих реализаций этого алгоритма (характерной и для других рассмотренных выше параллельных алгоритмов) является тот факт, что элементы могут обрабатываться не в том же порядке, что в стандартном алгоритме

std::find
. Это важный момент при распараллеливании любого алгоритма. Если порядок имеет значение, то обрабатывать элементы параллельно нельзя. В таких алгоритмах, как
parallel_for_each
, порядок обработки независимых элементов не важен, однако
parallel_find
может вернуть элемент, найденный где-то в конце диапазона, хотя имеется другой такой же элемент в начале. Это может оказаться неприятной неожиданностью.

Итак, нам удалось распараллелить

std::find
. В начале этого раздела я говорил, что существуют и другие алгоритмы, которые могут завершаться раньше, чем будут обработаны все элементы. К ним применима такая же техника. В главе 9 мы еще вернёмся к вопросу о прерывании потоков.

В последнем из трех примеров мы направимся в другую сторону и рассмотрим алгоритм

std::partial_sum
. Он не очень широко известен, но интересен с точки зрения распараллеливания, поскольку позволяет проиллюстрировать некоторые дополнительные проектные решения.

8.5.3. Параллельная реализация
std::partial_sum

Алгоритм

std::partial_sum
вычисляет частичные суммы по диапазону, то есть каждый элемент заменяется суммой всех элементов от начала диапазона до него самого включительно. Таким образом, последовательность 1, 2, 3, 4, 5 преобразуется в 1, (1+2)=3, (1+2+3)=6, (1+2+3+4)=10, (1+2+3+4+5)=15. С точки зрения распараллеливания этот алгоритм интересен тем, что невозможно разбить диапазон на части и обрабатывать каждый блок независимо. Действительно, значение первого элемента необходимо складывать с каждым из остальных элементов, так что независимой обработки не получается.

Один из возможных подходов — вычислить частичные суммы отдельных блоков, а затем прибавить полученное значение последнего элемента в первом блоке ко всем элементам в следующем блоке и так далее. Например, если исходную последовательность 1, 2, 3, 4, 5, 6, 7, 8, 9 разбить на три равных блока, то после первого прохода получатся блоки {1, 3, 6}, {4, 9, 15}, {7, 15, 24}. Если теперь прибавить 6 (значение последнего элемента в первом блоке) ко всем элементам второго блока, то получится {1, 3, 6}, {10, 15, 21}, {7, 15, 24}. Далее прибавляем последний элемент второго блока (21) ко всем элементам третьего блока и получаем окончательный результат: {1, 3, 6}, {10, 15, 21}, {28, 36, 55}.

Процедуру прибавления частичной суммы предыдущего блока ко всем элементам следующего также можно распараллелить. Если сначала изменить последний элемент блока, то оставшиеся элементы того же блока могут модифицироваться одним потоком, тогда как другой поток одновременно будет модифицировать следующий блок. И так далее. Такое решение хорошо работает, если количество элементов в списке намного превышает количество процессорных ядер, поскольку в этом случае у каждого ядра будет достаточно элементов для обработки на каждом этапе.

Если же процессорных ядер очень много (столько же, сколько элементов в списке, или больше), то описанный подход оказывается не столь эффективен. Если разбить всю работу между процессорами, то на первом шаге каждый процессор будет работать всего с двумя элементами. Но тогда на этапе распространения результатов большинство процессоров будут ждать, и хорошо бы их чем-то запять. В таком случае можно подойти к задаче по-другому. Вместо того чтобы сначала вычислять частичные суммы всех блоков, а затем распространять их от предыдущего к следующему, мы можем распространять суммы по частям. Сначала, как и раньше, вычисляем суммы соседних элементов. На следующем шаге каждое вычисленное значение прибавляется к элементу, отстоящему от него на расстояние 2. Затем новые значения прибавляются к элементам, отстоящим на расстояние 4, и так далее. Если начать с тех же самых девяти элементов, то после первого шага мы получим последовательность 1, 3, 5, 7, 9, 11, 13, 15, 17, в которой правильно вычислены первые два элемента. После второго шага получается последовательность 1, 3, 6, 10, 14, 18, 22, 26, 30, в которой правильны первые четыре элемента. После третьего мы получим последовательность 1, 3, 6, 10, 15, 21, 28, 36, 44, в которой правильны первые восемь элементов. И после четвертого шага получаем окончательный результат 1, 3, 6, 10, 15, 21, 28, 36, 45. Общее количество шагов здесь больше, чем в первом варианте, зато и возможности для распараллеливания на большое число процессоров шире — на каждом шаге каждый процессор может обновлять одно число.

Всего во втором варианте требуется выполнить log2(N) шагов по N операций на каждом шаге (по одной на процессор), где N — число элементов в списке. В первом же варианте каждый поток производит N/k операций для вычисления частичной суммы своего блока и еще N/k операций для распространения сумм (здесь k — число потоков). Следовательно, вычислительная сложность первого варианта в терминах количества операций составляет O(N), а второго — O(N log(N)). Однако, если процессоров столько же, сколько элементов в списке, то во втором варианте требуется произвести только log(N) операций на каждом процессоре, тогда как в первом при большом k операции из-за распространения частичных сумм вперед фактически сериализуются. Таким образом, если количество процессоров невелико, то первый алгоритм завершится быстрее, тогда как в массивно параллельных системах победителем окажется второй алгоритм. Это крайнее проявление феномена, обсуждавшегося в разделе 8.2.1.

Но оставим в стороне эффективность и перейдем к коду. В листинге 8.11 приведена реализация первого подхода.


Листинг 8.11. Параллельное вычисление частичных сумм путём разбиения задачи на части

template

void parallel_partial_sum(Iterator first, Iterator last) {

 typedef typename Iterator::value_type value_type;


 struct process_chunk { ←
(1)

  void operator()(Iterator begin, Iterator last,

   std::future* previous_end_value,

   std::promise* end_value) {

   try {

    Iterator end = last;

    ++end;

    std::partial_sum(begin, end, begin); ←
(2)

    if (previous_end_value) {            ←
(3)

     value_type& addend = previous_end_value->get();←
(4)

     *last += addend;              ←
(5)

     if (end_value) {

      end_value->set_value(*last); ←
(6)

     }

     std::for_each(begin, last, [addend](value_type& item) {←
(7)

      item += addend;

     });

    } else if (end_value) {

     end_value->set_value(*last); ←
(8)

    }

   } catch (...) { ←
(9)

    if (end_value) {

     end_value->set_exception(std::current_exception()); ←
(10)

    } else {

     throw; ←
(11)

    }

   }

  }

 };


 unsigned long const length = std::distance(first, last);


 if (!length)

  return last;


 unsigned long const min_per_thread = 25; ←
(12)


 unsigned long const max_threads =

  (length + min_per_thread - 1) / min_per_thread;


 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();


 unsigned long const num_threads =

  std::min(

   hardware_threads!= 0 ? hardware_threads : 2, max_threads);


 unsigned long const block_size = length / num_threads;


 typedef typename Iterator::value_type value_type;

 std::vector threads(num_threads - 1);←
(13)

 std::vector>

  end_values(num_threads - 1);                     ←
(14)

 std::vector>

  previous_end_values;                             ←
(15)

 previous_end_values.reserve(num_threads - 1);     ←
(16)

 join_threads joiner(threads);

 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_last = block_start;

  std::advance(block_last, block_size – 1); ←
(17)

  threads[i] = std::thread(process_chunk(), ←
(18)

   block_start, block_last,

   (i !=0 ) ? &previous_end_values[i - 1] : 0, &end_values[i]);

  block_start = block_last;

  ++block_start; ←
(19)

  previous_end_values.push_back(end_values[i].get_future());←
(20)

 }

 Iterator final_element = block_start;

 std::advance(

  final_element, std::distance(block_start, last) - 1);←
(21)

  process_chunk()(block_start, final_element, ←
(22)

  (num_threads > 1) ? &previous_end_values.back() : 0, 0);

}

Общая структура этого кода не отличается от рассмотренных ранее алгоритмов: разбиение задачи на блоки с указанием минимального размера блока, обрабатываемого одним потоком (12). В данном случае, помимо вектора потоков (13), мы завели вектор обещаний (14), в котором будут храниться значения последних элементов в каждом блоке, и вектор будущих результатов (15), используемый для получения последнего значения из предыдущего блока. Так как мы знаем, сколько всего понадобится будущих результатов, то можем заранее зарезервировать для них место в векторе (16), чтобы избежать перераспределения памяти при запуске потоков.

Главный цикл выглядит так же, как раньше, только на этот раз нам нужен итератор, который указывает на последний элемент в блоке (17), а не на элемент за последним, чтобы можно было распространить последние элементы поддиапазонов. Собственно обработка производится в объекте-функции

process_chunk
, который мы рассмотрим ниже; в качестве аргументов передаются итераторы, указывающие на начало и конец блока, а также будущий результат, в котором будет храниться последнее значение из предыдущего диапазона (если таковой существует), и объект-обещание для хранения последнего значения в текущем диапазоне (18).

Запустив поток, мы можем обновить итератор, указывающий на начало блока, не забыв продвинуть его на одну позицию вперед (за последний элемент предыдущего блока) (19), и поместить будущий результат, в котором будет храниться последнее значение в текущем блоке, в вектор будущих результатов, откуда он будет извлечён на следующей итерации цикла (20).

Перед тем как обрабатывать последний блок, мы должны получить итератор, указывающий на последний элемент (21), который передается в

process_chunk
(22). Алгоритм
std::partial_sum
не возвращает значения, поэтому по завершении обработки последнего блока больше ничего делать не надо. Можно считать, что операция закончилась, когда все потоки завершили выполнение.

Теперь настало время поближе познакомиться с объектом-функцией

process_chunk
, который собственно и делает всю содержательную работу (1). Сначала вызывается
std::partial_sum
для всего блока, включая последний элемент (2), но затем нам нужно узнать, первый это блок или нет (3). Если это не первый блок, то должно быть вычислено значение
previous_end_value
для предыдущего блока, поэтому нам придется его подождать (4). Чтобы добиться максимального распараллеливания, мы затем сразу же обновляем последний элемент (5), так чтобы это значение можно было передать дальше, в следующий блок (если таковой имеется) (6). Сделав это, мы можем с помощью
std::for_each
и простой лямбда-функции (7) обновить остальные элементы диапазона.

Если

previous_end_value
не существует, то это первый блок, поэтому достаточно обновить
end_value
для следующего блока (опять же, если таковой существует, — может случиться, что блок всего один) (8).

Наконец, если какая-то операция возбуждает исключение, мы перехватываем его (9) и сохраняем в объекте-обещании (10), чтобы оно было распространено в следующий блок, когда он попытается получить последнее значение из предыдущего блока (4). В результате все исключения доходят до последнего блока, где и возбуждаются повторно (11), поскольку в этой точке мы гарантированно работаем в главном потоке.

Из-за необходимости синхронизации между потоками этот код не получится переписать под

std::async
. Каждая задача ждет результатов, которые становятся доступны по ходу выполнения других задач, поэтому все задачи должны работать параллельно.

Разобравшись с решением, основанным на распространении результатов обработки предыдущих блоков, посмотрим на второй из описанных алгоритмов вычисления частичных сумм.

Реализация прогрессивно-попарного алгоритма вычисления частичных сумм

Второй способ вычисления частичных сумм, основанный на сложении элементов, расположенных на все большем расстоянии друг от друга, работает лучше всего, когда процессоры могут выполнять операции сложения синхронно. В таком случае никакой дополнительной синхронизации не требуется, потому что все промежуточные результаты можно сразу распространить на следующий нуждающийся в них процессор. Но на практике такие системы встречаются редко — разве что в случаях, когда один процессор может выполнять одну и ту же команду над несколькими элементами данных одновременно с помощью так называемых SIMD-команд (Single-Instruction/Multiple-Data — одиночный поток команд, множественный поток данных). Поэтому мы должны проектировать программу в расчете на общий случай и производить явную синхронизацию на каждом шаге.

Сделать это можно, например, с помощью барьера — механизма синхронизации, который заставляет потоки ждать, пока указанное количество потоков достигнет барьера. Когда все потоки подошли к барьеру, они разблокируются и могут продолжать выполнение. В библиотеке C++11 Thread Library готовая реализация барьера отсутствует, так что нам придется написать свою собственную.

Представьте себе американские горки в парке аттракционов. Если желающих покататься достаточно, то смотритель не запустит состав, пока не будут заняты все места. Барьер работает точно так же: вы заранее задаете число «мест», и потоки будут ждать, пока все «места» заполнятся. Когда ожидающих потоков собирается достаточно, они все получают возможность продолжить выполнение; барьер при этом сбрасывается и начинает ждать следующую партию потоков. Часто такая конструкция встречается в цикле, где на каждой итерации у барьера собираются одни и те же потоки. Идея в том, чтобы все потоки «шли в ногу» — никто не должен забегать вперед. В рассматриваемом алгоритме такой «поток-торопыга» недопустим, потому что он мог бы модифицировать данные, которые еще используются другими потоками, или, наоборот, сам попытался бы использовать еще не модифицированные должным образом данные.

В следующем листинге приведена простая реализация барьера.


Листинг 8.12. Простой класс барьера

class barrier {

 unsigned const count;

 std::atomic spaces;

 std::atomic generation;

public:

 explicit barrier (unsigned count_) : ←
(1)

 count(count_), spaces(count), generation(0) {}


 void wait() {

  unsigned const my_generation = generation; ←
(2)

  if (!--spaces) {←
(3)

   spaces = count;←
(4)

   ++generation;  ←
(5)

  } else {

   while (generation == my_generation) ←
(6)

   std::this_thread::yield();          ←
(7)

  }

 }

};

Здесь при конструировании барьера мы указываем количество «мест» (1), которое сохраняется в переменной count. В начале количество мест у барьера spaces равно

count
. Когда очередной поток выполняет функцию
wait()
, значение
spaces
уменьшается на единицу (3). Как только оно обращается в нуль, количество мест возвращается к исходному значению
count
(4), а переменная
generation
увеличивается, что служит для других потоков сигналом к продолжению работы (5). Если число свободных мест больше нуля, то поток должен ждать. В этой реализации используется простой спинлок (6), который сравнивает текущее значение
generation
с тем, что было запомнено в начале
wait()
(2). Поскольку
generation
изменяется только после того, как все потоки подошли к барьеру (5), мы можем во время ожидания уступить процессор с помощью
yield()
(7), чтобы ожидающий поток не потреблял процессорное время.

Эта реализация действительно очень простая — в ней для ожидания используется спинлок, поэтому она не идеальна для случая, когда потоки могут ждать долго, и совсем не годится в ситуации, когда в каждый момент времени может существовать более count потоков, выполнивших

wait()
. Если требуется, чтобы и в этих случаях барьер работал правильно, то следует использовать более надежную (но и более сложную) реализацию. Кроме того, я ограничился последовательно согласованными операциями над атомарными переменными, потому что они проще для понимания, но вы, возможно, захотите ослабить ограничения на порядок доступа к памяти. Такая глобальная синхронизация дорого обходится в массивно параллельных архитектурах, так как строку кэша, содержащую состояние барьера, приходится передавать всем участвующим процессорам (см. обсуждение перебрасывания кэша в разделе 8.2.2). Поэтому нужно тщательно продумать, является ли это решение оптимальным.

Как бы то ни было, барьер — это именно то, что нам необходимо в данном случае; имеется фиксированное число потоков, которые должны синхронизироваться на каждой итерации цикла. Ну почти фиксированное. Как вы, наверное, помните, элементы, расположенные близко к началу списка, получают окончательные значения всего через пару шагов. Это означает, что либо все потоки должны крутиться в цикле, пока не будет обработан весь диапазон, либо барьер должен поддерживать выпадение потоков, уменьшая значение

count
. Я предпочитаю второй вариант, чтобы не заставлять потоки выполнять бессмысленную работу в ожидании завершения последнего шага.

Но тогда нужно сделать

count
атомарной переменной, чтобы ее можно было изменять из нескольких потоков без внешней синхронизации:

std::atomic count;

Инициализация не меняется, но при переустановке

spaces
теперь нужно явно загружать
spaces
с помощью операции
load()
:

spaces = count.load();

Больше никаких изменений в

wait()
вносить не надо, но необходима новая функция-член для уменьшения count. Назовем ее
done_waiting()
, потому что с ее помощью поток заявляет, что больше ждать не будет.

void done_waiting() {

 --count;               ←
(1)

 if (!--spaces) {       ←
(2)

  spaces = count.load();←
(3)

  ++generation;

 }

}

Прежде всего мы уменьшаем

count
(1)
, чтобы при следующей переустановке
spaces
было отражено новое, меньшее прежнего, количество ожидающих потоков. Затем уменьшаем количество свободных мест
spaces
(2). Если этого не сделать, то остальные потоки будут ждать вечно, так как при инициализации
spaces
было задано старое, большее, значение. Если текущий поток является последним в партии, то количество мест нужно переустановить и увеличить
generation
на единицу (3) — так же, как в
wait()
. Основное отличие от
wait()
заключается в том, что поток не должен ждать — он же сам объявляет, что больше ждать не будет!

Теперь мы готовы написать вторую реализацию алгоритма вычисления частичных сумм. На каждом шаге каждый поток вызывает функцию

wait()
барьера, чтобы все потоки пересекли его вместе, а, закончив работу, поток вызывает функцию
done_waiting()
, чтобы уменьшить счетчик. Если наряду с исходным диапазоном использовать второй буфер, то барьер обеспечивает необходимую синхронизацию. На каждом шаге потоки либо читают исходный диапазон и записывают новое значение в буфер, либо наоборот — читают буфер и записывают новое значение в исходный диапазон. На следующем шаге исходный диапазон и буфер меняются местами. Тем самым мы гарантируем отсутствие гонки между операциями чтения и записи в разных потоках. Перед выходом из цикла каждый поток должен позаботиться о записи окончательного значения в исходный диапазон. В листинге 8.13 все это собрано воедино.


Листинг 8.13. Параллельная реализация

partial_sum
методом попарных обновлений

struct barrier {

 std::atomic count;

 std::atomic spaces;

 std::atomic generation;


 barrier(unsigned count_) :

  count(count_), spaces(count_), generation(0) {}


 void wait() {

  unsigned const gen = generation.load();

  if (!--spaces) {

   spaces = count.load();

   ++generation;

  } else {

   while (generation.load() == gen) {

    std::this_thread::yield();

   }

  }

 }


 void done_waiting() {

  --count;

  if (!--spaces) {

   spaces = count.load();

   ++generation;

  }

 }

};


template

void parallel_partial_sum(Iterator first, Iterator last) {

 typedef typename Iterator::value_type value_type;


 struct process_element { ←
(1)

  void operator()(Iterator first, Iterator last,

   std::vector& buffer,

   unsigned i, barrier& b) {

   value_type& ith_element = *(first + i);

   bool update_source = false;


   for (unsigned step = 0, stride = 1;

        stride <= i; ++step, stride *= 2) {

    value_type const& source = (step % 2) ? ←
(2)

     buffer[i] : ith_element;

    value_type& dest = (step % 2) ?

     ith_element:buffer[i];

    value_type const& addend = (step % 2) ? ←
(3)

     buffer[i - stride] : *(first + i – stride);


    dest = source + addend; ←
(4)

    update_source = !(step % 2);

    b.wait(); ←
(5)

   }

   if (update_source) { ←
(6)

    ith_element = buffer[i];

   }

   b.done_waiting(); ←
(7)

  }

 };


 unsigned long const length = std::distance(first, last);


 if (length <= 1)

  return;


 std::vector buffer(length);

 barrier b(length);

 std::vector threads(length - 1); ←
(8)

 join_threads joiner(threads);


 Iterator block_start = first;

 for (unsigned long i = 0; i < (length - 1); ++i) {

  threads[i] = std::thread(process_element(), first, last,←
(9)

   std::ref(buffer), i, std::ref(b));

 }

 process_element()(first, last, buffer, length - 1, b);   ←
(10)

}

Общая структура кода вам, наверное, уже понятна. Имеется класс с оператором вызова (

process_element
), который выполняет содержательную работу (1) и вызывается из нескольких потоков (9), хранящихся в векторе (8), а также из главного потока (10). Важное отличие заключается в том, что теперь число потоков зависит от числа элементов в списке, а не от результата, возвращаемого функцией
std::thread::hardware_concurrency
. Я уже говорил, что эта идея не слишком удачна, если только вы не работаете на машине с массивно параллельной архитектурой, где потоки обходятся дешево. Но это неотъемлемая часть самой идеи решения. Можно было бы обойтись и меньшим числом потоков, поручив каждому обработку нескольких значений из исходного диапазона, но тогда при относительно небольшом количестве потоков этот алгоритм оказался бы менее эффективен, чем алгоритм с прямым распространением.

Так или иначе, основная работа выполняется в операторе вызове из класса

process_element
. На каждом шаге берется либо i-ый элемент из исходного диапазона, либо i-ый элемент из буфера (2) и складывается с предшествующим элементом, отстоящим от него на расстояние
stride
(3); результат сохраняется в буфере, если мы читали из исходного диапазона, и в исходном диапазоне — если мы читали из буфера (4). Перед тем как переходить к следующему шагу, мы ждем у барьера (5). Работа заканчивается, когда элемент, отстоящий на расстояние
stride
, оказывается слева от начала диапазона. В этом случае мы должно обновить элемент в исходном диапазоне, если сохраняли окончательный результат в буфере (6). Наконец, мы вызываем функцию
done_waiting()
(7), сообщая барьеру, что больше ждать не будем.

Отметим, что это решение не безопасно относительно исключений. Если один из рабочих потоков возбудит исключение в

process_element
, то приложение аварийно завершится. Решить эту проблему можно было бы, воспользовавшись
std::promise
для запоминания исключения, как в реализации
parallel_find
из листинга 8.9, или просто с помощью объекта
std::exception_ptr
, защищенного мьютексом.

Вот и подошли к концу обещанные три примера. Надеюсь, они помогли уложить в сознании соображения, высказанные в разделе 8.1, 8.2, 8.3 и 8.4, и показали, как описанные приемы воплощаются в реальном коде.

8.6. Резюме