< items; ++i) {
this_thread::sleep_for(100ms);
{
lock_guard lk {mut};
q.push(i);
}
cv.notify_all();
}
4. После создания всех элементов снова блокируем мьютекс, поскольку нужно задать значение для бита
finished
. Затем опять вызываем метод cv.notify_all()
:
{
lock_guard lk {mut};
finished = true;
}
cv.notify_all();
}
5. Теперь можем реализовать функцию потребителя. Она не принимает никаких аргументов, поскольку будет слепо перебирать все элементы до тех пор, пока очередь не опустеет. В цикле, который станет выполняться до тех пор, пока не установится значение переменной
finished
, функция будет блокировать мьютекс, который защищает очередь и флаг finished
. В момент получения блокировки последняя вызовет функцию cv.wait
, передав ей блокировку и лямбда-выражение в качестве аргументов. Лямбда-выражение — это предикат, указывающий, жив ли еще поток-производитель и есть ли значения в очереди.
static void consumer() {
while (!finished) {
unique_lock l {mut};
cv.wait(l, [] { return !q.empty() || finished; });
6. Вызов
cv.wait
разблокирует блокировку и ждет до тех пор, пока условие, описанное предикатом, не перестанет выполняться. Затем функция снова блокирует мьютекс и перебирает все из очереди до тех пор, пока та не опустеет. Если производитель все еще жив, то она снова проитерирует по циклу. В противном случае функция завершит работу, поскольку установлен флаг finished
— таким способом производитель указывает, что новых элементов больше не будет.
while (!q.empty()) {
cout << "Got " << q.front()
<< " from queue.\n"; q.pop();
}
}
}
7. В функции
main
запускаем поток-производитель, который создает десять элементов, а также поток-потребитель. Затем ждем их выполнения и завершаем программу.
int main()
{
thread t1 {producer, 10};
thread t2 {consumer};
t1.join();
t2.join();
cout << "finished!\n";
}
8. Компиляция и запуск программы дадут следующий результат. При выполнении программы можно увидеть, что между появлением каждой строки проходит какое-то время (100 миллисекунд), поскольку создание элементов занимает время:
$ ./producer_consumer
Got 0 from queue.
Got 1 from queue.
Got 2 from queue.
Got 3 from queue.
Got 4 from queue.
Got 5 from queue.
Got 6 from queue.
Got 7 from queue.
Got 8 from queue.
Got 9 from queue.
finished!
Как это работает
В данном примере мы просто запустили два потока. Первый создает элементы и помещает их в очередь. Второй извлекает их из очереди. При взаимодействии с очередью один из этих потоков блокирует общий мьютекс
mut
, доступный им обоим. Таким образом, можно быть уверенными, что оба потока не будут взаимодействовать с состоянием очереди в одно и то же время.Помимо очереди и мьютекса мы объявили четыре переменные, которые были включены во взаимодействие производителя/потребителя:
queue q;
mutex mut;
condition_variable cv;
bool finished {false};
Переменную
finished
объяснить очень просто. Ее значение было установлено в true
, когда производитель создал ограниченное количество элементов. Когда потребитель видит, что значение этой переменной равно true
, он использует последние элементы и завершает работу. Но для чего нужна переменная condition_variable cv
? Мы использовали cv
в двух разных контекстах. Один из контекстов ждал наступления конкретного условия, а второй — указывал на выполнение этого условия.Сторона-потребитель, ждущая выполнения конкретного условия, выглядит так. Поток-потребитель проходит в цикле по блоку, который изначально блокирует мьютекс
mut
с помощью unique_lock
. Затем вызывает cv.wait
:
while (!finished) {
unique_lock l {mut};
cv.wait(l, [] { return !q.empty() || finished; });
while (!q.empty()) {
// consume
}
}
Данный код чем-то похож на следующий эквивалентный код. Вскоре мы рассмотрим, почему эти фрагменты не похожи друг на друга:
while (!finished) {
unique_lock l {mut};
while (q.empty() && !finished) {
l.unlock();
l.lock();
}
while (!q.empty()) {
// consume
}
}
Это значит, что сначала мы получили блокировку, а затем проверили, с каким сценарием сейчас работаем.
1. Есть ли элементы, которые можно использовать? В таком случае сохраняем блокировку, потребляем эти элементы, возвращаем блокировку и начинаем сначала.
2. В противном случае, если элементов для потребления нет, но производитель все еще жив, то возвращаем мьютекс с целью дать ему шанс добавить новые элементы в очередь. Далее снова пытаемся получить блокировку в надежде, что ситуация изменится и мы перейдем к ситуации 1.
Реальная причина, по которой строка
cv.wait
не эквивалентна конструкции while (q.empty() && ...)
, заключается в том, что мы не можем просто пройти по циклу l.unlock(); l.lock();
. Отсутствие активности потока-производителя в течение какого-то промежутка времени приводит к постоянным блокировкам/ разблокировкам мьютекса, что не имеет смысла, поскольку мы впустую тратим циклы процессора.Выражение наподобие
cv.wait(lock, predicate)
будет ждать до тех пор, пока вызов predicate()
не вернет значение true
. Но это не делается путем постоянной блокировки/разблокировки. Чтобы возобновить поток, который блокируется вызовом wait
объекта condition_variable
, другой поток должен вызывать методы notify_one()
или notify_all()
для одного объекта. Только тогда ожидающий поток будет возобновлен и проверит условие predicate()
. (Последнее действительно и для нескольких потоков.)Положительный момент таков: после вызова
wait
, проверившего предикат так, словно поступил внезапный сигнал к пробуждению, поток будет снова мгновенно приостановлен. Это значит, что мы не навредим рабочему потоку программы (но, возможно, пострадает производительность), если добавим в код слишком много вызовов notify
.На стороне производителя мы просто вызвали
cv.notify_all()
после того, как производитель добавил новый элемент в очередь, а также вслед за тем, как создал последний элемент и установил значение флага finished
, равное true
. Этого было достаточно для того, чтобы направить потребителя. Реализуем идиому «несколько производителей/потребителей» с помощью std::condition_variable