Параллельное программирование на С++ в действии — страница 31 из 53

Часто бывает необходимо сообщить долго работающему потоку о том, что пришло время остановиться. Например, потому что это рабочий поток пула, а мы собираемся уничтожить сам пул, или потому что пользователь отменил работу, выполняемую этим потоком. Причин миллион. Но идея в любом случае одна и та же: послать из одного потока другому сигнал с требованием прекратить работу до ее естественного завершения, и сделать это так, чтобы поток завершился корректно, а не просто выбить почву у него из-под ног.

Можно было бы придумывать такой механизм специально для каждого случая, но это, пожалуй, перебор. Мало того что общий механизм в дальнейшем упростит написание кода, так он еще и позволит писать код, допускающий прерывание, не заботясь о том, где конкретно он используется. Стандарт C++11 такого механизма не предоставляет, но реализовать его самостоятельно не слишком сложно. Я покажу, как это сделать, но сначала взгляну на проблему с точки зрения интерфейса запуска и прерывания потока, а не с точки зрения самого прерываемого потока.

9.2.1. Запуск и прерывание другого потока

Начнем с рассмотрения внешнего интерфейса. Что нам нужно от допускающего прерывание потока? На самом элементарном уровне интерфейс должен быть таким же, как у

std::thread
, но с дополнительной функцией
interrupt()
:

class interruptible_thread {

public:

 template

 interruptible_thread(FunctionType f);

 void join();

 void detach();

 bool joinable() const;

 void interrupt();

};

В реализации можно было бы использовать

std::thread
для управления потоком и какую-то структуру данных для обработки прерывания. А как это выглядит с точки зрения самого потока? Как минимум, нужна возможность сказать: «Меня можно прерывать здесь», то есть нам требуется точка прерывания. Чтобы не передавать дополнительные данные, соответствующая функция должна вызываться без параметров:
interruption_point()
. Отсюда следует, что относящаяся к прерываниям структура данных должна быть доступна через переменную типа
thread_local
, которая устанавливается при запуске потока. Поэтому, когда поток обращается к функции
interruption_point()
, та проверяет структуру данных для текущего исполняемого потока. С реализацией
interruption_point()
мы познакомимся ниже.

Флаг типа

thread_local
— основная причина, по которой мы не можем использовать для управления потоком просто класс
std::thread
; память для него нужно выделить таким образом, чтобы к ней имел доступ как экземпляр
interruptible_thread
, так и вновь запущенный поток. Для этого функцию, переданную конструктору, можно специальным образом обернуть перед тем, как передавать конструктору
std::thread
. Как это делается, показано в следующем листинге.


Листинг 9.9. Простая реализация

interruptible_thread

class interrupt_flag {

public:

 void set();

 bool is_set() const;

};


thread_local interrupt_flag this_thread_interrupt_flag; ←
(1)


class interruptible_thread {

 std::thread internal_thread;

 interrupt_flag* flag;


public:

 template

 interruptible_thread(FunctionType f) {

  std::promise p; ←
(2)

  internal_thread = std::thread([f,&p] { ←
(3)

   p.set_value(&this_thread_interrupt_flag);

   f(); ←
(4)

  });

  flag = p.get_future().get(); ←
(5)

 }


 void interrupt() {

  if (flag) {

   flag->set(); ←
(6)

  }

 }

};

Переданная функция

f
обертывается лямбда-функцией (3), которая хранит копию
f
и ссылку на локальный объект-обещание
p
(2). Перед тем как вызывать переданную функцию (4), лямбда-функция устанавливает в качестве значения обещания адрес переменной
this_thread_interrupt_flag
(объявленной с модификатором
thread_local
(1)) в новом потоке. Затем вызывающий поток дожидается готовности будущего результата, ассоциированного с обещанием, и сохраняет этот результат в переменной-члене
flag
(5). Отметим, что лямбда-функция исполняется в новом потоке и хранит висячую ссылку на локальную переменную
p
, но ничего страшного в этом нет, так как конструктор
interruptible_thread
ждет, пока на
p
не останется ссылок в новом потоке, и только потом возвращает управление. Еще отметим, что эта реализация не обрабатывает присоединение или отсоединение потока. Мы сами должны позаботиться об очистке переменной
flag
в случае выхода или отсоединения потока, чтобы избежать появления висячего указателя.

Теперь написать функцию

interrupt()
несложно: имея указатель на флаг прерывания, мы знаем, какой поток прерывать, поэтому достаточно просто поднять этот флаг (6). Что делать дальше, решает сам прерываемый поток. О том, как принимается это решение, мы и поговорим ниже.

9.2.2. Обнаружение факта прерывания потока

Итак, мы умеем устанавливать флаг прерывания, но толку от него чуть, если поток не проверяет, что его хотят прервать. В простейшем случае это можно сделать с помощью функции

interruption_point()
, которую можно вызывать в точке, где прерывание безопасно. Если флаг прерывания установлен, то эта функция возбуждает исключение
thread_interrupted
:

void interruption_point() {

 if (this_thread_interrupt_flag.is_set()) {

  throw thread_interrupted();

 }

}

Обращаться к этой функции можно там, где нам удобно:

void fоо() {

 while (!done) {

  interruption_point();

  process_next_item();

 }

}

Такое решение работает, но оно не идеально. Лучше всего прерывать поток тогда, когда он блокирован в ожидании чего-либо, но именно в этот момент поток как раз и не работает, а, значит, не может вызвать

interruption_point()
! Нам требуется какой-то механизм прерываемого ожидания.

9.2.3. Прерывание ожидания условной переменной

Итак, мы можем обнаруживать прерывание в подходящих местах программы с помощью обращений к функции

interruption_point()
, но это ничем не помогает в случае, когда поток блокирован в ожидании какого-то события, например сигнала условной переменной. Нам необходима еще одна функция,
interruptible_wait()
, которую можно будет перегрузить для различных ожидаемых событий, и нужно придумать, как вообще прерывать ожидание. Я уже говорил, что среди прочего ожидать можно сигнала условной переменной, поэтому с нее и начнем. Что необходимо для того, чтобы можно было прервать поток, ожидающий условную переменную? Проще всего было бы известить условную переменную в момент установки флага и поставить точку прерывания сразу после ожидания. Но в этом случае придётся разбудить все потоки, ждущие эту условную переменную, хотя заинтересован в этом только прерываемый поток. Впрочем, потоки, ждущие условную переменную, в любом случае должны обрабатывать ложные пробуждения, а отличить посланный нами сигнал от любого другого они не могут, так что ничего страшного не случится. В структуре
interrupt_flag
нужно будет сохранить указатель на условную переменную, чтобы при вызове
set()
ей можно было послать сигнал. В следующем листинге показана возможная реализация функции
interruptible_wait()
для условных переменных.


Листинг 9.10. Неправильная реализация

interruptible_wait()
для
std::condition_variable

void interruptible_wait(std::condition_variable& cv,

 std::unique_lock& lk) {

 interruption_point();←
(1)

 this_thread_interrupt_flag.set_condition_variable(cv);

 cv.wait(lk);         ←
(2)

 this_thread_interrupt_flag.clear_condition_variable();←
(3)

 interruption_point();

}

В предположении, что существуют функции, которые устанавливают и разрывают ассоциацию условной переменной с флагом прерывания, этот код выглядит просто и понятно. Он проверяет, не было ли прерывания, ассоциирует условную переменную с флагом

interrupt_flag
для текущего потока (1), ждет условную переменную (2), разрывает ассоциацию с условной переменной (3) и снова проверяет, не было ли прерывания. Если поток прерывается во время ожидания условной переменной, то прерывающий поток пошлёт этой переменной сигнал, что пробудит нас и даст возможность проверить факт. К сожалению, этот код не работает, в нем есть две проблемы. Первая довольно очевидна: функция
std::condition_variable::wait()
может возбуждать исключения, поэтому из
interruptible_wait()
возможен выход без разрыва ассоциации флага прерывания с условной переменной. Это легко исправляется с помощью структуры, которая разрывает ассоциацию в ее деструкторе.

Вторая, не столь очевидная, проблема связана с гонкой. Если поток прерывается после первого обращения к

interruption_point()
, но до обращения к
wait()
, то не имеет значения, ассоциирована условная переменная с флагом прерывания или нет, потому что поток еще ничего не ждет и, следовательно, не может быть разбужен сигналом, посланным условной переменной. Мы должны гарантировать, что потоку не может быть послан сигнал между последней проверкой прерывания и обращением к
wait()
. Если не залезать в код класса
std::condition_variable
, то сделать это можно только одним способом: использовать для защиты мьютекс, хранящийся в
lk
, который, следовательно, нужно передавать функции
set_condition_variable()
. К сожалению, при этом возникают новые проблемы: мы передаём ссылку на мьютекс, о времени жизни которого ничего не знаем, другому потоку (тому, который выполняет прерывание), чтобы тот его захватил (внутри
interrupt()
). Но может случиться, что этот поток уже удерживает данный мьютекс, и тогда возникнет взаимоблокировка. К тому же, появляется возможность доступа к уже уничтоженному мьютексу. В общем, это решение не годится. Но если мы не можем надежно прерывать ожидание условной переменной, то нечего было и затевать это дело — почти того же самого можно было бы добиться и без специальной функции
interruptible_wait()
. Так какие еще есть варианты? Можно, к примеру, задать таймаут ожидания; использовать вместо
wait()
функцию
wait_for()
с очень коротким таймаутом (скажем, 1 мс). Это ограничивает сверху время до момента, когда поток обнаружит прерывание (с учетом промежутка между тактами часов). Если поступить так, что ожидающий поток будет видеть больше ложных пробуждений из-за срабатывания таймера, но тут уж ничего не попишешь. Такая реализация показана в листинге ниже вместе с соответствующей реализацией
interrupt_flag
.


Листинг 9.11. Реализация

interruptible_wait()
для
std::condition_variable
с таймаутом

class interrupt_flag {

 std::atomic flag;

 std::condition_variable* thread_cond;

 std::mutex set_clear_mutex;


public:

 interrupt_flag(): thread_cond(0) {}


 void set() {

  flag.store(true, std::memory_order_relaxed);

  std::lock_guard lk(set_clear_mutex);

  if (thread_cond) {

   thread_cond->notify_all();

  }

 }


 bool is_set() const {

  return flag.load(std::memory_order_relaxed);

 }


 void set_condition_variable(std::condition_variable& cv) {

  std::lock_guard lk(set_clear_mutex);

  thread_cond = &cv;

 }


 void clear_condition_variable() {

  std::lock_guard lk(set_clear_mutex);

  thread_cond = 0;

 }


 struct clear_cv_on_destruct {

  ~clear_cv_on_destruct() {

   this_thread_interrupt_flag.clear_condition_variable();

  }

 };

};


void interruptible_wait(std::condition_variable& cv,

 std::unique_lock& lk) {

 interruption_point();

 this_thread_interrupt_flag.set_condition_variable(cv);

 interrupt_flag::clear_cv_on_destruct guard;

 interruption_point();

 cv.wait_for(lk, std::chrono::milliseconds(1));

 interruption_point();

}

Если мы ждем какой-то предикат, то таймаут продолжительностью 1 мс можно полностью скрыть внутри цикла проверки предиката:

template

void interruptible_wait(std::condition_variable& cv,

 std::unique_lock& lk,

 Predicate pred) {

 interruption_point();

 this_thread_interrupt_flag.set_condition_variable(cv);

 interrupt_flag::clear_cv_on_destruct guard;

 while (!this_thread_interrupt_flag.is_set() && !pred()) {

  cv.wait_for(lk, std::chrono::milliseconds(1));

 }

 interruption_point();

}

Правда, предикат при этом проверяется чаще, чем необходимо, но зато эту функцию легко использовать вместо простого вызова

wait()
. Легко реализовать и другие варианты функций с таймаутом, например: ждать в течение указанного времени или 1 мс в зависимости от того, что меньше.

Ну хорошо, с ожиданием

std::condition_variable
мы разобрались, а что сказать о
std::condition_variable_any
? Всё точно так же или можно сделать лучше?

9.2.4. Прерывание ожидания
std::condition_variable_any

Класс

std::condition_variable_any
отличается от
std::condition_variable
тем, что работает с любым типом блокировки, а не только с
std::unique_lock
. Как выясняется, это сильно упрощает дело, так что мы сможем добиться более впечатляющих результатов, чем получилось с
std::condition_variable
. Раз допустим любой тип блокировки, то можно написать и свой собственный класс, который захватывает (освобождает) как внутренний мьютекс
set_clear_mutex
в классе
interrupt_flag
, так и блокировку, переданную при вызове
wait()
. Соответствующий код приведён в листинге ниже.


Листинг 9.12. Реализация

interruptible_wait()
для
std::condition_variable_any

class interrupt_flag {

 std::atomic flag;

 std::condition_variable* thread_cond;

 std::condition_variable_any* thread_cond_any;

 std::mutex set_clear_mutex;


public:

 interrupt_flag():

  thread_cond(0), thread_cond_any(0) {}


 void set() {

  flag.store(true, std::memory_order_relaxed);

  std::lock_guard lk(set_clear_mutex);

  if (thread_cond) {

   thread_cond->notify_all();

  } else if (thread_cond_any) {

   thread_cond_any->notify_all();

  }

 }


 template

 void wait(std::condition_variable_any& cv, Lockable& lk) {

  struct custom_lock {

   interrupt_flag* self;

   Lockable& lk;


   custom_lock(interrupt_flag* self_,

    std::condition_variable_any& cond,

    Lockable& lk_): self(self_), lk(lk_) {

    self->set_clear_mutex.lock();  ←
(1)

    self->thread_cond_any = &cond; ←
(2)

   }


   void unlock() { ←
(3)

    lk.unlock();

    self->set_clear_mutex.unlock();

   }


   void lock() {

    std::lock(self->set_clear_mutex, lk); ←
(4)

   }


   ~custom_lock() {

    self->thread_cond_any = 0; ←
(5)

    self->set_clear_mutex.unlock();

   }

  };


  custom_lock cl(this, cv, lk);

  interruption_point();

  cv.wait(cl);

  interruption_point();

 }


 // остальное, как и раньше

};


template

void interruptible_wait(std::condition_variable_any& cv,

 Lockable& lk) {

 this_thread_interrupt_flag.wait(cv, lk);

}

Наш класс блокировки должен захватить внутренний мьютекс

set_clear_mutex
на этапе конструирования (1) и затем записать в переменную
thread_cond_any
указатель на объект
std::condition_variable_any
, переданный конструктору (2). Ссылка на объект
Lockable
сохраняется для последующего использования; он должен быть уже заблокирован. Теперь проверять, был ли поток прерван, можно, не опасаясь гонки. Если в этой точке флаг прерывания установлен, то это было сделано до захвата мьютекса
set_clear_mutex
. Когда условная переменная вызывает нашу функцию
unlock()
внутри
wait()
, мы разблокируем объект
Lockable
и внутренний мьютекс
set_clear_mutex
(3). Это позволяет потокам, которые пытаются нас прервать, захватить
set_clear_mutex
и проверить указатель
thread_cond_any
, когда мы уже находимся в
wait()
, но не раньше. Это именно то, чего мы хотели (но не смогли) добиться в случае
std::condition_variable
. После того как
wait()
завершит ожидание (из-за получения сигнала или вследствие ложного пробуждения), она вызовет нашу функцию
lock()
, которая снова захватит внутренний мьютекс
set_clear_mutex
и заблокирует объект
Lockable
(4). Теперь можно еще раз проверить, не было ли прерываний, пока мы находились в
wait()
, и только потом обнулить указатель
thread_cond_any
в деструкторе
custom_lock
(5), где также освобождается
set_clear_mutex
.

9.2.5. Прерывание других блокирующих вызовов

Теперь с прерыванием ожидания условной переменной всё ясно, но как быть с другими блокирующими операциями ожидания: освобождения мьютекса, готовности будущего результата и т.д.? Вообще говоря, приходится прибегать к тому же трюку с таймаутом, который мы использовали для

std::condition_variable
, потому что, если не влезать в код мьютекса или будущего результата, то нет никакого другого способа прервать ожидание, кроме как обеспечить выполнение ожидаемого условия. Но в отличие от условных переменных, мы точно знаем, чего ждем, поэтому можем организовать цикл внутри функции
interruptible_wait()
.

Вот, например, как выглядит перегрузка

interruptible_wait()
для
std::future<>
:

template

void interruptible_wait(std::future& uf) {

 while (!this_thread_interrupt_flag.is_set()) {

  if (uf.wait_for(lk, std::chrono::milliseconds(1) ==

   std::future_status::ready)

  break;

 }

 interruption_point();

}

Здесь мы ждем, пока либо будет установлен флаг прерывания, либо готов будущий результат, но блокирующее ожидание будущего результата продолжается в течение 1 мс на каждой итерации цикла. Это означает, что в среднем запрос на прерывание будет обнаружен с задержкой 0,5 мс, в предположении, что разрешение часов достаточно высокое. Функция

wait_for
обычно ожидает в течение как минимум одного такта, поэтому если такт системных часов составляет 15 мс, то ждать придётся не одну, а 15 мс. Приемлемо это или нет, зависит от конкретных условий. Таймаут при необходимости можно уменьшить (если часы позволяют), но тогда поток будет чаще просыпаться для проверки флага, что увеличит накладные расходы на переключение задач.

На данный момент мы знаем, как можно обнаружить прерывание с помощью функций

interruption_point()
и
interruptible_wait()
, но что с этим потом делать?

9.2.6. Обработка прерываний

С точки зрения прерываемого потока, прерывание — это просто исключение типа

thread_interrupted
, которое, следовательно, можно обработать, как любое другое исключение. В частности, его можно перехватить в стандартном блоке
catch
:

try {

 do_something();

} catch (thread_interrupted&) {

 handle_interruption();

}

Таким образом, прерывание можно перехватить, каким-то образом обработать и потом спокойно продолжить работу. Если мы так поступим, а потом другой поток еще раз вызовет

interrupt()
, то поток будет снова прерван, когда в очередной раз вызовет функцию
interruption_point()
. Возможно, это и неплохо, если вы выполняете последовательность независимых задач; текущая задача будет прервана, а поток благополучно перейдёт к следующей задаче в списке.

Поскольку

thread_interrupted
— исключение, то при вызове кода, который может быть прерван, следует принимать все обычные для исключений меры предосторожности, чтобы не было утечки ресурсов, и структуры данных оставались согласованными. Часто желательно завершать поток в случае прерывания, так чтобы исключение можно было просто передать вызывающей функции. Но если позволить исключению выйти за пределы функции потока, переданной конструктору
std::thread
, то будет вызвана функция
std::terminate()
, что приведёт к завершению всей программы. Чтобы не помещать обработчик
catch(thread_interrupted)
в каждую функцию, которая передаётся
interruptible_thread
, можно включить блок
catch
в обертку, служащую для инициализации
interrupt_flag
. Тогда распространять необработанное исключение будет безопасно, так как завершится лишь отдельный поток. Инициализация потока в конструкторе
interruptible_thread
при таком подходе выглядит следующим образом:

internal_thread = std::thread([f, &p] {

 p.set_value(&this_thread_interrupt_flag);

  try {

  f();

 } catch(thread_interrupted const&) {}

});

А теперь рассмотрим конкретный пример, когда прерывание оказывается полезно.

9.2.7. Прерывание фоновых потоков при выходе из приложения

Представьте себе приложение для поиска в файловой системе настольного ПК. Оно должно не только взаимодействовать с пользователем, но и следить за состоянием файловой системы, обнаруживать изменения и обновлять свой индекс. Обычно такие операции поручаются фоновому потоку, чтобы пользовательский интерфейс мог реагировать на действия пользователя. Фоновый поток должен работать на протяжении всего времени жизни приложения; он запускается на этапе инициализации и трудится, пока приложение не завершится. Обычно это происходит при останове операционной системы, так как приложение должно постоянно поддерживать индекс в актуальном состоянии. Как бы то ни было, когда приложение завершается, надо аккуратно остановить и фоновые потоки, например, прервав их.

В следующем листинге показана возможная реализация управления потоками в такой программе.


Листинг 9.13. Фоновый мониторинг файловой системы

std::mutex config_mutex;

std::vector background_threads;


void background_thread(int disk_id) {

 while (true) {

  interruption_point(); ←
(1)

  fs_change fsc = get_fs_changes(disk_id); ←
(2)

  if (fsc.has_changes()) {

   update_index(fsc); ←
(3)

  }

 }

}


void start_background_processing() {

 background_threads.push_back(

  interruptible_thread(background_thread, disk_1));

 background_threads.push_back(

  interruptible_thread(background_thread, disk_2));

}


int main() {

 start_background_processing(); ←
(4)

 process_gui_until_exit(); ←
(5)

 std::unique_lock lk(config_mutex);

 for (unsigned i = 0; i < background_threads.size(); ++i) {

  background_threads[i].interrupt(); ←
(6)

 }

 for (unsigned i = 0; i < background_threads.size(); ++i) {

  background_threads[i].join(); ←
(7)

 }

}

В самом начале запускаются фоновые потоки (4). Затем главный поток продолжает обслуживать пользовательский интерфейс (5). Когда пользователь хочет выйти из приложения, фоновые потоки прерываются (6), после чего главный поток ждет их завершения (7), и только потом выходит сам. Каждый фоновый поток исполняет цикл, в котором следит за изменениями на диске (2) и обновляет индекс (3). На каждой итерации цикла поток проверяет, не прервали ли его, вызывая функцию

interruption_point()
(1).

Почему мы прерываем все потоки до того, как начинать ждать их завершения? Почем нельзя прервать один поток, дождаться его, потом прервать следующий и так далее? Все из-за параллелизма. Поток не завершается сразу после прерывания, так как должен добраться до очередной точки прерывания, а затем, перед выходом, выполнить все деструкторы и код обработки исключений. Если главный поток будет присоединять прерванные потоки сразу после прерывания, то ему придётся ждать, хотя в это время он мог бы делать полезную работу — прерывать другие потоки. Поэтому мы поступаем по-другому — начинаем ждать только тогда, когда больше никакой работы не осталось (все потоки уже прерваны). Заодно это позволяет прерываемым потокам обрабатывать прерывания параллельно, так что общее время завершения, возможно, уменьшится.

Описанный механизм прерывания легко развить, добавив дополнительные прерываемые вызовы или запретив прерывания на определенном участке кода, но это я оставляю читателю в качестве упражнения.

9.3. Резюме