Параллельное программирование на С++ в действии — страница 12 из 53

В этой главе мы рассмотрели, к каким печальным последствиям могут приводить проблематичные гонки, когда возможно разделение данных между потоками, и как с помощью класса

std::mutex
и тщательного проектирования интерфейса этих неприятностей можно избежать. Мы видели, что мьютексы — не панацея, поскольку им свойственны собственные проблемы в виде взаимоблокировки, хотя стандартная библиотека С++ содержит средство, позволяющее избежать их — класс
std::lock()
. Затем мы обсудили другие способы избежать взаимоблокировок и кратко обсудили передачу владения блокировкой и вопросы, касающиеся выбора подходящего уровня гранулярности блокировки. Наконец, я рассказал об альтернативных механизмах защиты данных, применяемых в специальных случаях:
std::call_once()
и
boost::shared_mutex
.

А вот чего мы пока не рассмотрели, так это ожидание поступления входных данных из других потоков. Наш потокобезопасный стек просто возбуждает исключение при попытке извлечения из пустого стека. Поэтому если один поток хочет дождаться, пока другой поток поместит в стек какие-то данные (а это, собственно, и есть основное назначение потокобезопасного стека), то должен будет раз за разом пытаться извлечь значение, повторяя попытку в случае исключения. Это приводит лишь к бесцельной трате процессорного времени на проверку; более того, такая повторяющаяся проверка может замедлить работу программы, поскольку не дает выполняться другим потокам. Нам необходим какой-то способ, который позволил бы одному потоку ждать завершения операции в другом потоке, не потребляя процессорное время. В главе 4, которая опирается на рассмотренные выше средства защиты разделяемых данных, мы познакомимся с различными механизмами синхронизации операций между потоками в С++, а в главе 6 увидим, как с помощью этих механизмов можно строить более крупные структуры данных, допускающие повторное использование.

Глава 4.Синхронизация параллельных операций

В этой главе:

■ Ожидание события.

■ Ожидание однократного события с будущими результатами

■ Ожидание с ограничением по времени.

■ Использование синхронизации операций для упрощения программы.

В предыдущей главе мы рассмотрели различные способы защиты данных, разделяемых между потоками. Но иногда требуется не только защитить данные, но и синхронизировать действия, выполняемые в разных потоках. Например, возможно, что одному потоку перед тем как продолжить работу, нужно дождаться, пока другой поток завершит какую-то операцию. В общем случае, часто возникает ситуация, когда поток должен ожидать какого-то события или истинности некоторого условия. Конечно, это можно сделать, периодически проверяя разделяемый флаг «задача завершена» или что-то в этом роде, но такое решение далеко от идеала. Необходимость в синхронизации операций — настолько распространенный сценарий, что в стандартную библиотеку С++ включены специальные механизмы для этой цели — условные переменные и будущие результаты (future).

В этой главе мы рассмотрим, как реализуется ожидание событий с помощью условных переменных и будущих результатов и как ими можно воспользоваться, чтобы упростить синхронизацию операций.

4.1. Ожидание события или иного условия

Представьте, что вы едете на поезде ночью. Чтобы не пропустить свою станцию, можно не спать всю ночь и читать названия всех пунктов, где поезд останавливается. Так вы, конечно, не проедете мимо, но сойдете с поезда сильно уставшим. Есть и другой способ — заранее посмотреть в расписании, когда поезд прибывает в нужный вам пункт, поставить будильник и улечься спать. Так вы тоже свою остановку не пропустите, но если поезд задержится в пути, то проснётесь слишком рано. И еще одно — если в будильнике сядут батарейки, то вы можете проспать и проехать мимо нужной станции. В идеале хотелось бы, чтобы кто-то или что-то разбудило вас, когда поезд подъедет к станции, — не раньше и не позже.

Какое отношение всё это имеет к потокам? Самое непосредственное — если один поток хочет дождаться, когда другой завершит некую операцию, то может поступить несколькими способами. Во-первых, он может просто проверять разделяемый флаг (защищенный мьютексом), полагая, что второй поток поднимет этот флаг, когда завершит свою операцию. Это расточительно но двум причинам: на опрос флага уходит процессорное время, и мьютекс, захваченный ожидающим потоком, не может быть захвачен никаким другим потоком. То и другое работает против ожидающего потока, поскольку ограничивает ресурсы, доступные потоку, которого он так ждет, и даже не дает ему возможность поднять флаг, когда работа будет завершена. Это решение сродни бодрствованию всю ночь, скрашиваемому разговорами с машинистом: он вынужден вести поезд медленнее, потому что вы его постоянно отвлекаете, и, значит, до пункта назначения вы доберетесь позже. Вот и ожидающий поток потребляет ресурсы, которые пригодились бы другим потокам, в результате чего ждет дольше, чем необходимо.

Второй вариант — заставить ожидающий поток спать между проверками с помощью функции

std::this_thread::sleep_for()
(см. раздел 4.3):

bool flag;

std::mutex m;


void wait_for_flag() {

 std::unique_lock lk(m); ←
(1) Освободить мьютекс

 while (!flag) {

  lk.unlock(); ←
(2) Спать 100 мс

  std::this_thread::sleep_for(std::chrono::milliseconds(100));

  lk.lock();   ←
(3) Снова захватить мьютекс

 }

}

В этом цикле функция освобождает мьютекс (1) перед тем, как заснуть (2), и снова захватывает его, проснувшись, (3), оставляя другому потоку шанс захватить мьютекс и поднять флаг.

Это уже лучше, потому что во время сна поток не расходует процессорное время. Но трудно выбрать подходящий промежуток времени. Если он слишком короткий, то поток все равно впустую тратит время на проверку; если слишком длинный — то поток будет спать и после того, как ожидание завершилось, то есть появляется ненужная задержка. Редко бывает так, что слишком длительный сон прямо влияет на работу программу, но в динамичной игре это может привести к пропуску кадров, а в приложении реального времени — к исчерпанию выделенного временного кванта.

Третий — и наиболее предпочтительный - способ состоит в том, чтобы воспользоваться средствами из стандартной библиотеки С++, которые позволяют потоку ждать события. Самый простой механизм ожидания события, возникающего в другом потоке (например, появления нового задания в упоминавшемся выше конвейере), дают условные переменные. Концептуально условная переменная ассоциирована с каким-то событием или иным условием, причём один или несколько потоков могут ждать, когда это условие окажется выполненным. Если некоторый поток решит, что условие выполнено, он может известить об этом один или несколько потоков, ожидающих условную переменную, в результате чего они возобновят работу.

4.1.1. Ожидание условия с помощью условных переменных

Стандартная библиотека С++ предоставляет не одну, а две реализации условных переменных:

std::condition_variable
и
std::condition_variable_any
. Оба класса объявлены в заголовке
. В обоих случаях для обеспечения синхронизации необходимо взаимодействие с мьютексом; первый класс может работать только с
std::mutex
, второй — с любым классом, который отвечает минимальным требованиям к «мьютексоподобию», отсюда и суффикс
_any
. Поскольку класс
std::condition_variable_any
более общий, то его использование может обойтись дороже с точки зрения объема потребляемой памяти, производительности и ресурсов операционной системы. Поэтому, если дополнительная гибкость не требуется, то лучше ограничиться классом
std::condition_variable
.

Ну и как же воспользоваться классом

std::condition_variable
в примере, упомянутом во введении, — как сделать, чтобы поток, ожидающий работу, спал, пока не поступят данные? В следующем листинге приведён пример реализации с использованием условной переменной.


Листинг 4.1. Ожидание данных с помощью

std::condition_variable

std::mutex mut;

std::queue data_queue; ←
(1)

std::condition_variable data_cond;


void data_preparation_thread() {

 while (more_data_to_prepare()) {

  data_chunk const data = prepare_data();

  std::lock_guard lk(mut);

  data_queue.push(data);  ←
(2)

  data_cond.notify_one(); ←
(3)

 }

}


void data_processing_thread() {

 while(true) {

  std::unique_lock lk(mut); ←
(4)

  data_cond.wait(

   lk, []{ return !data_queue.empty(); }); ←
(5)

  data_chunk data = data_queue.front();

  data_queue.pop();

  lk.unlock(); ←
(6)

  process(data);

  if (is_last_chunk(data))

   break;

 }

}

Итак, мы имеем очередь (1), которая служит для передачи данных между двумя потоками. Когда данные будут готовы, поток, отвечающий за их подготовку, помещает данные в очередь, предварительно захватив защищающий ее мьютекс с помощью

std::lock_guard
. Затем он вызывает функцию-член
notify_one()
объекта
std::condition_variable
, чтобы известить ожидающий поток (если таковой существует) (3).

По другую сторону забора находится поток, обрабатывающий данные. Он в самом начале захватывает мьютекс, но с помощью

std::unique_lock
, а не
std::lock_guard
(4) — почему, мы скоро увидим. Затем поток вызывает функцию-член
wait()
объекта
std::condition_variable
, передавая ей объект-блокировку и лямбда-функцию, выражающую ожидаемое условие (5). Лямбда-функции — это нововведение в С++11, они позволяют записать анонимную функцию как часть выражения и идеально подходят для задания предикатов для таких стандартных библиотечных функций, как
wait()
. В данном случае простая лямбда-функция
[]{ return !data_queue.empty(); }
проверяет, что очередь
data_queue
не пуста (вызывая ее метод
empty()
), то есть что в ней имеются данные для обработки. Подробнее лямбда-функции описаны в разделе А.5 приложения А.

Затем функция

wait()
проверяет условие (вызывая переданную лямбда-функцию) и возвращает управление, если оно выполнено (то есть лямбда-функция вернула
true
). Если условие не выполнено (лямбда-функция вернула
false
), то
wait()
освобождает мьютекс и переводит поток в состояние ожидания. Когда условная переменная получит извещение, отправленное потоком подготовки данных с помощью
notify_one()
, поток обработки пробудится, вновь захватит мьютекс и еще раз проверит условие. Если условие выполнено, то
wait()
вернет управление, причём мьютекс в этот момент будет захвачен. Если же условие не выполнено, то поток снова освобождает мьютекс и возобновляет ожидание. Именно поэтому нам необходим
std::unique_lock
, а не
std::lock_guard
— ожидающий поток должен освобождать мьютекс, когда находится в состоянии ожидания, и захватывать его но выходе из этого состояния, a
std::lock_guard
такой гибкостью не обладает. Если бы мьютекс оставался захваченным в то время, когда поток обработки спит, поток подготовки данных не смог бы захватить его, чтобы поместить новые данные в очередь, а, значит, ожидаемое условие никогда не было бы выполнено.

В листинге 4.1 используется простая лямбда-функция (5), которая проверяет, что очередь не пуста. Однако с тем же успехом можно было бы передать любую функцию или объект, допускающий вызов. Если функция проверки условия уже существует (быть может, она сложнее показанного в примере простенького теста), то передавайте ее напрямую — нет никакой необходимости обертывать ее лямбда-функцией. Внутри

wait()
условная переменная может проверять условие многократно, но всякий раз это делается после захвата мьютекса, и, как только функция проверки условия вернет
true
(и лишь в этом случае),
wait()
возвращает управление вызывающей программе. Ситуация, когда ожидающий поток захватывает мьютекс и проверяет условие не в ответ на извещение от другого потока, называется ложным пробуждением (spurious wake). Поскольку количество и частота ложных пробуждений по определению недетерминированы, нежелательно использовать для проверки условия функцию с побочными эффектами. В противном случае будьте готовы к тому, что побочный эффект может возникать более одного раза.

Присущая

std::unique_lock
возможность освобождать мьютекс используется не только при обращении к
wait()
, но и непосредственно перед обработкой поступивших данных (6). Обработка может занимать много времени, а, как было отмечено в главе 3, удерживать мьютекс дольше необходимого неразумно.

Применение очереди для передачи данных между потоками (как в листинге 4.1) — весьма распространенный прием. При правильной реализации синхронизацию можно ограничить только самой очередью, что уменьшает количество потенциальных проблем и состояний гонки. Поэтому покажем, как на основе листинга 4.1 построить обобщенную потокобезопасную очередь.

4.1.2. Потокобезопасная очередь на базе условных переменных

Приступая к проектированию обобщенной очереди, стоит потратить некоторое время на обдумывание того, какие понадобятся операции. Именно так мы подходили к разработке потокобезопасного стека в разделе 3.2.3. Возьмем в качестве образца адаптер контейнера

std::queue<>
из стандартной библиотеки С++, интерфейс которого показан в листинге ниже.


Листинг 4.2. Интерфейс класса

std::queue

template >

class queue {

public:

 explicit queue(const Container&);

 explicit queue(Container&& = Container());


 template  explicit queue(const Alloc&);

 template  queue(const Container&, const Alloc&);

 template  queue(Container&&, const Alloc&);

 template  queue(queue&&, const Alloc&);


 void swap(queue& q);


 bool empty() const;

 size_type size() const;


 T& front();

 const T& front() const;

 T& back();

 const T& back() const;


 void push(const T& x);

 void push(T&& x);

 void pop();

 template  void emplace(Args&&... args);

};

Если не обращать внимания на конструирование, присваивание и обмен, то останется три группы операций: опрос состояния очереди в целом (

empty()
и
size()
), опрос элементов очереди (
front()
и
back()
) модификация очереди (
push()
,
pop()
и
emplace()
). Ситуация аналогична той, что мы видели в разделе 3.2.3 для стека, поэтому возникают те же — внутренне присущие интерфейсу — проблемы с гонкой. Следовательно,
front()
и
pop()
необходимо объединить в одной функции — точно так же, как мы постудили с
top()
и
pop()
в случае стека. Но в коде в листинге 4.1 есть дополнительный нюанс: если очередь используется для передачи данных между потоками, то поток-получатель часто будет ожидать поступления данных. Поэтому включим два варианта
pop()
:
try_pop()
пытается извлечь значение из очереди, но сразу возвращает управление (с указанием ошибки), если в очереди ничего не было, a
wait_and_pop()
ждет, когда появятся данные. Взяв за образец сигнатуры функций из примера стека, представим интерфейс в следующем виде:


Листинг 4.3. Интерфейс класса

threadsafe_queue

#include 


template

class threadsafe_queue {

public:

 threadsafe_queue();

 threadsafe_queue(const threadsafe_queue&);

 threadsafe_queue& operator=(

  const threadsafe_queue&) = delete; ←┐
Для простоты

 void push(T new_value);              │
запрещаем присваивание


 bool try_pop(T& value);       ←
(1)

 std::shared_ptr try_pop(); ←
(2)


 void wait_and_pop(T& value);

 std::shared_ptr wait_and_pop();


 bool empty() const;

};

Как и в случае стека, мы для простоты уменьшили число конструкторов и запретили присваивание. И, как и раньше, предлагаем по два варианта функций

try_pop()
и
wait_for_pop()
. Первый перегруженный вариант
try_pop()
(1) сохраняет извлеченное значение в переданной по ссылке переменной, а возвращаемое значение использует для индикации ошибки: оно равно
true
, если значение получено, и
false
— в противном случае (см. раздел А.2). Во втором перегруженном варианте (2) так поступить нельзя, потому что возвращаемое значение — это данные, извлеченные из очереди. Однако же можно возвращать указатель
NULL
, если в очереди ничего не оказалось.

Ну и как же всё это соотносится с листингом 4.1? В следующем листинге показано, как перенести оттуда код в методы

push()
и
wait_and_pop()
.


Листинг 4.4. Реализация функций

push()
и
wait_and_pop()
на основе кода из листинга 4.1

#include 

#include 

#include 


template

class threadsafe_queue {

private:

 std::mutex mut;

 std::queue data_queue;

 std::condition_variable data_cond;

public:

 void push(T new_value) {

  std::lock_guard lk(mut);

  data_queue.push(new_value);

  data_cond.notify_one();

 }


 void wait_and_pop(T& value) {

  std::unique_lock lk(mut);

  data_cond.wait(lk, [this]{return !data_queue.empty();});

  value = data_queue.front();

  data_queue.pop();

 }

};


threadsafe_queue data_queue; ←
(1)


void data_preparation_thread() {

 while (more_data_to_prepare()) {

  data_chunk const data = prepare_data();

  data_queue.push(data); ←
(2)

 }

}


void data_processing_thread() {

 while (true) {

  data_chunk data;

  data_queue.wait_and_pop(data); ←
(3)

  process(data);

  if (is_last_chunk(data))

   break;

 }

}

Теперь мьютекс и условная переменная находятся в экземпляре

threadsafe_queue
, поэтому не нужно ни отдельных переменных (1), ни внешней синхронизации при обращении к функции
push()
(2). Кроме того,
wait_and_pop()
берет на себя заботу об ожидании условной переменной (3).

Второй перегруженный вариант

wait_and_pop()
тривиален, а остальные функции можно почти без изменений скопировать из кода стека в листинге 3.5. Ниже приведена окончательная реализация.


Листинг 4.5. Полное определение класса потокобезопасной очереди на базе условных переменных

#include 

#include 

#include 

#include 


template

class threadsafe_queue {

private:

 mutable std::mutex mut;←
(1) Мьютекс должен быть изменяемым

 std::queue data_queue;

 std::condition_variable data_cond;


public:

 threadsafe_queue() {}

 threadsafe_queue(threadsafe_queue const& other) {

  std::lock_guard lk(other.mut);

  data_queue = other.data_queue;

 }


 void push(T new_value) {

  std::lock_guard lk(mut);

  data_queue.push(new_value);

  data_cond.notify_one();

 }


 void wait_and_pop(T& value) {

  std::unique_lock lk(mut);

  data_cond.wait(lk, [this]{ return !data_queue.empty(); });

  value = data_queue.front();

  data_queue.pop();

 }


 std::shared_ptr wait_and_pop() {

  std::unique_lock lk(mut);

  data_cond.wait(lk, [this]{ return !data_queue.empty(); });

  std::shared_ptr

   res(std::make_shared(data_queue.front()));

  data_queue.pop();

  return res;

 }


 bool try_pop(T& value) {

  std::lock_guard lk(mut);

  if (data_queue.empty())

   return false;

  value = data_queue.front();

  data_queue.pop();

  return true;

 }


 std::shared_ptr try_pop() {

  std::lock_guard lk(mut);

  if (data_queue.empty())

   return std::shared_ptr();

  std::shared_ptr

   res(std::make_shared(data_queue.front()));

  data_queue.pop();

  return res;

 }


 bool empty() const {

  std::lock_guard lk(mut);

  return data_queue.empty();

 }

};

Хотя

empty()
— константная функция-член, а параметр копирующего конструктора —
const
-ссылка, другие потоки могут хранить неконстантные ссылки на объект и вызывать изменяющие функции-члены, которые захватывают мьютекс. Поэтому захват мьютекса — это изменяющая операция, следовательно, член
mut
необходимо пометить как
mutable
(1), чтобы его можно было захватить в функции
empty()
и в копирующем конструкторе.

Условные переменные полезны и тогда, когда есть несколько потоков, ожидающих одного события. Если потоки используются для разделения работы и, следовательно, на извещение должен реагировать только один поток, то применима точно такая же структура программы, как в листинге 4.1; нужно только запустить несколько потоков обработки данных. При поступлении новых данных функция

notify_one()
разбудит только один поток, который проверяет условие внутри
wait()
, и этот единственный поток вернет управление из
wait()
(в ответ на помещение нового элемента в очередь
data_queue
). Заранее нельзя сказать, какой поток получит извещение и есть ли вообще ожидающие потоки (не исключено, что все они заняты обработкой ранее поступивших данных).

Альтернативный сценарий — когда несколько потоков ожидают одного события, и отреагировать должны все. Так бывает, например, когда инициализируются разделяемые данные, и все работающие с ними потоки должны ждать, пока инициализация завершится (хотя для этого случая существуют более подходящие механизмы, см. раздел 3.3.1 главы 3), или когда потоки должны ждать обновления разделяемых данных, например, в случае периодической повторной инициализации. В таких ситуациях поток, отвечающий за подготовку данных, может вызвать функцию-член

notify_all()
условной переменной вместо
notify_one()
. Эта функция извещает все потоки, ожидающие внутри функции
wait()
, о том, что они должны проверить ожидаемое условие.

Если ожидающий поток собирается ждать условия только один раз, то есть после того как оно станет истинным, он не вернется к ожиданию той же условной переменной, то лучше применить другой механизм синхронизации. В особенности это относится к случаю, когда ожидаемое условие — доступность каких-то данных. Для такого сценария больше подходят так называемые будущие результаты (future).

4.2. Ожидание одноразовых событий с помощью механизма будущих результатов