Параллельное программирование на С++ в действии — страница 30 из 53

Эта глава оказалась очень насыщенной. Мы начали с описания различных способов распределения работы между потоками, в частности предварительного распределения данных и использования нескольких потоков для формирования конвейера. Затем мы остановились на низкоуровневых аспектах производительности многопоточного кода, обратив внимание на феномен ложного разделения и проблему конкуренции за данные. Далее мы перешли к вопросу о том, как порядок доступа к данным влияет на производительность программы. После этого мы поговорили о дополнительных соображениях при проектировании параллельных программ, в частности о безопасности относительно исключений и о масштабируемости. И закончили главу рядом примеров реализации параллельных алгоритмов, на которых показали, какие проблемы могут возникать при проектировании многопоточного кода.

В этой главе пару раз всплывала идея пула потоков — предварительно сконфигурированной группы потоков, выполняющих задачи, назначенные пулу. Разработка хорошего пула потоков — далеко не тривиальное дело. В следующей главе мы рассмотрим некоторые возникающие при этом проблемы, а также другие, более сложные, аспекты управления потоками.

Глава 9.Продвинутое управление потоками

В этой главе:

■ Пулы потоков.

■ Учет зависимостей между задачами, адресованными пулу.

■ Занимание работ у потоков из пула.

■ Прерывание потоков.

В предыдущих главах мы управляли потоками явно — путем создания объектов

std::thread
для каждого потока. В нескольких местах мы видели, что это не всегда желательно, так как приходится самостоятельно управлять временем жизни этих объектов, определять, сколько потоков создать для решения данной задачи с учетом имеющегося оборудования и т.д. В идеале хотелось бы просто разбить код на максимально мелкие блоки, которые можно выполнить параллельно, а потом передать их компилятору и библиотеке, сказав: «Распараллель и обеспечь оптимальную производительность».

В ряде примеров нам встречалась еще одна повторяющаяся тема — мы можем использовать несколько потоков для решения задачи, но хотим, чтобы они завершались досрочно, если выполнено некоторое условие, например: результат уже получен, или произошла ошибка, или пользователь потребовал отменить операцию. В любом случае потокам нужно отправить запрос «Прекратить работу», который означает, что они должны прервать выполняемое задание, прибраться за собой и как можно скорее завершиться.

В этой главе мы рассмотрим различные механизмы управления потоками и задачами и начнем с автоматического выбора числа потоков и распределения задач между ними.

9.1. Пулы потоков

Во многих компаниях сотрудники, которые обычно работают в офисе, время от времени должны выезжать к клиентам или поставщикам, посещать выставки и конференции. Хотя такие поездки могут считаться обязательными, и в любой день в командировке может находиться сразу несколько человек, но для любого конкретного работника промежуток между поездками может составлять месяцы, а то и годы. В таких условиях резервировать машину для каждого работника было бы дорого и непрактично, поэтому компании содержат парк, или пул машин, то есть ограниченное количество машин, предоставляемых в распоряжении всем работникам. Когда работнику нужно съездить в командировку, он заказывает машину на определенное время, а по завершении поездки возвращает ее в общий пул. Если в какой-то день свободных машин в пуле не оказалось, то командировку придется перенести на другой день.

В основе пула потоков лежит аналогичная идея, только в общее распоряжение предоставляются не машины, а потоки. Во многих системах бессмысленно заводить отдельный поток для каждой задачи, которая потенциально может выполняться одновременно с другими, но тем не менее хотелось бы воспользоваться преимуществами параллелизма там, где это возможно. Именно это и позволяет сделать пул потоков: задачи, которые в принципе могут выполняться параллельно, отправляются в пул, а тот ставит их в очередь ожидающих работ. Затем один из рабочих потоков забирает задачу из очереди, исполняет ее и идет за следующей задачей.

При разработке пула потоков нужно принять несколько важных проектных решений, например: сколько потоков будет в пуле, как эффективнее всего назначать потоки задачам, можно ли будет ждать завершения задачи. В этом разделе мы рассмотрим несколько реализаций пула потоков, начав с самого простого.

9.1.1. Простейший пул потоков

В простейшем случае пул состоит из фиксированного числа рабочих потоков (обычно равного значению, которое возвращает функция

std::thread::hardware_concurrency()
). Когда у программы появляется какая-то работа, она вызывает функцию, которая помещает эту работу в очередь. Рабочий поток забирает работу из очереди, выполняет указанную в ней задачу, после чего проверяет, есть ли в очереди другие работы. В этой реализации никакого механизма ожидания завершения задачи не предусмотрело. Если это необходимо, то вы должны будете управлять синхронизацией самостоятельно.

В следующем листинге приведена реализация такого пула потоков.


Листинг 9.1. Простой пул потоков

class thread_pool {

 std::atomic_bool done;

 thread_safe_queue> work_queue;←
(1)

 std::vector threads; ←
(2)

 join_threads joiner; ←
(3)


 void worker_thread() {

  while (!done) { ←
(4)

   std::function task;

   if (work_queue.try_pop(task)) { ←
(5)

    task(); ←
(6)

   } else {

    std::this_thread::yield(); ←
(7)

   }

  }

 }


public:

 thread_pool():

  done(false), joiner(threads) {

  unsigned const thread_count =

   std::thread::hardware_concurrency();←
(8)

  try {

   for (unsigned i = 0; i < thread_count; ++i) {

    threads.push_back(

     std::thread(&thread_pool::worker_thread, this)); ←
(9)

   }

  } catch (...) {

   done = true; ←
(10)

   throw;

  }

 }


 ~thread_pool() {

  done = true; ←
(11)

 }


 template

 void submit(FunctionType f) {

  work_queue.push(std::function(f)); ←
(12)

 }

};

Здесь мы определили вектор рабочих потоков (2) и используем одну из потокобезопасных очередей из главы 6 (1) для хранения очереди работ. В данном случае пользователь не может ждать завершения задачи, а задача не может возвращать значения, поэтому для инкапсуляции задач можно использовать тип

std::function
. Функция
submit()
обертывает переданную функцию или допускающий вызов объект в объект
std::function
и помещает его в очередь (12).

Потоки запускаются в конструкторе; их количество равно значению, возвращаемому функцией

std::thread::hardware_concurrency()
, то есть мы создаем столько потоков, сколько может поддержать оборудование (8). Все эти потоки исполняют функцию-член нашего класса
worker_thread()
(9).

Запуск потока может завершиться исключением, поэтому необходимо позаботиться о том, чтобы уже запущенные к этому моменту потоки корректно завершались. Для этого мы включили блок

try-catch
, который в случае исключения поднимает флаг
done
(10). Кроме того, мы воспользовались классом
join_threads
из главы 8 (3), чтобы обеспечить присоединение всех потоков. То же самое происходит в деструкторе: мы просто поднимаем флаг
done
(11), а объект
join_threads
гарантирует, что потоки завершатся до уничтожения пула. Отметим, что порядок объявления членов важен: и флаг
done
и объект
worker_queue
должны быть объявлены раньше вектора
threads
, который, в свою очередь, должен быть объявлен раньше
joiner
. Только тогда деструкторы членов класса будут вызываться в правильном порядке; в частности, нельзя уничтожать очередь раньше, чем остановлены все потоки.

Сама функция

worker_thread
проста до чрезвычайности: в цикле, который продолжается, пока не поднят флаг
done
(4), она извлекает задачи из очереди (5) и выполняет их (6). Если в очереди нет задач, функция вызывает
std::this_thread::yield()
(7), чтобы немного отдохнуть и дать возможность поработать другим потокам.

Часто даже такого простого пула потоков достаточно, особенно если задачи независимы, не возвращают значений и не выполняют блокирующих операций. Но бывает и по-другому: во-первых, у программы могут быть более жесткие требования, а, во-вторых, в таком пуле возможны проблемы, в частности, из-за взаимоблокировок. Кроме того, в простых случаях иногда лучше прибегнуть к функции

std::async
, как неоднократно демонстрировалось в главе 8. В этой главе мы рассмотрим и более изощренные реализации пула потоков с дополнительными возможностями, которые призваны либо удовлетворить особые потребности пользователя, либо уменьшить количество потенциальных ошибок. Для начала разрешим ожидать завершения переданной пулу задачи.

9.1.2. Ожидание задачи, переданной пулу потоков

В примерах из главы 8, где потоки запускались явно, главный поток после распределения работы между потоками всегда ждал завершения запущенных потоков. Тем самым гарантировалось, что вызывающая программа получит управление только после полного завершения задачи. При использовании пула потоков ждать нужно завершения задачи, переданной пулу, а не самих рабочих потоков. Это похоже на то, как мы ждали будущих результатов при работе с

std::async
в главе 8. В случае простого пула потоков, показанного в листинге 9.1, организовывать ожидание придется вручную, применяя механизмы, описанные в главе 4: условные переменные и будущие результаты. Это усложняет код; намного удобнее было бы ждать задачу напрямую.

За счет переноса сложности в сам пул потоков мы сумеем добиться желаемого. Функция

submit()
могла бы возвращать некий описатель задачи, по которому затем можно было бы ждать ее завершения. Сам описатель должен был бы инкапсулировать условную переменную или будущий результат. Это упростило бы код, пользующийся пулом потоков.

Частный случай ожидания завершения запущенной задачи возникает, когда главный поток нуждается в вычисленном ей результате. Мы уже встречались с такой ситуацией выше, например, в функции

parallel_accumulate()
из главы 2. В таком случае путем использования будущих результатов мы можем объединить ожидание с передачей результата. В листинге 9.2 приведен код модифицированного пула потоков, который разрешает ожидать завершения задачи и передает возвращенный ей результат ожидающему потоку. Поскольку экземпляры класса
std::packaged_task<>
допускают только перемещение, но не копирование, мы больше не можем воспользоваться классом
std::function<>
для обертывания элементов очереди, потому что
std::function<>
требует, чтобы в обернутых объектах-функциях был определён копирующий конструктор. Вместо этого мы напишем специальный класс-обертку, умеющий работать с объектами, обладающими только перемещающим конструктором. Это простой маскирующий тип класс (type-erasure class), в котором определён оператор вызова. Нам нужно поддержать функции, которые не принимают параметров и возвращают
void
, поэтому оператор всего лишь вызывает виртуальный метод
call()
, который в свою очередь вызывает обернутую функцию.


Листинг 9.2. Пул потоков, ожидающий завершения задачи

class function_wrapper {

 struct impl_base {

  virtual void call() = 0;

  virtual ~impl_base() {}

 };


 std::unique_ptr impl;


 template

 struct impl_type: impl_base {

  F f;

  impl_type(F&& f_): f(std::move(f_)) {}

  void call() { f(); }

 };


public:

 template function_wrapper(F&& f):

  impl(new impl_type(std::move(f))) {}


 void operator()() { impl->call(); }


 function_wrapper() = default;


 function_wrapper(function_wrapper&& other):

  impl(std::move(other.impl)) {}


 function_wrapper& operator=(function_wrapper&& other) {

  impl = std::move(other.impl);

  return *this;

 }


 function_wrapper(const function_wrapper&) = delete;

 function_wrapper(function_wrapper&) = delete;

 function_wrapper& operator=(const function_wrapper&) = delete;

};


class thread_pool {

 thread_safe_queue work_queue;←┐

Используем

 void worker_thread()                            │
function_

 {                                               │
wrapper

  while (!done)                                  │
вместо std::

  {                                              │
function

   function_wrapper task;                       ←┘

   if (work_queue.try_pop(task))

    task();

   else

    std::this_thread::yield();

  }

 }


public:

 template

 std::future::type>←
(1)

 submit(FunctionType f) {

  typedef typename std::result_of::type

   result_type;                                        ←
(2)

  std::packaged_task task(std::move(f));←
(3)

  std::future res(task.get_future());     ←
(4)

  work_queue.push(std::move(task));                    ←
(5)

  return res;                                          ←
(6)

 }


 // остальное, как и раньше

};

Прежде всего отметим, что модифицированная функция

submit()
(1) возвращает объект
std::future<>
, который будет содержать возвращенное задачей значение и позволит вызывающей программе ждать ее завершения. Для этого нам необходимо знать тип значения, возвращаемого переданной функцией
f
, и здесь на помощь приходит шаблон
std::result_of<>
:
std::result_of::type
— это тип результата, возвращенного вызовом объекта типа
FunctionType
(например,
f
) без аргументов. Выражение
std::result_of<>
мы используем также в определении псевдонима типа
result_type
(2) внутри функции.

Затем

f
обертывается объектом
std::packaged_task
(3), потому что
f
— функция или допускающий вызов объект, который не принимает параметров и возвращает результат типа
result_type
. Теперь мы можем получить будущий результат из
std::packaged_task<>
(4), перед тем как помещать задачу в очередь (5) и возвращать будущий результат (6). Отметим, что при помещении задачи в очередь мы должны использовать функцию
std::move()
, потому что класс std::packaged_task<> не допускает копирования. Именно поэтому в очереди хранятся объекты
function_wrapper
, а не объекты типа.

Этот пул позволяет ожидать завершения задач и получать возвращаемые ими результаты. В листинге ниже показано, как выглядит функция

parallel_accumulate
, работающая с таким пулом потоков.


Листинг 9.3. Функция

parallel_accumulate
, реализованная с помощью пула потоков, допускающего ожидание задач

template

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);


 if (!length)

  return init;


 unsigned long const block_size = 25;

 unsigned long const num_blocks =

  (length + block_size - 1) / block_size; ←
(1)


 std::vector> futures(num_blocks-1);

 thread_pool pool;


 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_blocks - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  futures[i] = pool.submit(accumulate_block());←
(2)

  block_start = block_end;

 }

 T last_result =

  accumulate_block()(block_start, last);

 T result = init;

 for (unsigned long i = 0; i < (num_blocks - 1); ++i) {

  result += futures[i].get();

 }

 result += last_result;

 return result;

}

Сравнивая этот код с листингом 8.4, следует обратить внимание на две вещи. Во-первых, мы работаем с количеством блоков (

num_blocks
(1)), а не потоков. Чтобы в полной мере воспользоваться масштабируемостью пула потоков, мы должны разбить работу на максимально мелкие блоки, с которыми имеет смысл работать параллельно. Если потоков в пуле немного, то каждый поток будет обрабатывать много блоков, но по мере роста числа потоков, поддерживаемых оборудованием, будет расти и количество блоков, обрабатываемых параллельно.

Но, выбирая «максимально мелкие блоки, с которыми имеет смысл работать параллельно», будьте осторожны. Отправка задачи пулу потоков, выбор ее рабочим потоком из очереди и передача возвращенного значения с помощью

std::future<>
— всё это операции не бесплатные, и для совсем мелких задач они не окупятся. Если размер задачи слишком мал, то программа, в которой используется пул потоков, может работать медленнее, чем однопоточная.

В предположении, что размер блока выбран разумно, вам не надо заботиться об упаковке задач, получении будущих результатов и хранении объектов

std::thread
, чтобы впоследствии их можно было присоединить; все это пул берет на себя. Вам остается лишь вызвать функцию
submit()
, передав ей свою задачу (2).

Пул потоков обеспечивает также безопасность относительно исключений. Любое возбужденное задачей исключение передается через будущий результат, возвращенный

submit()
, а, если выход из функции происходит в результате исключения, то деструктор пула потоков снимет еще работающие задачи и дождется завершения потоков, входящих в пул.

Эта схема работает для простых случаев, когда задачи независимы. Но не годится, когда одни задачи зависят от других, также переданных пулу.

9.1.3. Задачи, ожидающие других задач

В этой книге я уже неоднократно приводил пример алгоритма Quicksort. Его идея проста — подлежащие сортировке данные разбиваются на две части: до и после опорного элемента (в смысле заданной операции сравнения). Затем обе части рекурсивно сортируются и объединяются для получения полностью отсортированной последовательности. При распараллеливании алгоритма надо позаботиться о том, чтобы рекурсивные вызовы задействовали имеющийся аппаратный параллелизм.

В главе 4, где этот пример впервые был представлен, мы использовали

std::async
для выполнения одного из рекурсивных вызовов на каждом шаге и оставляли библиотеке решение о том, запускать ли новый поток или сортировать синхронно при обращении к
get()
. Этот подход неплохо работает — каждая задача либо выполняется в отдельном потоке, либо в тот момент, когда нужны ее результаты.

В главе 8 мы переработали эту реализацию, продемонстрировав альтернативный подход, когда количество потоков фиксировано и определяется уровнем аппаратного параллелизма. В данном случае мы воспользовались стеком ожидающих сортировки блоков. Разбивая на части предложенные для сортировки данные, каждый поток помещал один блок в стек, а второй сортировал непосредственно. Бесхитростное ожидание завершения сортировки второго блока могло бы закончиться взаимоблокировкой, потому что число потоков ограничено, и некоторые из них ждут. Очень легко оказаться в ситуации, когда все потоки ждут завершения сортировки блоков, и ни один ничего не делает. Тогда мы решили эту проблему, заставив поток извлекать блоки из стека и сортировать их, пока тот конкретный блок, которого он ждет, еще не отсортирован.

Точно такая же проблема возникает при использовании одного из рассмотренных выше простых пулов потоков вместо

std::async
, как в примере из главы 4. Число потоков тоже ограничено, и может случиться так, что все они будут ждать задач, которые еще запланированы, так как нет свободных потоков. И решение должно быть таким же,

как в главе 8: обрабатывать стоящие в очереди блоки во время ожидания завершения сортировки своего блока. Но если мы применяем пул потоков для управления списком задач и их ассоциациями с потоками — а именно в этом и состоит смысл пула потоков, — то доступа к списку задач у нас нет. Поэтому необходимо модифицировать сам пул, чтобы он делал это автоматически.

Проще всего будет добавить в класс

thread_pool
новую функцию, чтобы исполнять задачу из очереди и управлять циклом самостоятельно. Так мы и поступим. Более развитые реализации пула могли бы включить дополнительную логику в функцию ожидания или добавить другие функции ожидания для обработки этого случая, быть может, даже назначая приоритеты ожидаемым задачам. В листинге ниже приведена новая функция
run_pending_task()
, а модифицированный алгоритм Quicksort, в котором она используется, показан в листинге 9.5.


Листинг 9.4. Реализация функции

run_pending_task()

void thread_pool::run_pending_task() {

 function_wrapper task;

 if (work_queue.try_pop(task)) {

  task();

 } else {

  std::this_thread::yield();

 }

}

Код

run_pending_task()
вынесен из главного цикла внутри функции
worker_thread()
, которую теперь можно будет изменить, так чтобы она вызывала
run_pending_task()
. Функция
run_pending_task()
пытается получить задачу из очереди и в случае успеха выполняет ее; если очередь пуста, то функция уступает управление ОС, чтобы та могла запланировать другой поток. Показанная ниже реализация Quicksort гораздо проще, чем версия в листинге 8.1, потому что вся логика управления потоками перенесена в пул.


Листинг 9.5. Реализация Quicksort на основе пула потоков

template

struct sorter {    ←
(1)

 thread_pool pool; ←
(2)


 std::list do_sort(std::list& chunk_data) {

  if (chunk_data.empty()) {

   return chunk_data;

  }


  std::list result;

  result.splice(result.begin(), chunk_data, chunk_data.begin());

  T const& partition_val = *result.begin();


  typename std::list::iterator divide_point =

   std::partition(chunk_data.begin(), chunk_data.end(),

    [&](T const& val){ return val < partition_val; });


  std::list new_lower_chunk;

  new_lower_chunk.splice(new_lower_chunk.end(),

   chunk_data, chunk_data.begin(),

   divide_point);


  std::future> new_lower = ←
(3)

  pool.submit(std::bind(&sorter::do_sort, this,

   std::move(new_lower_chunk)));


  std::list new_higher(do_sort(chunk_data));

  result.splice(result.end(), new_higher);

  while (!new_lower.wait_for(std::chrono::seconds(0)) ==

   std::future_status::timeout) {

   pool.run_pending_task(); ←
(4)

  }


  result.splice(result.begin(), new_lower.get());

  return result;

 }

};


template

std::list parallel_quick_sort(std::list input) {

 if (input.empty()) {

  return input;

 }


 sorter s;

 return s.do_sort(input);

}

Как и в листинге 8.1, реальная работа делегируется функции-члену

do_sort()
шаблона класса
sorter
(1), хотя в данном случае этот шаблон нужен лишь для обертывания экземпляра
thread_pool
(2).

Управление потоками и задачами теперь свелось к отправке задачи пулу (3) и исполнению находящихся в очереди задач в цикле ожидания (4). Это гораздо проще, чем в листинге 8.1, где нужно было явно управлять потоками и стеком подлежащих сортировке блоков. При отправке задачи пулу мы используем функцию

std::bind()
, чтобы связать указатель
this
с
do_sort()
и передать подлежащий сортировке блок. В данном случае мы вызываем
std::move()
, чтобы данные
new_lower_chunk
перемещались, а не копировались.

Мы решили проблему взаимоблокировки, возникающую из- за того, что одни потоки ждут других, но этот пул все еще далек от идеала. Отметим хотя бы, что все вызовы

submit()
и
run_pending_task()
обращаются к одной и той же очереди. В главе 8 мы видели, что модификация одного набора данных из разных потоков может негативно сказаться на производительности, стало быть, с этим нужно что-то делать.

9.1.4. Предотвращение конкуренции за очередь работ

Всякий раз, как поток вызывает функцию

submit()
экземпляра пула потоков, он помещает новый элемент в единственную разделяемую очередь работ. А рабочие потоки постоянно извлекают элементы из той же очереди. Следовательно, по мере увеличения числа процессоров будет возрастать конкуренция за очередь работ. Это может ощутимо отразиться на производительности; даже при использовании свободной от блокировок очереди, в которой нет явного ожидания, драгоценное время может тратиться на перебрасывание кэша.

Чтобы избежать перебрасывания кэша, мы можем завести по одной очереди работ на каждый поток. Тогда каждый поток будет помещать новые элементы в свою собственную очередь и брать работы из глобальной очереди работ только тогда, когда в его очереди работ нет. В следующем листинге приведена реализация с использованием переменной типа

thread_local
, благодаря которой каждый поток обладает собственной очередью работ в дополнение к глобальной.


Листинг 9.6. Пул с очередями в поточно-локальной памяти

class thread_pool {

 thread_safe_queue pool_work_queue;


 typedef std::queue local_queue_type;←
(1)

 static thread_local std::unique_ptr

  local_work_queue; ←
(2)


 void worker_thread() {

  local_work_queue.reset(new local_queue_type);←
(3)

  while (!done) {

   run_pending_task();

  }

 }


public:

 template

 std::future::type>

 submit(FunctionType f) {

  typedef typename std::result_of::type

   result_type;


  std::packaged_task task(f);

  std::future res(task.get_future());

  if (local_work_queue) { ←
(4)

   local_work_queue->push(std::move(task));

  } else {

   pool_work_queue.push(std::move(task)); ←
(5)

  }

  return res;

 }


 void run_pending_task() {

  function_wrapper task;

  if (local_work_queue && !local_work_queue->empty()) {←
(6)

   task = std::move(local_work_queue->front());

   local_work_queue->pop();

   task();

  } else if(pool_work_queue.try_pop(task)) { ←
(7)

   task();

  } else {

   std::this_thread::yield();

  }

 }


 // остальное, как и раньше

};

Для хранения очереди работ в поточно-локальной памяти мы воспользовались указателем

std::unique_ptr<>
(2), потому что не хотим, чтобы у потоков, не входящих в пул, была очередь; этот указатель инициализируется в функции
worker_thread()
до начала цикла обработки (3). Деструктор
std::unique_ptr<>
позаботится об уничтожении очереди работ по завершении потока.

Затем функция

submit()
проверяет, есть ли у текущего потока очередь работ (4). Если есть, то это поток из пула, и мы можем поместить задачу в локальную очередь. В противном случае задачу следует помещать в очередь пула, как и раньше (5).

Аналогичная проверка имеется в функции

run_pending_task()
(6), только на этот раз нужно еще проверить, есть ли что-нибудь в локальной очереди. Если есть, то можно извлечь элемент из начала очереди и обработать его. Обратите внимание, что локальная очередь может быть обычным объектом
std::queue<>
(1), так как к ней обращается только один поток. Если в локальной очереди задач нет, то мы проверяем очередь пула, как и раньше (7).

Таким образом мы действительно уменьшаем конкуренцию, но если распределение работ неравномерно, то может случиться, что в очереди одного потока скопится много задач, тогда как остальным будет нечем заняться. Например, в случае Quicksort только самый первый блок попадает в очередь пула, а остальные окажутся в локальной очереди того потока, который этот блок обработал. Это сводит на нет всю идею пула потоков.

К счастью, у этой проблемы есть решение: позволить потоку занимать (steal) работы из очередей других потоков, если ни в его собственной, ни в глобальной очереди ничего нет.

9.1.5. Занимание работ

Если мы хотим, чтобы «безработный» поток мог брать работы из очереди другого потока, то эта очередь должна быть доступна занимающему потоку в

run_pending_tasks()
. Для этого каждый поток должен зарегистрировать свою очередь в пуле или получать очередь от пула. Кроме того, необходимо позаботиться о надлежащей синхронизации и защите очереди работ, чтобы не нарушались инварианты.

Можно написать свободную от блокировок очередь, которая позволит потоку-владельцу помещать и извлекать элементы с одного конца, а другим потокам — занимать элементы с другого конца, однако реализация такой очереди выходит за рамки данной книги. Чтобы продемонстрировать идею, мы поступим проще — воспользуемся мьютексом для защиты данных очереди. Мы надеемся, что занимание работ — редкое событие, поэтому конкуренция за мьютекс будет невелика, и накладные расходы на такую очередь окажутся минимальны. Ниже приведена простая реализация с блокировками.


Листинг 9.7. Очередь с блокировкой, допускающей занимание работ

class work_stealing_queue {

private:

 typedef function_wrapper data_type;

 std::deque the_queue; ←
(1)

 mutable std::mutex the_mutex;


public:

 work_stealing_queue() {}

 work_stealing_queue(const work_stealing_queue& other)=delete;

 work_stealing_queue& operator=(

  const work_stealing_queue& other)=delete;


 void push(data_type data) { ←
(2)

  std::lock_guard lock(the_mutex);

  the_queue.push_front(std::move(data));

 }


 bool empty() const {

  std::lock_guard lock(the_mutex);

  return the_queue.empty();

 }


 bool try_pop(data_type& res) { ←
(3)

  std::lock_guard lock(the_mutex);

  if (the_queue.empty()) {

   return false;

  }


  res = std::move(the_queue.front());

  the_queue.pop_front();

  return true;

 }


 bool try_steal(data_type& res) { ←
(4)

  std::lock_guard lock(the_mutex);

  if (the_queue.empty()) {

   return false;

  }


  res = std::move(the_queue.back());

  the_queue.pop_back();

  return true;

 }

};

Этот класс является простой оберткой вокруг

std::deque
(1), которая защищает все операции доступа к очереди с помощью мьютекса. Функции
push()
(2) и
try_ pop()
(3) работают с началом очереди, а функция
try_steal()
 — с концом (4).

Получается, что эта «очередь» для потока-владельца на самом деле является стеком, обслуживаемым согласно дисциплине «последним пришёл, первым обслужен», — задача, которая была помещена последней, извлекается первой. С точки зрения кэш-памяти это даже может повысить производительность, так как относящиеся к последней задаче данные с большей вероятностью окажутся в кэше, чем данные, относящиеся к предыдущей задаче. К тому же, такая дисциплина прекрасно подходит для алгоритмов типа Quicksort. В предшествующих реализациях каждое обращение к

do_sort()
помещает элемент в очередь, а затем ждет его. Обрабатывая последний помещенный в очередь элемент первым, мы гарантируем, что блок, необходимый текущему вызову для завершения работы, будет обработан раньше блоков, нужных другим ветвям, а, значит, уменьшается как количество активных задач, так и занятый размер стека. Функция
try_steal()
извлекает элементы из противоположного по сравнению с
try_pop()
конца очереди, чтобы минимизировать конкуренцию; в принципе, можно было бы применить технику, обсуждавшуюся в главах 6 и 7, чтобы поддержать одновременные обращения к
try_pop()
и
try_steal()
.

Итак, теперь у нас есть замечательная очередь работ, допускающая занимание. Но как воспользоваться ей в пуле потоков? Ниже приведена одна из возможных реализаций.


Листинг 9.8. Пул потоков с использованием занимания работ

class thread_pool {

 typedef function_wrapper task_type;


 std::atomic_bool done;

 thread_safe_queue pool_work_queue;

 std::vector> queues;←
(1)

 std::vector threads;

 join_threads joiner;


 static thread_local work_stealing_queue* local_work_queue;←
(2)

 static thread_local unsigned my_index;


 void worker_thread(unsigned my_index_) {

  my_index = my_index_;

  local_work_queue = queues[my_index].get(); ←
(3)

  while (!done) {

   run_pending_task();

  }

 }


 bool pop_task_from_local_queue(task_type& task) {

  return local_work_queue && local_work_queue->try_pop(task);

 }


 bool pop_task_from_pool_queue(task_type& task) {

  return pool_work_queue.try_pop(task);

 }


 bool pop_task_from_other_thread_queue(task_type& task) { ←
(4)

  for (unsigned i = 0; i < queues.size(); ++i) {

   unsigned const index = (my_index + i + 1) % queues.size();←
(5)

   if (queues[index]->try_steal(task)) {

    return true;

   }

  }


  return false;

 }


public:

 thread_pool() :

  done(false), joiner(threads) {

  unsigned const thread_count =

   std::thread::hardware_concurrency();

  try {

   for (unsigned i = 0; i < thread_count; ++i) {

    queues.push_back(std::unique_ptr (←
(6)

     new work_stealing_queue));

    threads.push_back(

     std::thread(&thread_pool::worker_thread, this, i));

   }

  } catch (...) {

   done = true;

   throw;

  }

 }


 ~thread_pool() {

  done = true;

 }


 template

 std::future::type> submit(

  FunctionType f) {

  typedef typename std::result_of::type

   result_type;


  std::packaged_task task(f);

  std::future res(task.get_future());

  if (local_work_queue) {

   local_work_queue->push(std::move(task));

  } else {

   pool_work_queue.push(std::move(task));

  }

  return res;

 }


 void run_pending_task() {

  task_type task;

  if (pop_task_from_local_queue(task) || ←
(7)

      pop_task_from_pool_queue (task) || ←
(8)

      pop_task_from_other_thread_queue(task)) { ←
(9)

   task();

  } else {

   std::this_thread::yield();

  }

 }

};

Этот код очень похож на код из листинга 9.6. Первое отличие состоит в том, что локальная очередь каждого потока — объект класса

work_stealing_queue
, а не просто
std::queue<>
(2). Новый поток не выделяет очередь для себя самостоятельно; это делает конструктор пула потоков (6), и он же сохраняет новую очередь в списке очередей для данного пула (1). Индекс очереди в списке передаётся функции потока и используется затем для получения указателя на очередь (3). Это означает, что пул потоков может получить доступ к очереди, когда пытается занять задачу для потока, которому нечего делать. Новая версия
run_pending_task()
сначала пытается получить задачу из очереди исполняемого потока (7), затем из очереди пула (8) и, наконец, из очереди другого потока (9).

Функция

pop_task_from_other_thread_queue()
(4) обходит очереди, принадлежащие всем потокам пула, пытаясь занять задачу у каждой. Чтобы не случилось так, что все потоки занимают задачи у первого потока в списке, каждый поток начинает просмотр с позиции, равной его собственному индексу (5).

Теперь у нас имеется пул потоков, пригодный для самых разных целей. Разумеется, есть масса способов улучшить его для работы в конкретной ситуации, но это я оставляю в качестве упражнения для читателя. В частности, мы совсем не исследовали идею динамического изменения размера пула, так чтобы обеспечить оптимальное использование процессоров, даже когда потоки блокированы в ожидании какого-то события, например, завершения ввода/вывода или освобождения мьютекса.

Следующим в нашем списке «продвинутых» приёмов управления потоками стоит прерывание потоков.

9.2. Прерывание потоков