C++17 STL Стандартная библиотека шаблонов — страница 105 из 119

< items; ++i) {

    this_thread::sleep_for(100ms);

    {

      lock_guard lk {mut};

      q.push(i);

    }

    cv.notify_all();

  }


4. После создания всех элементов снова блокируем мьютекс, поскольку нужно задать значение для бита

finished
. Затем опять вызываем метод
cv.notify_all()
:


  {

    lock_guard lk {mut};

    finished = true;

  }

  cv.notify_all();

}


5. Теперь можем реализовать функцию потребителя. Она не принимает никаких аргументов, поскольку будет слепо перебирать все элементы до тех пор, пока очередь не опустеет. В цикле, который станет выполняться до тех пор, пока не установится значение переменной

finished
, функция будет блокировать мьютекс, который защищает очередь и флаг
finished
. В момент получения блокировки последняя вызовет функцию
cv.wait
, передав ей блокировку и лямбда-выражение в качестве аргументов. Лямбда-выражение — это предикат, указывающий, жив ли еще поток-производитель и есть ли значения в очереди.


static void consumer() {

  while (!finished) {

    unique_lock l {mut};

    cv.wait(l, [] { return !q.empty() || finished; });


6. Вызов

cv.wait
разблокирует блокировку и ждет до тех пор, пока условие, описанное предикатом, не перестанет выполняться. Затем функция снова блокирует мьютекс и перебирает все из очереди до тех пор, пока та не опустеет. Если производитель все еще жив, то она снова проитерирует по циклу. В противном случае функция завершит работу, поскольку установлен флаг
finished
— таким способом производитель указывает, что новых элементов больше не будет.


    while (!q.empty()) {

    cout << "Got " << q.front()

<< " from queue.\n"; q.pop();

    }

  }

}


7. В функции

main
запускаем поток-производитель, который создает десять элементов, а также поток-потребитель. Затем ждем их выполнения и завершаем программу.


int main()

{

  thread t1 {producer, 10};

  thread t2 {consumer};

  t1.join();

  t2.join();

  cout << "finished!\n";

}


8. Компиляция и запуск программы дадут следующий результат. При выполнении программы можно увидеть, что между появлением каждой строки проходит какое-то время (100 миллисекунд), поскольку создание элементов занимает время:


$ ./producer_consumer

Got 0 from queue.

Got 1 from queue.

Got 2 from queue.

Got 3 from queue.

Got 4 from queue.

Got 5 from queue.

Got 6 from queue.

Got 7 from queue.

Got 8 from queue.

Got 9 from queue.

finished!


Как это работает

В данном примере мы просто запустили два потока. Первый создает элементы и помещает их в очередь. Второй извлекает их из очереди. При взаимодействии с очередью один из этих потоков блокирует общий мьютекс

mut
, доступный им обоим. Таким образом, можно быть уверенными, что оба потока не будут взаимодействовать с состоянием очереди в одно и то же время.

Помимо очереди и мьютекса мы объявили четыре переменные, которые были включены во взаимодействие производителя/потребителя:


queue      q;

mutex              mut;

condition_variable cv;

bool               finished {false};


Переменную

finished
объяснить очень просто. Ее значение было установлено в
true
, когда производитель создал ограниченное количество элементов. Когда потребитель видит, что значение этой переменной равно
true
, он использует последние элементы и завершает работу. Но для чего нужна переменная
condition_variable cv
? Мы использовали
cv
в двух разных контекстах. Один из контекстов ждал наступления конкретного условия, а второй — указывал на выполнение этого условия.

Сторона-потребитель, ждущая выполнения конкретного условия, выглядит так. Поток-потребитель проходит в цикле по блоку, который изначально блокирует мьютекс

mut
с помощью
unique_lock
. Затем вызывает
cv.wait
:


while (!finished) {

  unique_lock l {mut};


cv.wait(l, [] { return !q.empty() || finished; });


  while (!q.empty()) {

    // consume

  }

}


Данный код чем-то похож на следующий эквивалентный код. Вскоре мы рассмотрим, почему эти фрагменты не похожи друг на друга:


while (!finished) {

  unique_lock l {mut};


  while (q.empty() && !finished) {

    l.unlock();

    l.lock();

  }


  while (!q.empty()) {

    // consume

  }

}


Это значит, что сначала мы получили блокировку, а затем проверили, с каким сценарием сейчас работаем.

1. Есть ли элементы, которые можно использовать? В таком случае сохраняем блокировку, потребляем эти элементы, возвращаем блокировку и начинаем сначала.

2. В противном случае, если элементов для потребления нет, но производитель все еще жив, то возвращаем мьютекс с целью дать ему шанс добавить новые элементы в очередь. Далее снова пытаемся получить блокировку в надежде, что ситуация изменится и мы перейдем к ситуации 1.


Реальная причина, по которой строка

cv.wait
не эквивалентна конструкции
while (q.empty() && ...)
, заключается в том, что мы не можем просто пройти по циклу
l.unlock(); l.lock();
. Отсутствие активности потока-производителя в течение какого-то промежутка времени приводит к постоянным блокировкам/ разблокировкам мьютекса, что не имеет смысла, поскольку мы впустую тратим циклы процессора.

Выражение наподобие

cv.wait(lock, predicate)
будет ждать до тех пор, пока вызов
predicate()
не вернет значение
true
. Но это не делается путем постоянной блокировки/разблокировки. Чтобы возобновить поток, который блокируется вызовом
wait
объекта
condition_variable
, другой поток должен вызывать методы
notify_one()
или
notify_all()
для одного объекта. Только тогда ожидающий поток будет возобновлен и проверит условие
predicate()
. (Последнее действительно и для нескольких потоков.)

Положительный момент таков: после вызова

wait
, проверившего предикат так, словно поступил внезапный сигнал к пробуждению, поток будет снова мгновенно приостановлен. Это значит, что мы не навредим рабочему потоку программы (но, возможно, пострадает производительность), если добавим в код слишком много вызовов
notify
.

На стороне производителя мы просто вызвали

cv.notify_all()
после того, как производитель добавил новый элемент в очередь, а также вслед за тем, как создал последний элемент и установил значение флага
finished
, равное
true
. Этого было достаточно для того, чтобы направить потребителя. 

Реализуем идиому «несколько производителей/потребителей» с помощью std::condition_variable