Проектирование параллельных программ
В этой главе:■ Методы распределения данных между потоками.
■ Факторы, влияющие на производительность параллельного кода.
■ Как от этих факторов зависит дизайн параллельных структур данных.
■ Безопасность многопоточного кода относительно исключений.
■ Масштабируемость.
■ Примеры реализации параллельных алгоритмов.
В предыдущих главах мы в основном занимались появившимися в новом стандарте C++11 средствами для написания параллельных программ. В главах 6 и 7 мы видели, как эти средства применяются для проектирования простых структур данных, безопасных относительно доступа из нескольких потоков. Но как столяру недостаточно знать, как устроена петля или шарнир, чтобы смастерить шкаф или стол, так и для проектирования параллельных программ недостаточно знакомства с устройством и применением простых структур данных. Теперь мы будем рассматривать проблему в более широком контексте и научимся строить более крупные узлы, способные выполнять полезную работу В качестве примеров я возьму многопоточные реализации некоторых алгоритмов из стандартной библиотеки С++, но те же самые принципы остаются в силе при разработке всех уровней приложения.
Как и в любом программном проекте, тщательное продумывание структуры параллельного кода имеет первостепенное значение. Однако в параллельной программе приходится учитывать еще больше факторов, чем в последовательной. Нужно принимать во внимание не только инкапсуляцию, связанность и сцепленность (эти вопросы подробно рассматриваются во многих книгах по проектированию программного обеспечения), но и то, какие данные разделять, как организовывать доступ к ним, каким потокам придётся ждать других потоков для завершения операции и т.д.
Именно этим мы и будем заниматься в этой главе, начав с высокоуровневых (но фундаментальных) вопросов о том, сколько создавать потоков, какой код выполнять в каждом потоке и как многопоточность влияет на понятность программы, и закончив низкоуровневыми деталями того, как организовать разделяемые данные для достижения оптимальной производительности.
Начнем с методов распределения работы между потоками.
8.1. Методы распределения работы между потоками
Представьте, что вам поручили построить дом. Для этого предстоит вырыть котлован под фундамент, возвести стены, провести водопровод и электропроводку и т.д. Теоретически, имея достаточную подготовку, все это можно сделать и в одиночку, по времени уйдет много и придётся постоянно переключаться с одного занятия на другое. Можно вместо этого нанять несколько помощников. Тогда нужно решить, сколько людей нанимать и каких специальностей. К примеру, пригласить двух универсалов и всё делать совместно. По-прежнему нужно будет переходить от одной работы к другой, но в итоге строительство удастся завершить быстрее, так как теперь в нем принимает участие больше народу
Есть и другой путь — нанять бригаду узких специалистов: каменщика, плотника, электрика, водопроводчика и других. Специалисты делают то, что лучше всего умеют, так что если прокладывать трубы не надо, то водопроводчик попивает чаёк. Работа все равно продвигается быстрее, так как занято больше людей, — водопроводчик может заниматься туалетом, пока электрик делает проводку на кухне, но зато когда для конкретного специалиста работы нет, он простаивает. Впрочем, несмотря на простои, специалисты, скорее всего, справятся быстрее, чем бригада мастеров на все руки. Им не нужно менять инструменты, да и свое дело они по идее должны делать быстрее, чем универсалы. Так ли это на самом деле, зависит от разных обстоятельств — нет другого пути, как взять и попробовать.
Но и специалистов можно нанять много или мало. Наверное, имеет смысл нанимать больше каменщиков, чем электриков. Кроме того, состав бригады и ее общая эффективность могут измениться, если предстоит построить несколько домов. Если в одном доме у водопроводчика мало работы, то на строительстве нескольких его можно занять полностью. Наконец, если вы не обязаны платить специалистам за простой, то можно позволить себе нанять больше людей, пусть даже в каждый момент времени работать будет столько же, сколько раньше.
Но хватит уже о строительстве, какое отношение всё это имеет к потокам? Да просто к ним применимы те же соображения. Нужно решить, сколько использовать потоков и какие задачи поручить каждому Должны ли потоки быть «универсалами», берущимися за любую подвернувшуюся работу или «специалистами», которые умеют хорошо делать одно дело? Или, быть может, нужно сочетание того и другого? Эти решения необходимо принимать вне зависимости от причин распараллеливания программы и от того, насколько они будут удачны, существенно зависит производительность и ясность кода. Поэтому так важно понимать, какие имеются варианты; тогда решения о структуре приложения будут опираться на знания, а не браться с потолка. В этом разделе мы рассмотрим несколько методов распределения задач и начнем с распределения данных между потоками.
8.1.1. Распределение данных между потоками до начала обработки
Легче всего распараллеливаются такие простые алгоритмы, как
std::for_each
, которые производят некоторую операцию над каждым элементом набора данных. Чтобы распараллелить такой алгоритм, можно назначить каждому элементу один из обрабатывающих потоков. Каким образом распределить элементы для достижения оптимальной производительности, зависит от деталей структуры данных, как мы убедимся ниже в этой главе.Простейший способ распределения данных заключается в том, чтобы назначить первые N элементов одному потоку, следующие N — другому и так далее, как показано на рис. 8.1, хотя это и не единственное решение. Как бы ни были распределены данные, каждый поток обрабатывает только назначенные ему элементы, никак не взаимодействуя с другими потоками до тех пор, пока не завершит обработку.
Рис. 8.1. Распределение последовательных блоков данных между потоками
Такая организация программы знакома каждому, кто программировал в системах Message Passing Interface (МРI)[17] или OpenMP[18]: задача разбивается на множество параллельных подзадач, рабочие потоки выполняют их параллельно, а затем результаты объединяются на стадии редукции. Этот подход применён в примере функции
accumulate
из раздела 2.4; в данном случае и параллельные задачи, и финальный шаг редукции представляют собой аккумулирование. Для простого алгоритма for_each
финальный шаг отсутствует, так как нечего редуцировать.Понимание того, что последний шаг является редукцией, важно; в наивной реализации, представленной в листинге 2.8, финальная редукция выполняется как последовательная операция. Однако часто и ее можно распараллелить; операция
accumulate
по сути дела сама является редукцией, поэтому функцию из листинга 2.8 можно модифицировать таким образом, чтобы она вызывала себя рекурсивно, если, например, число потоков больше минимального количества элементов, обрабатываемых одним потоком. Или запрограммировать рабочие потоки так, чтобы по завершении задачи они выполняли некоторые шаги редукции, а не запускать каждый раз новые потоки.Хотя это действенная техника, применима она не в любой ситуации. Иногда данные не удается заранее распределить равномерно, а как это сделать, становится понятно только в процессе обработки. Особенно наглядно это проявляется в таких рекурсивных алгоритмах, как Quicksort; здесь нужен другой подход.
8.1.2. Рекурсивное распределение данных
Алгоритм Quicksort состоит из двух шагов: разбиение данных на две части — до и после опорного элемента в смысле требуемого упорядочения, и рекурсивная сортировка обеих «половин». Невозможно распараллелить этот алгоритм, разбив данные заранее, потому что состав каждой «половины» становится известен только в процессе обработки элементов. Поэтому распараллеливание по необходимости должно быть рекурсивным. На каждом уровне рекурсии производится больше вызовов функции
quick_sort
, потому что предстоит отсортировать элементы, меньшие опорного, и большие опорного. Эти рекурсивные вызовы не зависят друг от друга, так как обращаются к разным элементам, поэтому являются идеальными кандидатами для параллельного выполнения. На рис. 8.2 изображено такое рекурсивное разбиение.Рис. 8.2. Рекурсивное разбиение данных
В главе 4 была приведена подобная реализация. Вместо того чтобы просто вызывать функцию рекурсивно для большей и меньшей «половины», мы с помощью
std::async()
на каждом шаге запускали асинхронную задачу для меньшей половины. Вызывая std::async()
, мы просим стандартную библиотеку С++ самостоятельно решить, имеет ли смысл действительно выполнять задачу в новом потоке или лучше сделать это синхронно.Это существенно: при сортировке большого набора данных запуск нового потока для каждого рекурсивного вызова быстро приводит к образованию чрезмерного количества потоков. Как мы увидим ниже, когда потоков слишком много, производительность может не возрасти, а наоборот упасть. Кроме того, если набор данных очень велик, то потоков может просто не хватить. Сама идея рекурсивного разбиения на задачи хороша, нужно только строже контролировать число запущенных потоков. В простых случаях с этим справляется
std::async()
, но есть и другие варианты.Один из них — воспользоваться функцией
std::thread::hardware_concurrency()
для выбора нужного числа потоков, как мы делали в параллельной версии accumulate()
из листинга 2.8. Тогда вместо того чтобы запускать новый поток для каждого рекурсивного вызова, мы можем просто поместить подлежащий сортировке блок данных в потокобезопасный стек типа того, что был описан в главах 6 и 7. Если потоку больше нечего делать — потому что он закончил обработку всех своих блоков или потому что ждет, когда необходимый ему блок будет отсортирован, то он может взять блок из стека и заняться его сортировкой.В следующем листинге показана реализация этой идеи.
Листинг 8.1. Параллельный алгоритм Quicksort с применением стека блоков, ожидающих сортировки
template
struct sorter { ←
(1) struct chunk_to_sort {
std::list data;
std::promise> promise;
};
thread_safe_stack chunks; ←
(2) std::vector threads; ←
(3) unsigned const max_thread_count;
std::atomic end_of_data;
sorter():
max_thread_count(std::thread::hardware_concurrency() - 1),
end_of_data(false) {}
~sorter() { ←
(4) end_of_data = true; ←
(5)
for (unsigned i = 0; i < threads.size(); ++i) {
threads[i].join(); ←
(6) }
}
void try_sort_chunk() {
boost::shared_ptr chunk = chunks.pop();←
(7) if (chunk) {
sort_chunk(chunk); ←
(8) }
}
std::list do_sort(std::list& chunk_data) { ←
(9) if (chunk_data.empty()) {
return chunk_data;
}
std::list result;
result.splice(result.begin(), chunk_data, chunk_data.begin());
T const& partition_val = *result.begin();
typename std::list::iterator divide_point = ←
(10) std::partition(chunk_data.begin(), chunk_data.end(),
[&](T const& val) {return val < partition_val; });
chunk_to_sort new_lower_chunk;
new_lower_chunk.data.splice(new_lower_chunk.data.end(),
chunk_data, chunk_data.begin(),
divide_point);
std::future> new_lower =
new_lower_chunk.promise.get_future();
chunks.push(std::move(new_lower_chunk)); ←
(11) if (threads.size() < max_thread_count) { ←
(12) threads.push_back(std::thread(&sorter::sort_thread, this));
}
std::list new_higher(do_sort(chunk_data));
result.splice(result.end(), new_higher);
while (new_lower.wait_for(std::chrono::seconds(0)) !=
std::future_status::ready) { ←
(13) try_sort_chunk(); ←
(14) }
result.splice(result.begin(), new_lower.get());
return result;
}
void sort_chunk(boost::shared_ptr const& chunk) {
chunk->promise.set_value(do_sort(chunk->data));←
(15) }
void sort_thread() {
while (!end_of_data) { ←
(16) try_sort_chunk(); ←
(17) std::this_thread::yield();←
(18) }
}
};
template
std::list parallel_quick_sort(std::list input) { ←
(19) if (input.empty()) {
return input;
}
sorter s;
return s.do_sort(input); ←
(20)}
Здесь функция
parallel_quick_sort
(19) делегирует большую часть работы классу sorter
(1), который объединяет стек неотсортированных блоков (2) с множеством потоков (3). Основные действия производятся в функции-члене do_sort
(9), которая занимается обычным разбиением данных на две части (10). Но на этот раз она не запускает новый поток для каждого блока, а помещает его в стек (11) и запускает новый поток, только если еще есть незанятые процессоры (12). Поскольку меньший блок, возможно, обрабатывается другим потоком, нам необходимо дождаться его готовности (13). Чтобы не простаивать зря (в том случае, когда данный поток единственный или все остальные уже заняты), мы пытаемся обработать блок, находящийся в стеке (14). Функция try_sort_chunk
извлекает поток из стека (7), сортирует его (8) и сохраняет результаты в обещании promise
, так чтобы их смог получить поток, который поместил в стек данный блок (15).Запущенные потоки крутятся в цикле, пытаясь отсортировать блоки, находящиеся в стеке (17), ожидая, пока будет установлен флаг
end_of_data
(16). В промежутке между проверками они уступают процессор другим потокам (18), чтобы у тех был шанс поместить в стек новые блоки. Эта программа полагается на то, что деструктор класса sorter
(4) разберется с запущенными потоками. Когда все данные будут отсортированы, do_sort
вернет управление (хотя рабочие потоки все еще выполняются), так что главный поток вернется из parallel_quick_sort
(20) и, стало быть, уничтожит объект sorter
. В этот момент деструктор поднимет флаг end_of_data
(5) и дождется завершения рабочих потоков (6). Поднятый флаг является для функции потока указанием выйти из цикла (16).При таком подходе не возникает проблемы неограниченного количества потоков, с которой мы сталкивались, когда
spawn_task
каждый раз запускает новый поток. С другой стороны, мы не полагаемся на то, что стандартная библиотека С++ выберет количество рабочих потоков за нас, как то происходит при использовании std::async()
. Мы сами ограничиваем число потоков значением std::thread::hardware_concurrency()
, чтобы избежать чрезмерно большого количества контекстных переключений. Однако же взамен нас поджидает другая потенциальная проблема: управление потоками и взаимодействие между ними сильно усложняют код. Кроме того, хотя потоки и обрабатывают разные элементы данных, все они обращаются к стеку для добавления новых блоков и извлечения блоков для сортировки. Из-за этой острой конкуренции производительность может снизиться, пусть даже мы используем свободный от блокировок (и, значит, неблокирующий) стек. Почему так происходит, мы увидим чуть ниже.Этот подход представляет собой специализированную версию пула потоков — существует набор потоков, которые получают задачи из списка ожидающих работ, выполняют их, а затем снова обращаются к списку. Некоторые потенциальные проблемы, свойственные пулам потоков (в том числе конкуренция за список работ), и способы их решения рассматриваются в главе 9. Задача масштабирования приложения на несколько процессоров более детально обсуждается в этой главе ниже (см. раздел 8.2.1).
Как при предварительном, так и при рекурсивном распределении данных мы предполагаем, что сами данные фиксированы заранее, а нам нужно лишь найти удачный способ их разбиения. Но так бывает не всегда; если данные порождаются динамически или поступают из внешнего источника, то такой подход не годится. В этом случае имеет смысл распределять работу по типам задач, а не по данным.
8.1.3. Распределение работы по типам задач
Распределение работы между потоками путем назначения каждому потоку блока данных (заранее или рекурсивно во время обработки) все же опирается на предположение о том, что все потоки производят одни те же действия с данными. Альтернативный подход — заводить специализированные потоки, каждый из которых решает отдельную задачу — как водопроводчики и электрики на стройке. Потоки могут даже работать с одними и теми же данными, но обрабатывать их по-разному.
Такой способ распределения работы — следствие распараллеливания ради разделения обязанностей; у каждого потока есть своя задача, которую он решает независимо от остальных. Время от времени другие потоки могут поставлять нашему потоку данные или генерировать события, на которые он должен отреагировать, но в общем случае каждый поток занимается тем, для чего предназначен. В принципе, это хороший дизайн — у каждой части кода есть своя зона ответственности.
Распределение работы по типам задач с целью разделения обязанностейОднопоточное приложение должно как-то разрешать противоречия с принципом единственной обязанности, если существует несколько задач, которые непрерывно выполняются в течение длительного промежутка времени, или если приложение должно обрабатывать поступающие события (например, нажатия на клавиши или входящие сетевые пакеты) своевременно, несмотря на наличие других задач. Для решения этой проблемы мы обычную вручную пишем код, который выполняет кусочек задачи А, потом кусочек задачи В, потом проверяет, не нажата ли клавиша и не пришёл ли сетевой пакет, а потом возвращается в начало цикла, чтобы выполнить следующий кусочек задачи А. В результате код задачи А оказывается усложнен из-за необходимости сохранять состояние и периодически возвращать управление главному циклу. Если выполнять в этом цикле чрезмерно много задач, то скорость работы упадёт, а пользователю придётся слишком долго ждать реакции на нажатие клавиши. Уверен, все вы встречались с крайними проявлениями этого феномена не в одном так в другом приложении: вы просите программу выполнить какое-то действие, после чего интерфейс вообще перестаёт реагировать, пока оно не завершится.
Тут-то и приходят на выручку потоки. Если выполнять каждую задачу в отдельном потоке, то всю эту работу возьмет на себя операционная система. В коде для решения задачи А вы можете сосредоточить внимание только на этой задаче и не думать о сохранении состояния, возврате в главный цикл и о том, сколько вам понадобится времени. Операционная система автоматически сохранит состояние и в подходящий момент переключится на задачу В или С, а если система оснащена несколькими процессорами или ядрами, то задачи А и В могут даже выполняться действительно параллельно. Код обработки клавиш или сетевых пакетов теперь будет работать без задержек, и все остаются в выигрыше: пользователь своевременно получает отклик от программы, а разработчик может писать более простой код, так как каждый поток занимается только тем, что входит в его непосредственные обязанности, не отвлекаясь на управление порядком выполнения или на взаимодействие с пользователем.
Звучит, как голубая мечта. Да возможно ли такое? Как всегда, дьявол кроется в деталях. Если всё действительно независимо и потокам не нужно общаться между собой, то это и вправду легко. Увы, мир редко бывает таким идеальным. Эти замечательные фоновые потоки часто выполняют какие-то запросы пользователя и должны уведомлять пользователя о завершении, обновляя графический интерфейс. А то еще пользователь вздумает отменить задачу, и тогда интерфейс должен каким-то образом послать потоку сообщение с требованием остановиться. В обоих случаях необходимо все тщательно продумать и правильно синхронизировать, хотя обязанности таки разделены. Поток пользовательского интерфейса занимается только интерфейсом, но должен обновлять его но требованию других потоков. Аналогично поток, занятый фоновой задачей, выполняет только операции, свойственные данной задаче, но не исключено, что одна из этих обязанностей заключается в том, чтобы остановиться по просьбе другого потока. Потоку все равно, откуда поступил запрос, важно лишь, что он адресован ему и относится к его компетенции.
На пути разделения обязанностей между потоками нас подстерегают две серьезные опасности. Во-первых, мы можем неправильно разделить обязанности. Признаками такой ошибки является большой объем разделяемых данных или положение, при котором потоки должны ждать друг друга; то и другое означает, что взаимодействие между потоками избыточно интенсивно. Когда такое происходит, нужно понять, чем вызвана необходимость взаимодействия. Если все сводится к какой-то одной причине, то, быть может, соответствующую обязанность следует поручить отдельному потоку, освободив от нее всех остальных. Наоборот, если два потока много взаимодействуют друг с другом и мало — с остальными, то, возможно, имеет смысл объединить их в один.
Распределяя задачи по типам, не обязательно полностью изолировать их друг от друга. Если к нескольким наборам входных данных требуется применять одну последовательность операций, то работу можно разделить так, что каждый поток будет выполнять какой-то один этап этой последовательности.
Распределение последовательности задач между потокамиЕсли задача заключается в применении одной последовательности операций ко многим независимым элементам данных, то можно организовать распараллеленный конвейер. Здесь можно провести аналогию с физическим конвейером: данные поступают с одного конца, подвергаются ряду операций и выходят с другого конца.
Чтобы распределить работу таким образом, следует создать отдельный поток для каждого участка конвейера, то есть для каждой операции. По завершении операции элемент данных помещается в очередь, откуда его забирает следующий поток. В результате поток, выполняющий первую операцию, сможет приступить к обработке следующего элемента, пока второй поток трудится над первым элементом.
Такая альтернатива стратегии распределения данных между потоками, которая была описана в разделе 8.1.1, хорошо приспособлена для случаев, когда входные данные вообще неизвестны до начала операции. Например, данные могут поступать по сети или первой в последовательности операцией может быть просмотр файловой системы на предмет нахождения подлежащих обработке файлов.
Конвейеры хороши также тогда, когда каждая операция занимает много времени; распределяя между потоками задачи, а не данные, мы изменяем качественные показатели производительности. Предположим, что нужно обработать 20 элементов данных на машине с четырьмя ядрами, и обработка каждого элемента состоит из четырех шагов, по 3 секунды каждый. Если распределить данные между потоками, то каждому потоку нужно будет обработать 5 элементов. В предположении, что нет никаких других операций, которые могли бы повлиять на хронометраж, по истечении 12 секунд будет обработано четыре элемента, по истечении 24 секунд — 8 элементов и т.д. На обработку всех 20 элементов уйдет 1 минута. Если организовать конвейер, ситуация изменится. Четыре шага можно распределить между четырьмя ядрами. Теперь первый элемент будет обрабатываться каждым из четырех ядер, и в целом на это уйдет 12 секунд — как и раньше. На самом деле, по истечении 12 секунд в нас обработан всего один элемент, что хуже, чем при распределении данных. Однако после того как конвейер запущен, работа идет немного по-другому; первое ядро, обработав первый элемент, переходит ко второму. Поэтому, когда последнее ядро закончит обработку первого элемента, второй уже будет наготове. В результате каждые три секунды на выходе получается очередной обработанный элемент, тогда как раньше элементы выходили пачками по четыре каждые 12 секунд.
Суммарное время обработки всего пакета может увеличиться, потому что придётся подождать 9 секунд, пока первый элемент доберется до последнего ядра. Но более плавная обработка в некоторых случаях предпочтительнее. Возьмем, к примеру, систему воспроизведения цифрового видео высокой четкости. Чтобы фильм можно было смотреть без напряжения, частота кадров должна быть не менее 25 в секунду, а лучше — больше. Кроме того, временные промежутки между кадрами должны быть одинаковы для создания иллюзии непрерывного движения. Приложение, способное декодировать 100 кадров секунду, никому не нужно, если оно секунду ждет, потом «выплевывает» сразу 100 кадров, потом еще секунду ждет и снова выдает 100 кадров. С другой стороны, зритель не будет возражать против двухсекундной задержки перед началом просмотра. В такой ситуации распараллеливание с помощью конвейера, позволяющее выводить кадры с постоянной скоростью, безусловно, предпочтительнее.
Познакомившись с различными способами распределения работы между потоками, посмотрим теперь, какие факторы влияют на производительность многопоточной системы и как от них зависит выбор решения.
8.2. Факторы, влияющие на производительность параллельного кода