смотренную в главах 3 и 4. Другая — спроектировать саму структуру данных, так чтобы к ней был возможен параллельный доступ.
При проектировании структуры данных, допускающей параллельный доступ, мы можем использовать основные строительные блоки многопоточных приложений, описанные в предыдущих главах, например мьютексы и условные переменные. На самом деле, вы уже видели примеры структур, в которых сочетание этих блоков гарантирует безопасный доступ из нескольких потоков.
Мы начнем эту главу с нескольких общих рекомендаций по проектированию параллельных структур данных. Затем мы еще раз вернемся к использованию блокировок и условных переменных в простых структурах, после чего перейдём к более сложным. В главе 7 я покажу, как с помощью атомарных операций можно строить структуры данных без блокировок.
Но довольно предисловий — посмотрим, что входит в проектирование структуры данных для параллельного программирования.
6.1. Что понимается под проектированием структур данных, рассчитанных на параллельный доступ?
На простейшем уровне это означает, что нужно спроектировать структуру данных, к которой смогут одновременно обращаться несколько потоков для выполнения одних и тех же или разных операций, причём каждый поток должен видеть согласованное состояние структуры. Данные не должны теряться или искажаться, все инварианты должны быть соблюдены, и никаких проблематичных состояний гонки не должно быть. Такая структура данных называется потокобезопасной. В общем случае структура данных безопасна только относительно определенных типов параллельного доступа. Не исключено, что несколько потоков могут одновременно выполнять какую-то одну операцию над структурой, а для выполнения другой необходим монопольный доступ. Наоборот, бывает, что несколько потоков могут одновременно и безопасно выполнять различные операции, но при выполнении одной и той же операции в разных потоках возникают проблемы.
Но проектирование с учетом параллелизма этим не исчерпывается: задача заключается в том, чтобы предоставить возможность распараллеливания потокам, обращающимся к структуре данных. По природе своей, мьютекс означает взаимное исключение: в каждый момент времени только один поток может захватить мьютекс. Следовательно, мьютекс защищает структуру данных, явным образом предотвращая истинно параллельный доступ к ней.
Это называется сериализацией: потоки обращаются к защищенным мьютексом данным по очереди, то есть последовательно, а не параллельно. Чтобы обеспечить истинно параллельный доступ, нужно тщательно продумывать внутреннее устройство структуры данных. Одни структуры данных оставляют больший простор для распараллеливания, чем другие, но идея остается неизменной: чем меньше защищаемая область, тем меньше операций приходится сериализовывать и тем больше потенциал для распараллеливания.
Прежде чем знакомиться с конкретными структурами данных, приведём несколько простых рекомендаций, полезных при проектировании с учетом параллелизма.
6.1.1. Рекомендации по проектированию структур данных для параллельного доступа
Как я уже отмечал, при проектировании структур данных для параллельного доступа нужно учитывать два аспекта: обеспечить безопасность доступа и разрешить истинно параллельный доступ. Как сделать структуру потокобезопасной, я уже рассказывал в главе 3.
• Гарантировать, что ни один поток не может увидеть состояние, в котором инварианты структуры данных нарушены действиями со стороны других потоков.
• Позаботиться о предотвращении состояний гонки, внутренне присущих структуре данных, предоставив такие функции, которые выполняли бы операции целиком, а не частями.
• Обращать внимание на том, как ведет себя структура данных при наличии исключений, — не допускать нарушения инвариантов и в этом случае.
• Минимизировать шансы возникновения взаимоблокировки, ограничивая область действия блокировок и избегая но возможности вложенных блокировок.
Прежде чем задумываться об этих деталях, важно решить, какие ограничения вы собираетесь наложить на использование структуры данных: если некоторый поток обращается к структуре с помощью некоторой функции, то какие функции можно в этот момент безопасно вызывать из других потоков?
Это на самом деле весьма важный вопрос. Обычно конструкторы и деструкторы нуждаются в монопольном доступе к структуре данных, но обязанность не обращаться к структуре до завершения конструирования или после начала уничтожения возлагается на пользователя. Если структура поддерживает присваивание, функцию
swap()
или копирующий конструктор, то проектировщик должен решить, безопасно ли вызывать эти операции одновременно с другими или пользователь должен обеспечить на время их выполнения монопольный доступ, хотя большинство других операций можно без опаски выполнять параллельно из разных потоков.Второй аспект, нуждающийся в рассмотрении, — обеспечение истинно параллельного доступа. Тут я не могу предложить конкретных рекомендаций, а вместо этого перечислю несколько вопросов, которые должен задать себе проектировщик структуры данных.
• Можно ли ограничить область действия блокировок, так чтобы некоторые части операции выполнялись не под защитой блокировки?
• Можно ли защитить разные части структуры данных разными мьютексами?
• Все ли операции нуждаются в одинаковом уровне защиты?
• Можно ли с помощью простого изменения структуры данных расширить возможности распараллеливания, не затрагивая семантику операций?
В основе всех этих вопросов лежит одна и та же мысль: как свести к минимуму необходимую сериализацию и обеспечить максимально возможную степень истинного параллелизма? Часто бывает так, что структура данных допускает одновременный доступ из нескольких потоков для чтения, но поток, желающий модифицировать данные, должен получать монопольный доступ. Такое требование поддерживает класс
boost::shared_mutex
и ему подобные. Как мы скоро увидим, встречается и другой случай: поддерживается одновременный доступ из потоков, выполняющих различные операции над структурой, но потоки, выполняющие одну и ту же операцию, сериализуются.В простейших потокобезопасных структурах данных обычно для защиты используются мьютексы и блокировки. Хотя, как мы видели в главе 3, им свойственны некоторые проблемы, но гарантировать с их помощью, что в каждый момент времени доступ к данным будет иметь только один поток, сравнительно легко. Мы будем знакомиться с проектированием потокобезопасных структур данных постепенно, и в этой главе рассмотрим только структуры на основе блокировок. А разговор о параллельных структурах данных без блокировок отложим до главы 7.
6.2. Параллельные структуры данных с блокировками
Проектирование параллельных структур данных с блокировками сводится к тому, чтобы захватить нужный мьютекс при доступе к данным и удерживать его минимально возможное время. Это довольно сложно, даже когда имеется только один мьютекс, защищающий всю структуру. Как мы видели в главе 3, требуется гарантировать, что к данным невозможно обратиться без защиты со стороны мьютекса и что интерфейс свободен от внутренне присущих состояний гонки. Если для защиты отдельных частей структуры применяются разные мьютексы, то проблема еще усложняется, поскольку в случае, когда некоторые операции требуют захвата нескольких мьютексов, появляется возможность взаимоблокировки. Поэтому к проектированию структуры данных с несколькими мьютексами следует подходить еще более внимательно, чем при наличии единственного мьютекса. В этом разделе мы применим рекомендации из раздела 6.1.1 к проектированию нескольких простых структур данных, защищаемых мьютексами. В каждом случае мы будем искать возможности повысить уровень параллелизма, обеспечивая в то же время потокобезопасность.
Начнем с реализации стека, приведённой в главе 3; это одна из самых простых структур данных, к тому же в ней используется всего один мьютекс. Но является ли она потокобезопасной? И насколько она хороша с точки зрения достижения истинного распараллеливания?
6.2.1. Потокобезопасный стек с блокировками
В следующем листинге воспроизведен код потокобезопасного стека из главы 3. Задача состояла в том, чтобы реализовать потокобезопасную структуру данных наподобие
std::stack<>
, которая поддерживала бы операции заталкивания и выталкивания.
Листинг 6.1. Определение класса потокобезопасного стека
#include
struct empty_stack: std::exception {
const char* what() const throw();
};
template
class threadsafe_stack {
private:
std::stack data;
mutable std::mutex m;
public:
threadsafe_stack(){}
threadsafe_stack(const threadsafe_stack& other) {
std::lock_guard lock(other.m);
data = other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value) {
std::lock_guard lock(m);
data.push(std::move(new_value)); ←
(1) }
std::shared_ptr pop() {
std::lock_guard lock(m);
if (data.empty()) throw empty_stack(); ←
(2) std::shared_ptr const res(
std::make_shared(std::move(data.top())));←
(3) data.pop(); ←
(4) return res;
}
void pop(T& value) {
std::lock_guard lock(m);
if (data.empty()) throw empty_stack();
value = std::move(data.top()); ←
(5) data.pop(); ←
(6) }
bool empty() const {
std::lock_guard lock(m);
return data.empty();
}
};
Посмотрим, как в этом случае применяются сформулированные выше рекомендации. Во-первых, легко видеть, что базовую потокобезопасность обеспечивает защита каждой функции-члена с помощью мьютекса
m
. Он гарантирует, что в каждый момент времени к данным может обращаться только один поток, поэтому если функции-члены поддерживают какие-то инварианты, то ни один поток не увидит их нарушения.Во-вторых, существует потенциальная гонка между
empty()
и любой из функций pop()
, но поскольку мы явно проверяем, что стек пуст, удерживая блокировку в pop()
, эта гонка не проблематична. Возвращая извлеченные данные прямо в pop()
, мы избегаем потенциальной гонки, которая могла бы случиться, если бы top()
и pop()
были отдельными функциями-членами, как в std::stack<>
.Далее, существует несколько возможных источников исключений. Операция захвата мьютекса может возбудить исключение, но, во-первых, это крайне редкий случай (свидетельствующий о проблемах в реализации мьютекса или о нехватке системных ресурсов), а, во-вторых, эта операция всегда выполняется в самом начале любой функции-члена. Поскольку в этот момент никакие данные еще не изменены, опасности нет. Операция освобождения мьютекса не может завершиться ошибкой, она всегда безопасна, а использование
std::lock_guard<>
гарантирует, что мьютекс не останется захваченным.Вызов
data.push()
(1) может возбудить исключение, если его возбуждает копирование или перемещение данных либо если памяти недостаточно для увеличения размера структуры, в которой хранятся сами данные. В любом случае std::stack<>
гарантирует безопасность, поэтому здесь проблемы тоже нет.В первом перегруженном варианте
pop()
наш код может возбудить исключение empty_stack
(2), но в этот момент еще ничего не изменено, так что мы в безопасности. Создание объекта res
(3) может возбудить исключение по двум причинам: при обращении к std::make_shared
может не хватить памяти для нового объекта и внутренних данных, необходимых для подсчёта ссылок, или копирующий либо перемещающий конструктор возбуждает исключение при копировании или перемещении данных в только что выделенную область памяти. В обоих случаях исполняющая среда С++ и стандартная библиотека гарантируют отсутствие утечек памяти и корректное уничтожение нового объекта (если он был создан). Поскольку мы все еще не модифицировали данные стека, все хорошо. Вызов data.pop()
(4) гарантированно не возбуждает исключений, равно как и возврат результата, так что этот вариант pop()
безопасен относительно исключений.Второй перегруженный вариант
pop()
аналогичен, только на этот раз исключение может возбудить оператор копирующего или перемещающего присваивания (5), а не конструктор нового объекта или экземпляра std::shared_ptr
. Но и теперь мы ничего не изменяли до вызова функции data.pop()
(6), которая гарантированно не возбуждает исключений, так что и этот вариант безопасен относительно исключений.Наконец, функция
empty()
вообще не изменяет данные, так что она точно безопасна относительно исключенийВ этом коде есть две возможности для взаимоблокировки из-за того, что пользовательский код вызывается, когда удерживается блокировка: копирующий или перемещающий конструктор (1), (3) и копирующий или перемещающий оператор присваивания (5) хранимых в стеке данных. И еще —
operator new
, который также мог бы быть определён пользователем. Если любая из этих функций вызовет функции-члены стека, в который вставляется или из которого удаляется элемент, либо затребует какую-либо блокировку в момент, когда удерживается блокировка, захваченная при вызове функции-члена стека, то может возникнуть взаимоблокировка. Однако было бы разумно возложить ответственность за это на пользователей стека; невозможно представить себе разумную реализацию операций добавления в стек и удаления из стека, которые не копировали бы данные и не выделяли память.Поскольку все функции-члены используют для защиты данных класс
std::lock_guard<>
, их можно безопасно вызывать из любого количества потоков. Единственные небезопасные функции-члены — конструкторы и деструкторы, но эта проблема не особенно серьезна; объект можно сконструировать и уничтожить только один раз. Вызов функций-членов не полностью сконструированного или частично уничтоженного объекта — это всегда плохо, и к одновременности доступа отношения не имеет. Таким образом, пользователь должен гарантировать, что никакой другой поток не может обратиться к стеку, пока он не будет сконструирован полностью, и что любая операция доступа завершается до начала его уничтожения.Хотя благодаря блокировке несколько потоков могут одновременно вызывать функции-члены стека, в каждый момент времени с ним реально работает не более одного потока. Такая сериализация потоков может снизить производительность приложения в случае, когда имеется значительная конкуренция за стек, — пока поток ожидает освобождения блокировки, он не выполняет никакой полезной работы. Кроме того, стек не предоставляет средств, позволяющих ожидать добавления элемента. Следовательно, если потоку нужно получить из стека элемент, то он должен периодически опрашивать его с помощью
empty()
или просто вызывать pop()
и обрабатывать исключение empty_stack
.Поэтому для такого сценария данная реализация стека неудачна, так как ожидающий поток должен либо впустую растрачивать драгоценные ресурсы, ожидая данных, либо пользователь должен писать внешний код ожидания и извещения (например, с помощью условных переменных), который сделает внутренний механизм блокировки избыточным и, стало быть, расточительным. Приведенная в главе 4 реализация очереди демонстрирует, как можно включить такое ожидание в саму структуру данных с помощью условной переменной. Это и станет нашим следующим примером.
6.2.2. Потокобезопасная очередь с блокировками и условными переменными
В листинге 6.2 воспроизведен код потокобезопасной очереди из главы 4. Если стек построен по образцу
std::stack<>
, то очередь — по образцу std::queue<>
. Но ее интерфейс также отличается от стандартного адаптера контейнера, потому что запись в структуру данных должна быть безопасной относительно одновременного доступа из нескольких потоков.
Листинг 6.2. Потокобезопасная очередь с блокировками и условными переменными
template
class threadsafe_queue {
private:
mutable std::mutex mut;
std::queue data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
void push(T new_value) {
std::lock_guard lk(mut);
data_queue.push(std::move(data));
data_cond.notify_one(); ←
(1) }
void wait_and_pop(T& value) { ←
(2) std::unique_lock lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
value = std::move(data_queue.front());
data_queue.pop();
}
std::shared_ptr wait_and_pop() ←
(3) std::unique_lock lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty();});←
(4) std::shared_ptr res(
std::make_shared(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool try_pop(T& value) {
std::lock_guard lk(mut);
if (data_queue.empty())
return false;
value = std::move(data_queue.front());
data_queue.pop();
return true;
}
std::shared_ptr try_pop() {
std::lock_guard lk(mut);
if (data_queue.empty())
return std::shared_ptr(); ←
(5) std::shared_ptr res(
std::make_shared(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool empty() const {
std::lock_guard lk(mut);
return data_queue.empty();
}
};
Структурно очередь в листинге 6.2 реализована аналогично стеку в листинге 6.1, отличие только в обращениях к функции
data_cond.notify_one()
в push()
(1) и в наличии двух вариантов функции wait_and_pop()
(2), (3). Оба перегруженных варианта try_pop()
почти идентичны функциям pop()
в листинге 6.1 с тем отличием, что не возбуждают исключение, если очередь пуста. Вместо этого одна функция возвращает булевское значение, показывающее, были ли извлечены данные, а вторая — возвращающая указатель на данные (5) — указатель NULL
. Точно так же можно было бы поступить и в случае стека. Таким образом, если оставить в стороне функции wait_and_pop()
, то применим тот же анализ, который мы провели для стека.Новые функции
wait_and_pop()
решают проблему ожидания значения в очереди, с которой мы столкнулись при обсуждении стека; вместо того чтобы раз за разом вызывать empty()
, ожидающий поток может просто вызвать wait_and_pop()
, а структура данных обслужит этот вызов с помощью условной переменной. Обращение к data_cond.wait()
не вернет управление, пока во внутренней очереди не появится хотя бы один элемент, так что мы можем не беспокоиться но поводу того, что в этом месте кода возможна пустая очередь. При этом данные по-прежнему защищаются мьютексом. Таким образом, функции wait_and_pop()
не вводят новых состояний гонки, не создают возможности взаимоблокировок и не нарушают никаких инвариантов.В части безопасности относительно исключений есть мелкая неприятность — если помещения данных в очередь ожидают несколько потоков, то лишь один из них будет разбужен в результате вызова
data_cond.notify_one()
. Однако если этот поток возбудит исключение в wait_and_pop()
, например при конструировании std::shared_ptr<>
(4), то ни один из оставшихся потоков разбужен не будет. Если это неприемлемо, то можно заменить notify_one()
на data_cond.notify_all()
, тогда будут разбужены все потоки, но за это придётся заплатить — большая часть из них сразу же уснет снова, увидев, что очередь по-прежнему пуста. Другой вариант — включить в wait_and_pop()
обращение к notify_one()
в случае исключения, тогда другой поток сможет попытаться извлечь находящееся в очереди значение. Третий вариант — перенести инициализацию std::shared_ptr<>
в push()
и сохранять экземпляры std::shared_ptr<>
, а не сами значения данных. Тогда при копировании std::shared_ptr<>
из внутренней очереди std::queue<>
никаких исключений возникнуть не может, и wait_and_pop()
становится безопасной. В следующем листинге приведена реализация очереди, переработанная с учетом высказанных соображений.
Листинг 6.3. Потокобезопасная очередь, в которой хранятся объекты
std::shared_ptr
template
class threadsafe_queue {
private:
mutable std::mutex mut;
std::queue> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
void wait_and_pop(T& value) {
std::unique_lock lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
value = std::move(*data_queue.front()); ←
(1) data_queue.pop();
}
bool try_pop(T& value) {
std::lock_guard lk(mut);
if (data_queue.empty())
return false;
value = std::move(*data_queue.front()); ←
(2) data_queue.pop();
return true;
}
std::shared_ptr wait_and_pop() {
std::unique_lock lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
std::shared_ptr res = data_queue.front(); ←
(3) data_queue.pop();
return res;
}
std::shared_ptr try_pop() {
std::lock_guard lk(mut);
if (data_queue.empty())
return std::shared_ptr();
std::shared_ptr res = data_queue.front(); ←
(4) data_queue.pop();
return res;
}
void push(T new_value) {
std::shared_ptr data(
std::make_shared(std::move(new_value))); ←
(5) std::lock_guard lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
bool empty() const {
std::lock_guard lk(mut);
return data_queue.empty();
}
};
Последствия хранения данных, обернутых в
std::shared_ptr<>
, понятны: функции pop, которые получают значение из очереди в виде ссылки на переменную, теперь должны разыменовывать указатель (1), (2), а функции pop
, которые возвращают std::shared_ptr<>
, теперь могут напрямую извлекать его из очереди (3), (4) без дальнейших манипуляций.У хранения данных в виде
std::shared_ptr<>
есть и еще одно преимущество: выделение памяти для нового объекта можно производить не под защитой блокировки в push()
(5), тогда как в листинге 6.2 это приходилось делать в защищенном участке кода внутри pop()
. Поскольку выделение памяти, вообще говоря, дорогая операция, это изменение весьма благотворно скажется на общей производительности очереди, так как уменьшается время удержания мьютекса, а, значит, у остальных потоков остается больше времени на полезную работу.Как и в примере стека, применение мьютекса для защиты всей структуры данных ограничивает возможности распараллеливания работы с очередью; хотя ожидать доступа к очереди могут несколько потоков, выполняющих разные функции, в каждый момент лишь один совершает какие-то действия. Однако это ограничение отчасти проистекает из того, что мы пользуемся классом
std::queue<>
, — стандартный контейнер составляет единый элемент данных, который либо защищен, либо нет. Полностью взяв на себя управление деталями реализации структуры данных, мы сможем обеспечить мелкогранулярные блокировки и повысить уровень параллелизма.6.2.3. Потокобезопасная очередь с мелкогранулярными блокировками и условными переменными
В листингах 6.2 и 6.3 имеется только один защищаемый элемент данных (
data_queue
) и, следовательно, только один мьютекс. Чтобы воспользоваться мелкогранулярными блокировками, мы должны заглянуть внутрь очереди и связать мьютекс с каждым хранящимся в ней элементом данных.Проще всего реализовать очередь в виде односвязного списка, как показано на рис. 6.1. Указатель head направлен на первый элемент списка, и каждый элемент указывает на следующий. Когда данные извлекаются из очереди, в head записывается указатель на следующий элемент, после чего возвращается элемент, который до этого был в начале.
Добавление данных производится с другого конца. Для этого нам необходим указатель tail, направленный на последний элемент списка. Чтобы добавить узел, мы записываем в поле next в последнем элементе указатель на новый узел, после чего изменяем указатель tail, так чтобы он адресовал новый элемент. Если список пуст, то оба указателя head и tail равны
NULL
.В следующем листинге показана простая реализация такой очереди с урезанным по сравнению с листингом 6.2 интерфейсом; мы оставили только функцию
try_pop()
и убрали функцию wait_and_pop()
, потому что эта очередь поддерживает только однопоточную работу.Рис. 6.1. Очередь, представленная в виде односвязного списка
Листинг 6.4. Простая реализация однопоточной очереди
template
class queue {
private:
struct node {
T data;
std::unique_ptr next;
node(T data_):
data(std::move(data_)) {}
};
std::unique_ptr head;←
(1) node* tail; ←
(2)
public:
queue() {}
queue(const queue& other) = delete;
queue& operator=(const queue& other) = delete;
std::shared_ptr try_pop() {
if (!head) {
return std::shared_ptr();
}
std::shared_ptr const res(
std::make_shared(std::move(head->data)));
std::unique_ptr const old_head = std::move(head);
head = std::move(old_head->next);←
(3) return res;
}
void push(T new_value) {
std::unique_ptr p(new node(std::move(new_value)));
node* const new_tail = p.get();
if (tail) {
tail->next = std::move(p);←
(4) } else {
head = std::move(p); ←
(5) }
tail = new_tail; ←
(6)}
};
Прежде всего отметим, что в листинге 6.4 для управления узлами используется класс
std::unique_ptr
, потому что он гарантирует удаление потерявших актуальность узлов (и содержащихся в них данных) без явного использования delete
. За передачу владения отвечает head
, тогда как tail
является простым указателем на последний узел.В однопоточном контексте эта реализация прекрасно работает, но при попытке ввести мелкогранулярные блокировки в многопоточном контексте возникают две проблемы. Учитывая наличие двух элементов данных (
head
(1) и tail
(2)), мы в принципе могли бы использовать два мьютекса — для защиты head
и tail
соответственно. Но не всё так просто.Самая очевидная проблема заключается в том, что
push()
может изменять как head
(5), так и tail
(6), поэтому придётся захватывать оба мьютекса. Это не очень хорошо, но не трагедия, потому что захватить два мьютекса, конечно, можно. Настоящая проблема возникает из-за того, что и push()
, и pop()
обращаются к указателю next
в узле: push()
обновляет tail->next
(4), a try_pop()
читает head->next
(3). Если в очереди всего один элемент, то head==tail
, и, значит, head->next
и tail->next
— один и тот же объект, который, следовательно, нуждается в защите. Поскольку нельзя сказать, один это объект или нет, не прочитав и head
, и tail
, нам приходится захватывать один и тот же мьютекс в push()
и в try_pop()
, и получается, что мы ничего не выиграли по сравнению с предыдущей реализацией. Есть ли выход из этого тупика?Обеспечение параллелизма за счет отделения данныхРешить проблему можно, заранее выделив фиктивный узел, не содержащий данных, и тем самым гарантировать, что в очереди всегда есть хотя бы один узел, отделяющий голову от хвоста. В случае пустой очереди
head
и tail
теперь указывают на фиктивный узел, а не равны NULL
. Это хорошо, потому что try_pop()
не обращается к head->next
, если очередь пуста. После добавления в очередь узла (в результате чего в ней находится один реальный узел) head
и tail
указывают на разные узлы, так что гонки за head->next
и tail->next
не возникает. Недостаток этого решения в том, что нам пришлось добавить лишний уровень косвенности для хранения указателя на данные, чтобы поддержать фиктивный узел. В следующем листинге показано, как теперь выглядит реализация.
Листинг 6.5. Простая очередь с фиктивным узлом
template
class queue {
private:
struct node {
std::shared_ptr data;←
(1) std::unique_ptr next;
};
std::unique_ptr head;
node* tail;
public:
queue():
head(new node), tail(head.get())←
(2) {}
queue(const queue& other) = delete;
queue& operator=(const queue& other) = delete;
std::shared_ptr try_pop() {
if (head.get() ==tail)←
(3) {
return std::shared_ptr();
}
std::shared_ptr const res(head->data); ←
(4) std::unique_ptr old_head = std::move(head);
head = std::move(old_head->next); ←
(5) return res; ←
(6) }
void push(T new_value) {
std::shared_ptr new_data(
std::make_shared(std::move(new_value))); ←
(7) std::unique_ptr p(new node); ←
(8) tail->data = new_data;←
(9) node* const new_tail = p.get();
tail->next = std::move(p);
tail = new_tail;
}
};
Изменения в
try_pop()
минимальны. Во-первых, мы сравниваем head
с tail
(3), а не с NULL
, потому что благодаря наличию фиктивного узла head
никогда не может обратиться в NULL
. Поскольку head
имеет тип std::unique_ptr
, для сравнения необходимо вызывать head.get()
. Во-вторых, так как в node
теперь хранится указатель на данные (1), то можно извлекать указатель непосредственно (4) без конструирования нового экземпляра T
. Наиболее серьезные изменения претерпела функция push()
: мы должны сначала создать новый экземпляр T
в куче и передать владение им std::shared_ptr<>
(7) (обратите внимание на использование функции std::make_shared
, чтобы избежать накладных расходов на второе выделение памяти под счетчик ссылок). Вновь созданный узел станет новым фиктивным узлом, поэтому передавать конструктору значение new_value
необязательно (8). Вместо этого мы записываем в старый фиктивный узел значение только что созданной копии — new_value
(9). Наконец, первоначальный фиктивный узел следует создать в конструкторе (2).Уверен, теперь вы задаетесь вопросом, что мы выиграли от всех этих изменений и как они помогут сделать код потокобезопасным. Разберемся. Функция
push()
теперь обращается только к tail
, но не к head
, и это, безусловно, улучшение. try_pop()
обращается и к head
, и к tail
, но tail
нужен только для начального сравнения, так что блокировка удерживается очень недолго. Основной выигрыш мы получили за счет того, что из-за наличия фиктивного узла try_pop()
и push()
никогда не оперируют одним и тем же узлом, так что нам больше не нужен всеохватывающий мьютекс. Стало быть, мы можем завести по одному мьютексу для head
и tail
. Но где расставить блокировки?Мы хотим обеспечить максимум возможностей для распараллеливания, поэтому блокировки должны освобождаться как можно быстрее. С функцией
push()
всё просто: мьютекс должен быть заблокирован на протяжении всех обращений к tail
, а это означает, что мы захватываем его после выделения памяти для нового узла (8) и перед тем, как записать данные в текущий последний узел (9). Затем блокировку следует удерживать до конца функции.С
try_pop()
сложнее. Прежде всего, нам нужно захватить мьютекс для head
и удерживать его, пока мы не закончим работать с head
. По сути дела, этот мьютекс определяет, какой поток производит извлечение из очереди, поэтому захватить его надо в самом начале. После того как значение head
изменено (5), мьютекс можно освободить; в момент, когда возвращается результат (6), он уже не нужен. Остается разобраться с защитой доступа к tail
. Поскольку мы обращаемся к tail
только один раз, то можно захватить мьютекс на время, требуемое для чтения. Проще всего сделать это, поместив операцию доступа в отдельную функцию. На самом деле, поскольку участок кода, в котором мьютекс для head
должен быть заблокирован, является частью одной функции-члена, то будет правильнее завести отдельную функцию и для него тоже. Окончательный код приведён в листинге 6.6.
Листинг 6.6. Потокобезопасная очередь с мелкогранулярными блокировками
template
class threadsafe_queue {
private:
struct node {
std::shared_ptr data;
std::unique_ptr next;
};
std::mutex head_mutex;
std::unique_ptr head;
std::mutex tail_mutex;
node* tail;
node* get_tail() {
std::lock_guard tail_lock(tail_mutex);
return tail;
}
std::unique_ptr pop_head() {
std::lock_guard head_lock(head_mutex);
if (head.get() == get_tail()) {
return nullptr;
}
std::unique_ptr old_head = std::move(head);
head = std::move(old_head->next);
return old_head;
}
public:
threadsafe_queue():
head(new node), tail(head.get()) {}
threadsafe_queue(const threadsafe_queue& other) = delete;
threadsafe_queue& operator=(
const threadsafe_queue& other) = delete;
std::shared_ptr try_pop() {
std::unique_ptr old_head = pop_head();
return old_head ? old_head->data : std::shared_ptr();
}
void push(T new_value) {
std::shared_ptr new_data(
std::make_shared(std::move(new_value)));
std::unique_ptr p(new node);
node* const new_tail = p.get();
std::lock_guard tail_lock(tail_mutex);
tail->data = new_data;
tail->next = std::move(p);
tail = new_tail;
}
};
Давайте взглянем на этот код критически, памятуя о рекомендациях из раздела 6.1.1. Прежде чем искать, где нарушены инварианты, надо бы их точно сформулировать:
•
tail->next == nullptr
.•
tail->data == nullptr
.•
head == tail
означает, что список пуст.• Для списка с одним элементом
head->next==tail
.• Для каждого узла
x
списка, для которого x!=tail
, x->data
указывает на экземпляр T
, a x->next
— на следующий узел списка. Если x->next==tail
, то x
— последний узел списка.• Если проследовать по указателям
next
, начиная с головы списка, то рано или поздно мы достигнем его хвоста.Сама по себе, функция
push()
очень проста: все модификации данных защищены мьютексом tail_mutex
, и инвариант при этом сохраняется, потому что новый хвостовой узел пуст и правильно установлены указатели data
и next
для старого хвостового узла, который теперь стал настоящим последним узлом списка.Самое интересное происходит в функции
try_pop()
. Как выясняется, мьютекс tail_mutex
нужен не только для защиты чтения самого указателя tail
, но и чтобы предотвратить гонку при чтении данных из головного узла. Не будь этого мьютекса, могло бы получиться, что один поток вызывает try_pop()
, а другой одновременно вызывает push()
, и эти операции никак не упорядочиваются. Хотя каждая функция-член удерживает мьютекс, но это разные мьютексы, а функции могут обращаться к одним и тем же данным — ведь все данные появляются в очереди только благодаря push()
. Раз потоки потенциально могут обращаться к одним и тем же данным без какого бы то ни было упорядочения, то возможна гонка за данными и, как следствие (см. главу 5), неопределенное поведение. К счастью, блокировка мьютекса tail_mutex
в get_tail()
решает все проблемы. Поскольку внутри get_tail()
захватывается тот же мьютекс, что в push()
, то оба вызова оказываются упорядоченными. Либо обращение к функции get_tail(
) происходит раньше обращения к push()
— тогда get_tail()
увидит старое значение tail
— либо после обращения к push()
— и тогда она увидит новое значение tail
и новые данные, присоединенные к прежнему значениюtail
.Важно также, что обращение к
get_tail()
производится под защитой захваченного мьютекса head_mutex
. Если бы это было не так, то между вызовом get_tail()
и захватом head_mutex
мог бы вклиниться вызов pop_head()
, потому что другой поток вызвал try_pop()
(и, следовательно, pop_head()
) и захватил мьютекс первым, не давая первому потоку продолжить исполнение:│
Эта реализацияstd: :unique_ptr pop_head()←┘
некорректна{
(1) Старое значение tail│
получено не node* const old_tail = get_tail();←┘
под защитой head_mutex std::lock_guard head_lock(head_mutex);
if (head.get() == old_tail) ←
(2) return nullptr;
}
std::unique_ptr old_head = std::move(head);
head = std::move(old_head->next);←
(3) return old_head;
}
При такой — некорректной — реализации в случае, когда
get_tail()
(1) вызывается вне области действия блокировки, может оказаться, что и head
, и tail
изменились к моменту, когда первому потоку удалось захватить head_mutex
, и теперь возвращенный хвост мало того что больше не является хвостом, но и вообще не принадлежит списку. Тогда сравнение head
с old_tail
(2) не прошло бы, хотя head
в действительности является последним узлом. Следовательно, после обновления (3) узел head
мог бы оказаться в списке дальше tail
, то есть за концом списка, что полностью разрушило бы структуру данных. В корректной реализации, показанной в листинге 6.6, вызов get_tail()
производится под защитой head_mutex
. Это означает, что больше никакой поток не сможет изменить head
, a tail
будет только отодвигаться от начала списка (по мере добавления новых узлов с помощью push()
) — это вполне безопасно. Указатель head
никогда не сможет оказаться дальше значения, возвращенного get_tail()
, так что инварианты соблюдаются.После того как
pop_head()
удалит узел из очереди, обновив head
, мьютекс освобождается, и try_pop()
может извлечь данные и удалить узел, если таковой был (или вернуть NULL
-экземпляр класса std::shared_ptr<>
, если узла не было), твердо зная, что она работает в единственном потоке, который имеет доступ к этому узлу.Далее, внешний интерфейс является подмножеством интерфейса из листинга 6.2, поэтому ранее выполненный анализ остается в силе: в интерфейсе нет внутренне присущих состояний гонки.
Вопрос об исключениях более интересен. Поскольку мы изменили порядок выделения памяти, исключения могут возникать в других местах. Единственные операции в
try_pop()
, способные возбудить исключение, — это захваты мьютексов, но пока мьютексы не захвачены, данные не модифицируются. Поэтому try_pop()
безопасна относительно исключений. С другой стороны, push()
выделяет из кучи память для объектов T
и node
, и каждая такая операция может возбудить исключение. Однако оба вновь созданных объекта присваиваются интеллектуальным указателям, поэтому в случае исключения память корректно освобождается. После того как мьютекс захвачен, ни одна из последующих операций внутри push()
не может возбудить исключение, так что мы снова в безопасности.Поскольку мы не изменяли интерфейс, то новых внешних возможностей для взаимоблокировки не возникло. Внутренних возможностей также нет; единственное место, где захватываются два мьютекса, — это функция
pop_head()
, но она всегда захватывает сначала head_mutex
, а потом tail_mutex
, так что взаимоблокировки не случится.Осталось рассмотреть только один вопрос — в какой мере возможно распараллеливание. Эта структура данных предоставляет куда больше таких возможностей, чем приведенная в листинге 6.2, потому что гранулярность блокировок мельче, и больше работы выполняется не под защитой блокировок. Например, в
push()
память для нового узла и нового элемента данных выделяется, когда ни одна блокировка не удерживается. Это означает, что несколько потоков могут спокойно выделять новые узлы и элементы данных в одно и то же время. В каждый момент времени только один поток может добавлять новый узел в список, но выполняющий это действие код сводится к нескольким простым присваиваниям указателей, так что блокировка удерживается совсем недолго по сравнению с реализацией на основе std::queue<>
, где мьютекс остается захваченным в течение всего времени, пока выполняются операции выделения памяти внутри std::queue<>
.Кроме того,
try_pop()
удерживает tail_mutex
лишь на очень короткое время, необходимое для защиты чтения tail
. Следовательно, почти все действия внутри try_pop()
могут производиться одновременно с вызовом push()
. Объем операций, выполняемых под защитой мьютекса head_mutex
также совсем невелик; дорогостоящая операция delete
(в деструкторе указателя на узел) производится вне блокировки. Это увеличивает потенциальное число одновременных обращений к try_pop()
; в каждый момент времени только один поток может вызывать pop_head()
, зато несколько потоков могут удалять старые узлы и безопасно возвращать данные.Ожидание поступления элементаНу хорошо, код в листинге 6.6 дает в наше распоряжение потокобезопасную очередь с мелкогранулярными блокировками, но он поддерживает только функцию
try_pop()
(и к тому же всего в одном варианте). А как насчет таких удобных функций wait_and_pop()
, которые мы написали в листинге 6.2? Сможем ли мы реализовать идентичный интерфейс, сохранив мелкогранулярные блокировки?Ответ, разумеется, — да, только вот как это сделать? Модифицировать
push()
несложно: нужно лишь добавить вызов data_cond.notify_one()
в конец функции, как и было в листинге 6.2. Но на самом деле не всё так просто; мы же связались с мелкогранулярными блокировками для того, чтобы увеличить уровень параллелизма. Если оставить мьютекс захваченным на все время вызова notify_one()
(как в листинге 6.2), то поток, разбуженный до того, как мьютекс освобожден, должен будет ждать мьютекса. С другой стороны, если освободить мьютекс до обращения к notify_one(), то ожидающий поток сможет захватить его сразу, как проснётся (если, конечно, какой-то другой поток не успеет раньше). Это небольшое улучшение, но в некоторых случаях оно бывает полезно.Функция
wait_and_pop()
сложнее, потому что мы должны решить, где поместить ожидание, какой задать предикат и какой мьютекс захватить. Мы ждем условия «очередь не пуста», оно представляется выражением head != tail
. Если записать его в таком виде, то придется захватывать и head_mutex
, и tail_mutex
, но, разбирая код в листинге 6.6, мы уже поняли, что захватывать tail_mutex
нужно только для чтения tail
, а не для самого сравнения, та же логика применима и здесь. Если записать предикат в виде head != get_tail()
, то нужно будет захватить только head_mutex
и использовать уже полученную блокировку для защиты data_cond.wait()
. Прочий код такой же, как в try_pop()
.Второй перегруженный вариант
try_pop()
и соответствующий ему вариант wait_and_pop()
нуждаются в тщательном осмыслении. Если просто заменить возврат указателя std::shared_ptr<>
, полученного из old_head
, копирующим присваиванием параметру value
, то функция перестанет быть безопасной относительно исключений. В этот момент элемент данных уже удален из очереди и мьютекс освобожден, осталось только вернуть данные вызывающей программе. Однако, если копирующее присваивание возбудит исключение (а почему бы и нет?), то элемент данных будет потерян, потому что вернуть его в то же место очереди, где он был, уже невозможно.Если фактический тип
T
, которым конкретизируется шаблон, обладает не возбуждающими исключений оператором перемещающего присваивания или операцией обмена (swap
), то так поступить можно, но ведь мы ищем общее решение, применимое к любому типу T
. В таком случае следует поместить операции, способные возбудить исключения, в защищенную область перед тем, как удалять узел из списка. Это означает, что нам необходим еще один перегруженный вариант pop_head()
, который извлекает сохраненное значение до модификации списка.Напротив, модификация функции empty() тривиальна: нужно просто захватить
head_mutex
и выполнить проверку head == get_tail()
(см. листинг 6.10). Окончательный код очереди приведён в листингах 6.7, 6.8, 6.9 и 6.10.
Листинг 6.7. Потокобезопасная очередь с блокировкой и ожиданием: внутренние данные и интерфейс
template
class threadsafe_queue {
private:
struct node {
std::shared_ptr data;
std::unique_ptr next;
};
std::mutex head_mutex;
std::unique_ptr head;
std::mutex tail_mutex;
node* tail;
std::condition_variable data_cond;
public:
threadsafe_queue():
head(new node), tail(head.get()) {}
threadsafe_queue(const threadsafe_queue& other) = delete;
threadsafe_queue& operator=(
const threadsafe_queue& other) = delete;
std::shared_ptr try_pop();
bool try_pop(T& value);
std::shared_ptr wait_and_pop();
void wait_and_pop(T& value);
void push(T new_value);
void empty();
};
Код, помещающий новые узлы в очередь, прост — его реализация (показанная в листинге ниже) близка к той, что мы видели раньше.
Листинг 6.8. Потокобезопасная очередь с блокировкой и ожиданием: добавление новых значений
template
void threadsafe_queue::push(T new_value) {
std::shared_ptr new_data(
std::make_shared(std::move(new_value)));
std::unique_ptr p(new node);
{
std::lock_guard tail_lock(tail_mutex);
tail->data = new_data;
node* const new_tail = p.get();
tail->next = std::move(p);
tail = new_tail;
}
data_cond.notify_one();
}
Как уже отмечалось, вся сложность сосредоточена в части pop. В листинге ниже показана реализация функции-члена
wait_and_pop()
и относящихся к ней вспомогательных функций.
Листинг 6.9. Потокобезопасная очередь с блокировкой и ожиданием:
wait_and_pop
template
class threadsafe_queue {
private:
node* get_tail() {
std::lock_guard tail_lock(tail_mutex);
return tail;
}
std::unique_ptr pop_head() {←
(1) std::unique_ptr old_head = std::move(head);
head = std::move(old_head->next);
return old_head;
}
std::unique_lock wait_for_data() {←
(2) std::unique_lock head_lock(head_mutex);
data_cond.wait(
head_lock, [&]{return head.get() != get_tail();});
return std::move(head_lock); ←
(3) }
std::unique_ptr wait_pop_head() {
std::unique_lock head_lock(wait_for_data());←
(4) return pop_head();
}
std::unique_ptr wait_pop_head(T& value) {
std::unique_lock head_lock(wait_for_data());←
(5) value = std::move(*head->data);
return pop_head();
}
public:
std::shared_ptr wait_and_pop() {
std::unique_ptr const old_head = wait_pop_head();
return old_head->data;
}
void wait_and_pop(T& value) {
std::unique_ptr const old_head = wait_pop_head(value);
}
};
В реализации извлечения из очереди используется несколько небольших вспомогательных функций, которые упрощают код и позволяют устранить дублирование, например:
pop_head()
(1) (модификация списка в результате удаления головного элемента) и wait_for_data()
(2) (ожидание появления данных в очереди). Особенно стоит отметить функцию wait_for_data()
, потому что она не только ждет условную переменную, используя лямбда-функцию в качестве предиката, но и возвращает объект блокировки вызывающей программе (3). Тем самым мы гарантируем, что та же самая блокировка удерживается, пока данные модифицируются в соответствующем перегруженном варианте wait_pop_head()
(4), (5). Функция pop_head()
используется также в функции try_pop()
, показанной ниже.
Листинг 6.10. Потокобозопасная очередь с блокировкой и ожиданием:
try_pop()
и empty()
template
class threadsafe_queue {
private:
std::unique_ptr try_pop_head() {
std::lock_guard head_lock(head_mutex);
if (head.get() == get_tail()) {
return std::unique_ptr();
}
return pop_head();
}
std::unique_ptr try_pop_head(T& value) {
std::lock_guard head_lock(head_mutex);
if (head.get() == get_tail()) {
return std::unique_ptr();
}
value = std::move(*head->data);
return pop_head();
}
public:
std::shared_ptr try_pop() {
std::unique_ptr old_head = try_pop_head();
return old_head ? old_head->data : std::shared_ptr();
}
bool try_pop(T& value) {
std::unique_ptr const old_head = try_pop_head(value);
return old_head;
}
void empty() {
std::lock_guard head_lock(head_mutex);
return (head.get() == get_tail());
}
};
Эта реализация очереди ляжет в основу очереди без блокировок, которую мы будем рассматривать в главе 7. Данная очередь неограниченная, то есть в нее можно помещать и помещать данные, ничего не удаляя, пока не кончится память. Альтернативой является ограниченная очередь, максимальная длина которой задается при создании. Попытка поместить элемент в заполненную очередь либо завершается ошибкой, либо приводит к приостановке потока до тех пор, пока из очереди не будет удален хотя бы один элемент. Ограниченные очереди бывают полезны для равномерного распределения задач между потоками (см. главу 8). Такая дисциплина не дает потоку (или потокам), заполняющему очередь, намного обогнать потоки, читающие из очереди.
Показанную реализацию неограниченной очереди легко преобразовать в очередь с ограниченной длиной, введя ожидание условной переменной в функцию
push()
. Вместо того чтобы ждать, пока в очереди появятся элементы (как pop()
), мы должны будем ждать, когда число элементов в ней окажется меньше максимума. Дальнейшее обсуждение ограниченных очередей выходит за рамки этой книги, мы же перейдём от очередей к более сложным структурам данных.6.3. Проектирование более сложных структур данных с блокировками