Параллельное программирование на С++ в действии — страница 21 из 53

Стеки и очереди — простые структуры данных, их интерфейс крайне ограничен, и используются они в очень узких целях. Но не все структуры данных так просты, как правило они поддерживают более широкий набор операций. В принципе, это может открыть больше возможностей для распараллеливания, но и проблема защиты данных сильно усложняется, так как приходится учитывать много разных способов доступа. При проектировании структур данных, предназначенных для параллельного доступа, важно принимать во внимание характер выполняемых операций.

Чтобы понять, какие трудности возникают на этом пути, рассмотрим справочную таблицу (lookup table).

6.3.1. Разработка потокобезопасной справочной таблицы с блокировками

Справочная таблица, или словарь позволяет ассоциировать значения одного типа (типа ключа) со значениями того же или другого типа (ассоциированного типа). В общем случае задача такой структуры — запрашивать данные, ассоциированные с данным ключом. В стандартной библиотеке С++ для этой цели предназначены ассоциативные контейнеры:

std::map<>
,
std::multimap<>
,
std::unordered_map<>
, и
std::unordered_multimap<>
.

Справочная таблица используется иначе, чем стек или очередь. Если большинство операций со стеком и очередью каким-то образом модифицируют структуру данных, то есть либо добавляют, либо удаляют элементы, то справочная таблица изменяется сравнительно редко. Примером может служить простой DNS-кэш из листинга 3.13, предоставляющий интерфейс, сильно урезанный по сравнению с

std::map<>
. При рассмотрении стека и очереди мы видели, что интерфейсы стандартных контейнеров не годятся для случая, когда к структуре данных одновременно обращаются несколько потоков, так как им внутренне присущи состояния гонки. Поэтому интерфейсы приходится пересматривать и сокращать.

Самая серьезная с точки зрения распараллеливания проблема в интерфейсе контейнера

std::map<>
— его итераторы. Можно написать итератор так, что доступ к контейнеру с его помощью для чтения и модификации будет безопасен, но это амбициозная задача. Для корректной работы нужно учесть, что другой поток может удалить элемент, на который итератор сейчас ссылается, а это совсем непросто. Поэтому при первом подходе к проектированию интерфейса потокобезопасной справочной таблицы итераторы мы опустим. Памятуя о том, насколько сильно интерфейс
std::map<>
(и прочих стандартных ассоциативных контейнеров) зависит от итераторов, пожалуй, будет разумнее сразу отказаться от попыток смоделировать их и вместо этого придумать новый интерфейс с нуля.

Справочной таблице нужно всего несколько основных операций:

• добавить новую пару ключ/значение;

• изменить значение, ассоциированное с данным ключом;

• удалить ключ и ассоциированное с ним значение;

• получить значение, ассоциированное с данным ключом, если такой ключ существует.

Есть также несколько полезных операций, относящихся к контейнеру в целом, например: проверить, пуст ли контейнер, получить полный текущий список ключей или полное множество пар ключ/ значение.

Если придерживаться базовых рекомендаций, касающихся потокобезопасности, например, не возвращать ссылки и защищать мьютексом все тело каждой функции-члена, то все эти операции будут безопасны. Угроза возникновения гонки возникает прежде всего при добавлении новой пары ключ/значение; если два потока одновременно добавляют новое значение, то лишь один из них будет первым, а второй, следовательно, получит ошибку. Один из способов решить эту проблему состоит в том, чтобы объединить операции добавления и изменения в одной функции-члене, как мы проделали в DNS-кэше в листинге 3.13.

С точки зрения интерфейса, остается еще лишь один интересный момент: фраза «если такой ключ существует» в описании операции получения ассоциированного значения. Можно, например, предоставить пользователю возможность задать некий результат «по умолчанию», который будет возвращен, если указанного ключа нет.

mapped_type get_value(

 key_type const& key, mapped_type default_value);

В таком случае можно использовать сконструированный по умолчанию экземпляр типа

mapped_type
, если
default_value
не было задано явно. Эту идею можно развить и возвращать не просто экземпляр
mapped_type
, а объект типа
std::pair
, где
bool
указывает, было ли найдено значение. Другой вариант — использовать интеллектуальный указатель, ссылающийся на значение; если он равен
NULL
, то значение не было найдено.

Как уже отмечалось, после определения интерфейса (в предположении, что состояний гонки в нем нет), обеспечить потокобезопасность можно, заведя единственный мьютекс, который дет захватываться в начале каждой функции-члена для защиты внутренней структуры данных. Однако тем самым мы сведем на нет все возможности распараллеливания, которые могли бы открыться в результате наличия отдельных функций для чтения и модификации структуры данных. Отчасти исправить ситуацию можно было бы, воспользовавшись мьютексом, который разрешает нескольким потокам читать данные, но только одному — изменять их, например, классом

boost::shared_mutex
из листинга 3.13. Это, конечно, несколько улучшило бы общую картину, но при таком решении модифицировать структуру данных по-прежнему может только один поток. В идеале хотелось бы добиться большего.

Проектирование структуры данных для справочной таблицы с мелкогранулярными блокировками

Как и в случае очереди (см. раздел 6.2.3), чтобы ввести мелкогранулярные блокировки, нужно внимательно изучить особенности структуры данных, а не пытаться обернуть уже имеющийся контейнер, например

std::map<>
. Есть три общепринятых подхода к реализации ассоциативного контейнера, каковым является, в частности, справочная таблица:

• двоичное, например красно-черное, дерево;

• отсортированный массив;

• хеш-таблица.

Двоичное дерево плохо приспособлено для распараллеливания — каждый просмотр или модификация должны начинается с доступа к корневому узлу, который, следовательно, нужно блокировать. Блокировку, конечно, можно освобождать по мере спуска вниз по дереву, но в целом это немногим лучше блокирования всей структуры целиком.

Отсортированный массив еще хуже, потому что заранее невозможно сказать, в каком месте массива может оказаться требуемое значение, поэтому придется заводить одну блокировку на весь массив. Остается только хеш-таблица. В предположении, что число кластеров фиксировано, вопрос о том, в каком кластере находится ключ, является свойством только лишь самого ключа и хеш-функции. А это значит, что мы сможем завести по одной блокировке на каждый кластер. Если еще и использовать мьютекс, который поддерживает несколько читателей и одного писателя, то коэффициент распараллеливания увеличится в N раз, где N — число кластеров. Недостаток в том, что нужна хорошая хеш-функция для ключа. В стандартной библиотеке С++ имеется шаблон

std::hash<>
, которым можно воспользоваться для этой цели. Он уже специализирован для таких фундаментальных типов, как
int
, и некоторых библиотечных типов, например
std::string
, а пользователь может без труда написать специализации и для других типов ключа. Если, следуя примеру стандартных неупорядоченных контейнеров, указать в качестве параметра шаблона тип объекта-функции, которая выполняет хеширование, то пользователь сможет сам решить, специализировать ли ему шаблон
std::hash<>
для типа своего ключа или предоставить отдельную хеш-функцию.

Теперь обратимся собственно к коду. Как могла бы выглядеть реализация потокобезопасной справочной таблицы? Один из вариантов показан ниже.


Листинг 6.11. Потокобезопасная справочная таблица

template

         typename Hash = std::hash>

class threadsafe_lookup_table {

private:

 class bucket_type {

 private:

  typedef std::pair bucket_value;

  typedef std::list bucket_data;

  typedef typename bucket_data::iterator bucket_iterator;


  bucket_data data;

  mutable boost::shared_mutex mutex;←
(1)


  bucket_iterator find_entry_for(Key const& key) const {←
(2)

   return std::find_if(data.begin(), data.end(),

                       [&](bucket_value const& item)

   { return item.first == key; });

  }


 public:

  Value value_for(

   Key const& key, Value const& default_value) const {

   boost::shared_lock lock(mutex);←
(3)

   bucket_iterator const found_entry = find_entry_for(key);

   return (found_entry==data.end()) ?

           default_value : found_entry->second;

  }


  void add_or_update_mapping(

   Key const& key, Value const& value) {

   std::unique_lock lock(mutex);←
(4)

   bucket_iterator const found_entry = find_entry_for(key);

   if (found_entry == data.end()) {

    data.push_back(bucket_value(key, value));

   } else {

    found_entry->second = value;

   }

  }


  void remove_mapping(Key const& key) {

   std::unique_lock lock(mutex);←
(5)

   bucket_iterator const found_entry = find_entry_for(key);

   if (found_entry != data.end()) {

    data.erase(found_entry);

   }

  }

 };


 std::vector> buckets;←
(6)

 Hash hasher;


 bucket_type& get_bucket(Key const& key) const {←
(7)

  std::size_t const bucket_index = hasher(key)%buckets.size();

  return *buckets[bucket_index];

 }

public:

 typedef Key key_type;

 typedef Value mapped_type;

 typedef Hash hash_type;


 threadsafe_lookup_table(

  unsigned num_buckets = 19,

  Hash const& hasher_ = Hash()):

  buckets(num_buckets), hasher(hasher_) {

  for (unsigned i = 0; i < num_buckets; ++i) {

   buckets[i].reset(new bucket_type);

  }

 }


 threadsafe_lookup_table(

  threadsafe_lookup_table const& other) = delete;

 threadsafe_lookup_table& operator=(

  threadsafe_lookup_table const& other) = delete;


 Value value_for(Key const& key,

  Value const& default_value = Value()) const {

  return get_bucket(key).value_for(key, default_value);←
(8)

 }


 void add_or_update_mapping(Key const& key,Value const& value) {

  get_bucket(key).add_or_update_mapping(key, value);←
(9)

 }


 void remove_mapping(Key const& key) {

  get_bucket(key).remove_mapping(key);←
(10)

 }

};

В этой реализации для хранения кластеров используется вектор

std::vector>
(6), это позволяет задавать число кластеров в конструкторе. По умолчанию оно равно 19 (произвольно выбранное простое число); оптимальные показатели работы хеш-таблиц достигаются, когда имеется простое число кластеров. Каждый кластер защищен мьютексом типа
boost::shared_mutex
(1), который позволяет нескольким потокам одновременно читать, но только одному обращаться к любой из функций модификации данного кластера.

Поскольку количество кластеров фиксировано, функцию

get_bucket()
(7) можно вызывать вообще без блокировки (8), (9), (10), а затем захватить мьютекс кластера для совместного (только для чтения) (3) или монопольного (чтение/запись) (4), (5) владения — в зависимости от вызывающей функции.

Во всех трех функциях используется функция-член кластера

find_entry_for()
(2), которая определяет, есть ли в данном кластере указанный ключ. Каждый кластер содержит всего лишь список
std::list<>
пар ключ/значение, поэтому добавлять и удалять элементы легко.

Я уже рассматривал это решение с точки зрения распараллеливания, и мы убедились, что все надежно защищено мьютексами. Но как обстоит дело с безопасностью относительно исключений? Функция

value_for
ничего не изменяет, поэтому с ней проблем нет: если она и возбудит исключение, то на структуру данных это не повлияет.

Функция

remove_mapping
модифицирует список, обращаясь к его функции-члену
erase
, которая гарантированно не возбуждает исключений, так что здесь тоже всё безопасно. Остается функция
add_or_update_mapping
, которая может возбуждать исключения в обеих ветвях
if
. Функция
push_back
безопасна относительно исключений, то есть в случае исключения оставляет список в исходном состоянии, так что с этой ветвью всё хорошо. Единственная проблема — присваивание в случае замены существующего значения; если оператор присваивания возбуждает исключение, то мы полагаемся на то, что он оставляет исходный объект неизмененным. Однако это не влияет на структуру данных в целом и является свойством пользовательского типа, так что пускай пользователь и решает эту проблему.

В начале этого раздела я упоминал, что было бы хорошо, если бы можно было получить текущее состояние справочной таблицы, например, в виде объекта

std::map<>
. Чтобы копия состояния была согласована, потребуется заблокировать контейнер целиком, то есть все кластеры сразу. Поскольку для «обычных» операций захватывается мьютекс только одного кластера, то получение копии состояния будет единственной операцией, блокирующей все кластеры. При условии, что мьютексы всегда захватываются в одном и том же порядке, взаимоблокировка не возникнет. Такая реализация приведена в следующем листинге.


Листинг 6.12. Получение содержимого таблицы

threadsafe_lookup_table
в виде
std::map<>

std::map threadsafe_lookup_table::get_map() const {

 std::vector> locks;

 for (unsigned i = 0; i < buckets.size(); ++i) {

  locks.push_back(

   std::unique_lock(buckets[i].mutex));

 }


 std::map res;

 for (unsigned i = 0; i < buckets.size(); ++i) {

  for (bucket_iterator it = buckets[i].data.begin();

  it != buckets[i].data.end(); ++it) {

   res.insert(*it);

  }

 }

 return res;

}

Реализация справочной таблицы, показанная в листинге 6.11, увеличивает уровень параллелизма таблицы в целом за счет того, что каждый кластер блокируется отдельно, и при этом используется

boost::shared_mutex
, который позволяет нескольким потокам одновременно читать кластер. Но нельзя ли добиться большего уровня параллелизма, еще уменьшив гранулярность блокировок? В следующем разделе мы покажем, как это сделать, воспользовавшись потокобезопасным списковым контейнером с поддержкой итераторов.

6.3.2. Потокобезопасный список с блокировками

Список — одна из самых простых структур данных, и, наверное, написать его потокобезопасную версию будет несложно, правда? Все зависит от того, какая вам нужна функциональность и требуется ли поддержка итераторов — та самая, которую я побоялся включать в справочную таблицу на том основании, что это слишком сложно. Основная идея итератора в духе STL состоит в том, что итератор содержит своего рода ссылку на элемент внутренней структуры данных, образующей контейнер. Если контейнер модифицируется из другого потока, то ссылка должна оставаться действительной, а это значит, что итератор должен удерживать блокировку на какую-то часть структуры. Учитывая, что контейнер никак не контролирует время жизни STL-итератора, такой подход абсолютно непродуктивен.

Альтернатива — включить функции итерирования, например

for_each
, в сам контейнер. Тогда контейнер сможет полностью управлять своим обходом и блокировкой, но зато перестаёт отвечать рекомендациям но предотвращению взаимоблокировок из главы 3. Чтобы функция
for_each
делала что-то полезное, она должна вызывать предоставленный пользователем код, удерживая блокировку. Хуже того, она должна передавать ссылку на каждый элемент контейнера пользовательскому коду, чтобы тот мог к этому элементу обратиться. Можно было бы вместо ссылки передавать копию элемента, но это обойдется слишком дорого, если размер элементов велик.

Таким образом, мы оставим на совести пользователя заботу о предотвращении взаимоблокировок, предупредив его, что в пользовательских функциях не следует ни захватывать блокировки, ни сохранять ссылки, которые могли бы использоваться для доступа к данным вне защиты блокировок и тем самым привести к гонке. В случае внутреннего списка, используемого в реализации справочной таблицы, это абсолютно безопасно, поскольку мы не собираемся делать ничего противозаконного.

Остается решить, какие операции должен поддерживать список. Вернувшись к листингам 6.11 и 6.12, вы увидите, что именно нам требуется:

• добавлять элемент в список;

• удалять элемент из списка, если он удовлетворяет некоторому условию;

• искать в списке элемент, удовлетворяющий некоторому условию;

• изменить элемент, удовлетворяющий некоторому условию;

• скопировать все элементы списка в другой контейнер.

Чтобы получился приличный списковый контейнер общего назначения, полезно было бы добавить еще кое-какие операции, например вставку в указанную позицию, но для нашей справочной таблицы это излишне, поэтому оставляю реализацию в качестве упражнения для читателя.

Основная идея связанного списка с мелкогранулярными блокировками — завести по одному мьютексу на каждый узел. Если список длинный, то получится целая куча мьютексов! Достоинство заключается в том, что операции над отдельными частями списка полностью распараллеливаются: каждая операция удерживает только блокировки на узлы, которыми манипулирует, и освобождает блокировку при переходе к следующему узлу. В листинге ниже приведена реализация такого списка.


Листинг 6.13. Потокобезопасный список с поддержкой итераторов

template

class threadsafe_list {

 struct node { ←
(1)

  std::mutex m;

  std::shared_ptr data;

  std::unique_ptr next;


  node() : ←
(2)

   next() {}


 node(T const& value) : ←
(3)

  data(std::make_shared(value)) {}

 };


 node head;


public:

 threadsafe_list() {}

 ~threadsafe_list() {

  remove_if([](node const&){return true;});

 }


 threadsafe_list(threadsafe_list const& other) = delete;

 threadsafe_list& operator=(

  threadsafe_list const& other) = delete;


 void push_front(T const& value) {

  std::unique_ptr new_node(new node(value));←
(4)

  std::lock_guard lk(head.m);

  new_node->next = std::move(head.next);          ←
(5)

  head.next = std::move(new_node);                ←
(6)

 }


 template

 void for_each(Function f) {                     ←
(7)

  node* current = &head;

  std::unique_lock lk(head.m);       ←
(8)

  while(node* const next = current->next.get()) {←
(9)

   std::unique_lock next_lk(next->m);←
(10)

   lk.unlock();            ←
(11)

   f(*next->data);         ←
(12)

   current = next;

   lk = std::move(next_lk);←
(13)

  }

 }


 template

 std::shared_ptr find_first_if(Predicate p) {←
(14)

  node* current = &head;

  std::unique_lock lk(head.m);

  while (node* const next=current->next.get()) {

   std::unique_lock next_lk(next->m);

   lk.unlock();

   if (p(*next->data)) {←
(15)

    return next->data;  ←
(16)

   }

   current = next;

   lk = std::move(next_lk);

  }

  return std::shared_ptr();

 }


 template
(17)

 void remove_if(Predicate p) {

  node* current = &head;

  std::unique_lock lk(head.m);

  while(node* const next = current->next.get()) {

   std::unique_lock next_lk(next->m);

   if (p(*next->data)) {                  ←
(18)

    std::unique_ptr old_next = std::move(current->next);

    current->next = std::move(next->next);←
(19)

    next_lk.unlock();

   }            ←
(20)

   else {

    lk.unlock();←
(21)

    current = next;

    lk = std::move(next_lk);

   }

  }

 }

};

Показанный в листинге 6.13 шаблон

threadsafe_list<>
— это реализация односвязного списка, в котором каждый элемент является структурой типа
node
(1). В роли головы
head
списка выступает сконструированный по умолчанию объект
node
, в котором указатель
next
равен
NULL
(2). Новые узлы добавляются в список функцией
push_front()
; сначала новый узел конструируется (4), при этом для хранимых в нем данных выделяется память из кучи (3), но указатель
next
остается равным
NULL
. Затем мы должны захватить мьютекс для узла
head
, чтобы получить нужное значение указателя next (5), после чего вставить узел в начало списка, записав в
head.next
указатель на новый узел (6). Пока всё хорошо: для добавления элемента в список необходимо захватить только один мьютекс, поэтому никакого риска взаимоблокировки нет. Кроме того, медленная операция выделения памяти происходит вне блокировки, так что блокировка защищает только обновление двух указателей — действия, которые не могут привести к ошибке. Переходим к функциям итерирования.

Для начала рассмотрим функцию

for_each()
(7). Она принимает объект
Function
, который применяется к каждому элементу списка; следуя примеру большинства библиотечных алгоритмов, этот объект передаётся по значению и может быть как настоящей функцией, так и объектом класса, в котором определена оператор вызова. В данном случае функция должна принимать в качестве единственного параметра значение типа
T
. Здесь мы производим передачу блокировки. Сначала захватывается мьютекс в головном узле
head
(8). Теперь можно безопасно получить указатель на следующий узел
next
(с помощью
get()
, потому что мы не принимаем на себя владение указателем). Если этот указатель не равен
NULL
(9), то захватываем мьютекс в соответствующем узле (10), чтобы обработать данные. Получив блокировку для этого узла, мы можем освободить блокировку для предыдущего узла (11) и вызвать указанную функцию (12). По выходе из этой функции мы можем обновить указатель
current
на только что обработанный узел и с помощью
move
передать владение блокировкой от
next_lk
в
lk
(13). Поскольку
for_each
передаёт каждый элемент данных напрямую пользовательской функции
Function
, мы можем обновить данные, скопировать их в другой контейнер и вообще сделать всё, что угодно. Если функция не делает того, чего нельзя, то это безопасно, потому что на всем протяжении вызова удерживается мьютекс узла, содержащего элемент данных.

Функция

find_first_if()
(14) аналогична
for_each()
; существенное отличие заключается в том, что переданный предикат
Predicate
должен вернуть
true
, если нужный элемент найден, и
false
в противном случае (15). Если элемент найден, то мы сразу возвращаем хранящиеся в нем данные (16), прерывая поиск. Можно было бы добиться того же результата с помощью
for_each()
, но тогда мы продолжили бы просмотр списка до конца, хотя после обнаружения искомого элемента в этом уже нет необходимости.

Функция

remove_if()
(17) несколько отличается, потому что она должна изменить список;
for_each()
для этой цели непригодна. Если предикат
Predicate
возвращает
true
(18), то мы удаляем узел из списка, изменяя значение
current->next
(19). Покончив с этим, мы можем освободить удерживаемый мьютекс следующего узла. Узел удаляется, когда объект
std::unique_ptr
, в который мы его переместили, покидает область видимости (20). В данном случае мы не изменяем
current
, потому что необходимо проверить следующий узел
next
. Если
Predicate
возвращает
false
, то нужно просто продолжить обход списка, как и раньше (21).

А могут ли при таком обилии мьютексов возникнуть взаимоблокировки или состояния гонки? Ответ — твердое нет, при условии, что полученные от пользователя предикаты и функции ведут себя, как положено. Итерирование всегда производится в одном направлении, начиная с узла

head
, и следующий мьютекс неизменно блокируется до освобождения текущего, поэтому не может случиться так, что в разных потоках порядок захвата мьютексов будет различен. Единственный потенциальный кандидат на возникновение гонки — удаление исключенного из списка узла в функции
remove_if()
(20), потому что это делается после освобождения мьютекса (уничтожение захваченного мьютекса приводит к неопределённому поведению). Однако, немного поразмыслив, мы придём к выводу, что это безопасно, так как в этот момент все еще удерживается мьютекс предыдущего узла (
current
), поэтому ни один другой поток не сможет попытаться захватить мьютекс удаляемого узла.

Что можно сказать по поводу распараллеливания? Вся эта возня с мелкогранулярными блокировками затевалась для того, чтобы увеличить уровень параллелизма по сравнению с единственным мьютексом. Так достигли мы своей цели или нет? Да, безусловно — теперь разные потоки могут одновременно работать с разными узлами списка, выполняя разные функции:

for_each()
для обработки каждого узла,
find_first_if()
для поиска или
remove_if()
для удаления элементов. Но, поскольку мьютексы узлов захватываются по порядку, потоки не могут обгонять друг друга. Если какой-то поток тратит много времени на обработку конкретного узла, то, дойдя до этого узла, остальные потоки вынуждены будут ждать.

6.4. Резюме