До сих пор мы в этой главе рассматривали различные способы распределения работы между потоками, факторы, влияющие на производительность, и то, как от них зависит выбор порядка доступа к данным и самой структуры данных. Но этим проблематика проектирования параллельных программ не исчерпывается. Необходимо еще принимать во внимание такие вещи, как безопасность относительно исключений и масштабируемость. Говорят, что программа масштабируется, если ее производительность (в терминах повышения быстродействия или увеличения пропускной способности) возрастает при добавлении новых процессорных ядер. В идеале производительность должна расти линейно, то есть система с 100 процессорами должна работать в 100 раз быстрее системы с одним процессором.
Даже немасштабируемая программа может быть работоспособной — в конце концов, однопоточные приложения в этом смысле, безусловно, не масштабируемы — но вот безопасность относительно исключений — вопрос, напрямую связанный с корректностью. Если программа не безопасна относительно исключений, то может наблюдаться нарушение инвариантов, состояния гонки и даже аварийное завершение. Вот этим вопросом мы сейчас и займемся.
8.4.1. Безопасность относительно исключений в параллельных алгоритмах
Безопасность относительно исключений — необходимая составная часть любой приличной программы на С++, и параллельные программы — не исключение. На самом деле, при разработке параллельных алгоритмов часто требуется уделять исключениям даже больше внимания. Если какая-то операция в последовательном алгоритме возбуждает исключение, то алгоритм должен лишь позаботиться о предотвращении утечек памяти и нарушения собственных инвариантов, а потом может передать исключение вызывающей программе для обработки. В параллельных же алгоритмах многие операции выполняются в разных потоках. В этом случае исключение невозможно распространить вверх по стеку вызовов, потому что у каждого потока свой стек. Если выход из функции потока производится в результате исключения, то приложение завершается.
В качестве конкретного примера рассмотрим еще раз функцию
parallel_accumulate
из листинга 2.8, который воспроизведен ниже.
Листинг 8.2. Наивная параллельная организация
std::accumulate
(из листинга 2.8)template
struct accumulate_block {
void operator()(Iterator first, Iterator last, T& result) {
result = std::accumulate(first, last, result); ←
(1) }
};
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);←
(2) if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(
hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std: :vector results (num_threads); ←
(3) std::vector threads(num_threads - 1); ←
(4)
Iterator block_start = first; ←
(5) for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start; ←
(6) std::advance(block_end, block_size);
threads[i] = std::thread( ←
(7) accumulate_block(),
block_start, block_end, std::ref(results[i]));
block_start = block_end; ←
(8) }
accumulate_block()(
block_start, last, results[num_threads - 1]);←
(9)
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join));
return
std::accumulate(results.begin(), results.end(), init); ←
(10)}
Посмотрим, где могут возникнуть исключения. Вообще говоря, это вызовы библиотечных функций, которые могут возбуждать исключения, а также операции, определенные в пользовательском типе.
Итак, начнем. В точке (2) мы обращаемся к функции
distance
, которая выполняет операции над пользовательским типом итератора. Поскольку мы еще не начали работу, и обращение к этой функции произведено из вызывающего потока, то тут всё нормально. Далее мы выделяем память для векторов results
(3) и threads
(4). И эти обращения произведены из вызывающего потока до начала работы и до создания новых потоков, так что и здесь всё хорошо. Разумеется, если конструктор threads
возбудит исключение, то нужно будет освободить память, выделенную для results
, но об этом позаботится деструктор.С инициализацией объекта
block_start
(5) всё чисто по тем же причинам, так что перейдём к операциям в цикле запуска потоков (6), (7), (8). Если после создания первого же потока (7) возникнет исключение, и мы его не перехватим, появится проблема; деструкторы объектов std::thread
вызывают std::terminate
, что приводит к аварийному завершению программы. Нехорошо.Обращение к
accumulate_block
в точке (9) может возбуждать исключения — с точно такими же последствиями: объекты потоков будут уничтожены, а их деструкторы вызовут std::terminate
. С другой стороны, исключение, возбуждаемое в последнем обращении к std::accumulate
(10), не так опасно, потому что к этому моменту все потоки уже присоединились к вызывающему.Таким образом, обращения к
accumulate_block
из новых потоков могут возбуждать исключения в точке (1). Так как блоки catch
отсутствуют, то исключение останется необработанным и приведёт к вызову std::terminate()
и завершению программы.Если это еще не очевидно, скажем прямо: этот код не безопасен относительно исключений.
Делаем код безопасным относительно исключенийИтак, мы выявили все возможные точки возбуждения исключений и поняли, к каким печальным последствиям это приведёт. Что с этим можно сделать? Начнем с вопроса об исключениях, возбуждаемых в созданных нами потоках.
В главе 4 мы уже познакомились с подходящим для решения проблемы средством. Для чего нам вообще нужны новые потоки? Для того чтобы вычислить результат и при этом учесть возможность возникновения исключений. Но это именно то, для чего предназначено сочетание
std::packaged_task
и std::future
. В листинге 8.3 показан код, переписанный с использованием std::packaged_task
.
Листинг 8.3. Параллельная версия
std::accumulate
с применением std::packaged_task
template
struct accumulate_block {
T operator()(Iterator first, Iterator last) {←
(1) return std::accumulate(first, last, T()); ←
(2) }
};
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread — 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(
hardware_threads i = 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector> futures(num_threads-1);←
(3) std::vector threads(num_threads — 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task task( ←
(4) accumulate_block());
futures[i] = task.get_future(); ←
(5) threads[i] =
std::thread(std::move(task), block_start, block_end);←
(6) block_start = block_end;
}
T last_result = accumulate_block()(block_start, last); ←
(7)
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join));
T result = init; ←
(8) for (unsigned long i = 0; i < (num_threads - 1); ++i) {
result += futures[i].get(); ←
(9) }
result += last_result; ←
(10) return result;
}
Первое изменение заключается в том, что оператор вызова в
accumulate_block
теперь возвращает результат по значению, а не принимает ссылку на место, где его требуется сохранить (1). Для обеспечения безопасности относительно исключений мы используем std::packaged_task
и std::future
, поэтому можем воспользоваться этим и для передачи результата. Правда, для этого требуется при вызове std::accumulate
(2) явно передавать сконструированный по умолчанию экземпляр T
, а не использовать повторно предоставленное значение result
, но это не слишком существенное изменение.Далее, вместо того заводить вектор результатов, мы создаем вектор
futures
(3), в котором храним объекты std::future
для каждого запущенного потока. В цикле запуска потоков мы сначала создаем задачу для accumulate_block
(4). В классе std::packaged_task
объявлена задача, которая принимает два объекта Iterator
и возвращает T
, а это именно то, что делает наша функция. Затем мы получаем будущий результат для этой задачи (5) и исполняем ее в новом потоке, передавая начало и конец обрабатываемого блока (6). Результат работы задачи, равно как и исключение, если оно возникнет, запоминается в объекте future
.Поскольку используются будущие результаты, массива
results
больше нет, поэтому мы должны сохранить результат обработки последнего блока в переменной (7), а не в элементе массива. Кроме того, поскольку мы получаем значения из будущих результатов, проще не вызывать std::accumulate
, а написать простой цикл for
, в котором к переданному начальному значению (8) будут прибавляться значения, полученные из каждого будущего результата (9). Если какая-то задача возбудит исключение, то оно будет запомнено в будущем результате и повторно возбуждено при обращении к get()
. Наконец, перед тем как возвращать окончательный результат вызывающей программе, мы прибавляем результат обработки последнего блока (10).Таким образом, мы устранили одну из потенциальных проблем: исключения, возбужденные в рабочих потоках, повторно возбуждаются в главном. Если исключения возникнут в нескольких рабочих потоках, то вверх распространится только одно, но это не очень страшно. Если вы считаете, что это все-таки важно, то можете воспользоваться чем-то вроде класса
std::nested_exception
, чтобы собрать все такие исключения и передать их главному потоку.Осталось решить проблему утечки потоков в случае, когда исключение возникает между моментом запуска первого потока и присоединением всех запущенных. Для этого проще всего перехватить любое исключение, дождаться присоединения потоков, которые все еще находятся в состоянии
joinable()
, а потом возбудить исключение повторно:try {
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
// ... как и раньше
}
T last_result = accumulate_block()(block_start, last);
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join));
} catch (...) {
for (unsigned long i = 0; i < (num_thread - 1); ++i) {
if (threads[i].joinable())
thread[i].join();
}
throw;
}
Теперь все работает. Все потоки будут присоединены вне зависимости от того, как завершилась обработка блока. Однако блоки
try-catch
выглядят некрасиво, и часть кода дублируется. Мы присоединяем потоки как в «нормальной» ветке, так и в блоке catch
. Дублирование кода — вещь почти всегда нежелательная, потому что изменения придётся вносить в несколько мест. Давайте лучше перенесём этот код в деструктор — ведь именно такова идиома очистки ресурсов в С++. Вот как выглядит этот класс:class join_threads {
std::vector& threads;
public:
explicit join_threads(std::vector& threads_):
threads(threads_) {}
~join_threads() {
for (unsigned long i = 0; i < threads.size(); ++i) {
if (threads[i].joinable())
threads[i].join();
}
}
};
Это похоже на класс
thread_guard
из листинга 2.3, только на этот раз мы «охраняем» целый вектор потоков. Теперь наш код упрощается.
Листинг 8.4. Безопасная относительно исключений версия
std::accumulate
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(
hardware_threads i = 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector> futures(num_threads — 1);
std::vector threads(num_threads - 1);
join_threads joiner(threads); ←
(1)
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task task(
accumulate_block());
futures[i] = task.get_future();
threads[i] =
std::thread(std:move(task), block_start, block_end);
block_start = block_end;
}
T last_result = accumulate_block()(block_start, last);
T result = init;
for (unsigned long i = 0; i < (num_threads - 1); ++i) {
result + = futures[i].get(); ←
(2) }
result += last_result;
return result;
}
Создав контейнер потоков, мы создаем объект написанного выше класса (1), который присоединяет все завершившиеся потоки. Теперь явный цикл присоединения можно удалить, точно зная, что при выходе из функции потоки будут присоединены. Отметим, что вызовы
futures[i].get()
(2) приостанавливают выполнение программы до готовности результатов, поэтому в этой точке явно присоединять потоки не нужно. Этим данный код отличается от первоначальной версии в листинге 8.2, где присоединять потоки было необходимо для нрав ильного заполнения вектора results
. Мало того что мы получили безопасный относительно исключений код, так еще и функция стала короче, потому что код присоединения вынесен в новый класс (который, кстати, можно использовать и в других программах).Обеспечение безопасности относительно исключений при работе с std::async()
Теперь, когда мы знаем, что необходимо для обеспечения безопасности относительно исключений в программе, которая явно управляет потоками, посмотрим, как добиться того же результата при использовании
std::async()
. Мы уже видели, что в этом случае управление потоками берет на себя библиотека, а запущенный ей поток завершается, когда будущий результат готов. В плане безопасности относительно исключений важно то, что если уничтожить будущий результат, не дождавшись его, то деструктор будет ждать завершения потока. Тем самым мы элегантно решаем проблему утечки потоков, которые еще исполняются и хранят ссылки на данные. В следующем листинге показана безопасная относительно исключений реализация с применением std::async()
.
Листинг 8.5. Безопасная относительно исключений версия
std::accumulate
с применением std::async
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);←
(1) unsigned long const max_chunk_size = 25;
if (length <= max_chunk_size) {
return std::accumulate(first, last, init); ←
(2) } else {
Iterator mid_point = first;
std::advance(mid_point, length / 2); ←
(3) std::future first_half_result =
std::async(parallel_accumulate, ←
(4) first, mid_point, init);
T second_half_result =
parallel_accumulate(mid_point, last, T()); ←
(5) return first_half_result.get() + second_half_result;←
(6) }
}
В этой версии мы распределяем данные рекурсивно, а не вычисляем распределение по блокам заранее, но в целом код намного проще предыдущей версии и при этом безопасен относительно исключений. Как и раньше, сначала мы вычисляем длину последовательности (1), и, если она меньше максимального размера блока, то вызываем
std::accumulate
напрямую (2). В противном случае находим среднюю точку последовательности (3) и запускаем асинхронную задачу, которая будет обрабатывать левую половину (4). Для обработки правой половины мы вызываем себя рекурсивно (5), а затем складываем результаты обработки обеих половин (6). Библиотека гарантирует, что std::async
задействует имеющийся аппаратный параллелизм, не создавая слишком большого количества потоков. Некоторые «асинхронные» вызовы на самом деле будут исполняться синхронно при обращении к get()
(6).Изящество этого решения не только в том, что задействуется аппаратный параллелизм, но и в том, что безопасность относительно исключений обеспечивается тривиальным образом. Если рекурсивный вызов (5) возбудит исключение, то будущий результат, созданный при обращении к
std::async
(4), будет уничтожен при распространении исключения вверх по стеку. Это в свою очередь приведёт к ожиданию завершения асинхронного потока, так что «висячий поток» не образуется. С другой стороны, если исключение возбуждает асинхронный вызов, то оно будет запомнено в будущем результате и повторно возбуждено при обращении к get()
(6).Какие еще соображения следует принимать во внимание при проектировании параллельного кода? Давайте поговорим о масштабируемости. Насколько увеличится производительность программы, если запустить ее на машине с большим количеством процессоров?
8.4.2. Масштабируемость и закон Амдала
Под масштабируемостью понимается свойство программы использовать дополнительные имеющиеся в системе процессоры. На одном полюсе находятся однопоточные приложения, которые вообще не масштабируемы, — даже если система оснащена 100 процессорами, быстродействие такой программы не возрастет. На другом полюсе мы встречаем проект SETI@Home[19], который рассчитан на использование тысяч процессоров (в виде отдельных компьютеров, добровольно подключаемых к сети пользователями).
В многопоточной программе количество потоков, выполняющих полезную работу, может изменяться в процессе исполнения. Даже если каждый поток на всем протяжении своего существования делает что-то полезное, первоначально в приложении имеется всего один поток, который должен запустить все остальные. Но такой сценарий крайне маловероятен. Потоки часто не работают, а ожидают друг друга или завершения операций ввода/вывода.
Всякий раз, как один поток чего-то ждет (неважно, чего именно), а никакого другого потока, готового занять вместо него процессор, нет, процессор, который мог бы выполнять полезную работу, простаивает.
Упрощенно можно представлять, что программа состоит из «последовательных» участков, в которых полезные действия выполняет только один поток, и «параллельных», где задействованы все имеющиеся процессоры. Если программа исполняется на машине с большим числом процессоров, то теоретически «параллельные» участки могли бы завершаться быстрее, а «последовательные» такими бы и остались. Приняв такие упрощающие предположения, можно оценить потенциальное повышение производительности при увеличении количества процессоров: если доля «последовательных» участков равна fs, то коэффициент повышения производительности P при числе процессоров N составляет
Это закон Амдала, на который часто ссылаются при обсуждении производительности параллельных программ. Если код полностью распараллелен, то есть доля последовательных участков нулевая, то коэффициент ускорения равен N. Если же, к примеру, последовательные участки составляют треть программы, то даже при бесконечном количестве процессоров не удастся добиться ускорения более чем в три раза.
Конечно, эта картина чересчур упрощённая, потому что редко встречаются бесконечно делимые задачи, без чего это соотношение неверно, и не менее редко вся работа сводится только к процессорным вычислениям, как то предполагается в законе Амдала. Во время исполнения потоки могут ожидать разных событий.
Но из закона Амдала все же следует, что если целью распараллеливания является повышение производительности, то следует проектировать всё приложение так, чтобы процессорам всегда было чем заняться. За счет уменьшения длины «последовательных» участков или времени ожидания можно повысить выигрыш от добавления новых процессоров. Альтернативный подход — подать на вход системы больше данных и тем самым загрузить параллельные участки работой; при этом можно будет уменьшить долю последовательных участков и повысить коэффициент P.
По существу, масштабируемость — это возможность либо уменьшить время, затрачиваемое на какое-то действие, либо увеличить объем данных, обрабатываемых в единицу времени, при увеличении количества процессоров. Иногда оба свойства эквивалентны (можно обработать больше данных, если каждый элемент обрабатывается быстрее), но это необязательно. Прежде чем выбирать способ распределения работы между потоками, важно определить, какие аспекты масштабируемости представляют для вас наибольший интерес.
В начале этого раздела я уже говорил, что у потоков не всегда есть чем заняться. Иногда они вынуждены ждать другие потоки, завершения ввода/вывода или еще чего-то. Если на время этого ожидания загрузить систему какой-нибудь полезной работой, такое простаивание можно «скрыть».
8.4.3. Сокрытие латентности с помощью нескольких потоков
При обсуждении производительности многопоточного кода мы часто предполагали, что потоки трудятся изо всех сил и, получая в свое распоряжение процессор, всегда имеют полезную работу. Конечно, это не так — потоки часто оказываются блокированы в ожидании какого-то события: завершения ввода/вывода, освобождения мьютекса, завершения операции в каком-то другом потоке, сигнала условной переменной, готовности будущего результата… Наконец, они могут просто спать какое-то время.
Но какова бы ни была причина ожидания, если потоков столько же, сколько физических процессоров, наличие заблокированных потоков означает, что процессоры работают вхолостую. Процессор, который мог бы исполнять поток, вместо этого не делает ничего. Следовательно, если заранее известно, что какой-то поток будет проводить много времени в ожидании, то имеет смысл задействовать на это время процессор, запустив один или несколько дополнительных потоков.
Возьмем, к примеру, антивирусный сканер, который для распределения работы использует конвейер. Первый поток просматривает файловую систему и помещает имена файлов в очередь. Второй поток выбирает имена файлов из очереди и сканирует их на предмет наличия вирусов. Мы знаем, что поток просмотра файловой системы определённо будет простаивать в ожидании завершения ввода/вывода, поэтому «лишнее» процессорное время отдаем дополнительному потоку сканирования. Таким образом, у нас будет поток выбора файлов и столько потоков сканирования, сколько имеется процессоров. Поскольку потоку сканирования тоже нужно читать большие куски файлов, то имеет смысл еще увеличить количество таких потоков. Однако в какой-то момент потоков может стать слишком много, и система начнет работать медленнее, потому что вынуждена будет расходовать все больше и больше времени на контекстное переключение (см. раздел 8.2.5).
Такого рода настройка является оптимизацией, поэтому следует измерять производительность до и после изменения количества потоков; оптимальное их число зависит как от характера работы, так и от доли времени, затрачиваемой потоком на ожидание.
Иногда удается полезно использовать свободное процессорное время, не запуская дополнительные потоки. Например, если поток блокируется в ожидании завершения ввода/вывода, то имеет смысл воспользоваться асинхронным вводом/выводом, если платформа его поддерживает. Тогда поток сможет заняться полезной работой, пока ввод/вывод выполняется в фоне. С другой стороны, поток, ожидающий завершения операции в другом потоке, может в это время заняться чем-то полезным. Как это делается, мы видели при рассмотрении реализации свободной от блокировок очереди в главе 7. В крайнем случае, когда поток ждет завершения задачи, которая еще не была запущена другим потоком, ожидающий поток может сам выполнить эту задачу целиком или помочь в выполнении какой-то другой задачи. Такой пример мы видели в листинге 8.1, где функция сортировки пыталась отсортировать находящиеся в очереди блоки, пока блоки, которых она ждет, сортируются другими потоками.
Иногда потоки добавляются не для того, чтобы загрузить имеющиеся процессоры, а чтобы быстрее обрабатывать внешние события, то есть повысить быстроту реакции системы.
8.4.4. Повышение быстроты реакции за счет распараллеливания
Большинство современных графических интерфейсов пользователя являются событийно-управляемыми — пользователь выполняет в интерфейсе какие-то действия — нажимает клавиши или двигает мышь, в результате чего порождается последовательность событий или сообщений, которые приложение затем обрабатывает. Система может и сама порождать сообщения или события. Чтобы все события и сообщения были корректно обработаны, в приложении обычно присутствует цикл такого вида:
while (true) {
event_data event = get_event();
if (event.type == quit)
break;
process(event);
}
Детали API, конечно, могут отличаться, но структура всегда одна и та же: дождаться события, обработать его и ждать следующего. В однопоточном приложении такая структура затрудняет программирование длительных задач (см. раздел 8.1.3). Чтобы система оперативно реагировала на действия пользователя, функции
get_event()
и process()
должны вызываться достаточно часто вне зависимости от того, чем занято приложение. Это означает, что задача должна либо периодически приостанавливать себя и возвращать управление циклу обработки событий, либо сама вызывать функции get_event()
и process()
в подходящих точках. То и другое решение усложняет реализацию задачи.Применив распараллеливание для разделения обязанностей, мы сможем вынести длительную задачу в отдельный поток, а выделенному потоку GUI поручить обработку событий. В дальнейшем потоки могут взаимодействовать с помощью простых механизмов, и мешать код обработки событий с кодом задачи не придётся. В листинге ниже приведён набросок такого разделения обязанностей.
Листинг 8.6. Отделение потока GUI от потока задачи
std::thread task_thread;
std::atomic task_cancelled(false);
void gui_thread() {
while (true) {
event_data event = get_event();
if (event.type == quit)
break;
process(event);
}
}
void task() {
while (!task_complete() && !task_cancelled) {
do_next_operation();
}
if (task_cancelled) {
perform_cleanup();
} else {
post_gui_event(task_complete);
}
}
void process(event_data const& event) {
switch(event.type) {
case start_task:
task_cancelled = false;
task_thread = std::thread(task);
break;
case stop_task:
task_cancelled = true;
task_thread.join();
break;
case task_complete:
task_thread.join();
display_results();
break;
default:
//...
}
}
В результате такого разделения обязанностей поток пользовательского интерфейса всегда будет своевременно реагировать на события, даже если задача занимает много времени. Быстрота реакции часто является основной характеристикой приложения с точки зрения пользователя — с приложением, которое полностью зависает на время выполнения некоторой операции (неважно, какой именно), работать неприятно. За счет выделения специального потока для обработки событий пользовательский интерфейс может сам обрабатывать относящиеся к нему сообщения (например, изменение размера или перерисовка окна), не прерывая длительной операции, но передавая адресованные ей сообщения, если таковые поступают.
До сих пор в этой главе мы говорили о том, что следует учитывать при проектировании параллельного кода. Поначалу количество разных факторов может привести в изумление, но постепенно они войдут в плоть и кровь и станут вашей второй натурой. Если описанные выше соображения внове для вас, то, надеюсь, они станут яснее после того, как мы рассмотрим конкретные примеры многопоточного кода.
8.5. Проектирование параллельного кода на практике