Параллельное программирование на С++ в действии — страница 41 из 53

& msg) {

  if (q){        │
Отправка сообщения сводится

   q->push(msg);←┘
к помещению его в очередь

  }

 }

};

}

Получение сообщений несколько сложнее. Мы не только должны дождаться появления сообщения в очереди, но еще и проверить, совпадает ли его тип с одним из известных нам типов, и вызвать соответствующий обработчик. Эта процедура начинается в классе

receiver
, показанном в листинге ниже.


Листинг С.3. Класс

receiver

namespace messaging {

class receiver {

 queue q; ←
receiver владеет очередью


public:              │
Разрешить неявное преобразование в объект

 operator sender() {←┘
sender, ссылающийся на эту очередь

  return sender(&q);

 }


При обращении к функции ожидания

 dispatcher wait() {←┘
очереди создается диспетчер

  return dispatcher(&q);

 }

};

}

Если

sender
только ссылается на очередь сообщений, то
receiver
ей владеет. Мы можем получить объект
sender
, ссылающийся на очередь, воспользовавшись неявным преобразованием. Процедура диспетчеризации сообщения начинается с обращения к функции
wait()
. При этом создается объект
dispatcher
, ссылающийся на очередь, которой владеет
receiver
. Класс
dispatcher
показан в следующем листинге; как видите, содержательная работа производится в его деструкторе. В данном случае работа состоит в ожидании сообщения и его диспетчеризации.


Листинг С.4. Класс

dispatcher

namespace messaging {

class close_queue {}; ←
Сообщение о закрытии очереди


class dispatcher {

 queue* q;                             │
Экземпляры

 bool chained;                         │
диспетчера нельзя

копировать

 dispatcher(dispatcher const&)=delete;←┘

 dispatcher& operator=(dispatcher const&)=delete;

 template<

  typename Dispatcher,│
Разрешить экземплярам

  typename Msg,       │
TemplateDispatcher доступ

  typename Func>←┘
к закрытым частям класса

 friend class TemplateDispatcher;

 void wait_and_dispatch()

 {          
(1) В цикле ждем и диспетчеризуем

  for (;;) {←┘
сообщения

   auto msg = q->wait_and_pop();

   dispatch(msg);

  }

 }              
(2) dispatch() смотрит, не пришло ли

сообщение close_queue, и, если

 bool dispatch (←┘
да, возбуждает исключение

  std::shared_ptr const& msg) {

  if (dynamic_cast*>(msg.get())) {

   throw close_queue();

  }

  return false;

 }


public:                          │
Экземпляры диспетчера

 dispatcher(dispatcher&& other):←┘
можно перемещать

  q(other.q), chained(other.chained) {│
Объект-источник не должен

  other.chained = true;              ←┘
ждать сообщений

 }


 explicit dispatcher(queue* q_): q(q_), chained(false) {}


 template

 TemplateDispatcher

 handle(Func&& f)←┐
Сообщения конкретного типа

 {               
(3) обрабатывает TemplateDispatcher

  return TemplateDispatcher(

   q, this, std::forward(f));

 }


 ~dispatcher() noexcept(false)←┐
Деструктор может

 {                            
(4) возбудить исключение

  if (!chained) {

   wait_and_dispatch();

  }

 }

};

}

Экземпляр

dispatcher
, возвращенный функцией
wait()
, немедленно уничтожается, так как является временным объектом, и, как уже было сказало, вся работа выполняется в его деструкторе. Деструктор вызывает функцию
wait_and_dispatch()
, которая в цикле (1) ожидает сообщения и передает его функции
dispatch()
. Сама функция
dispatch()
(2) проста, как правда: она проверяет, не получено ли сообщение типа
close_queue
, и, если так, то возбуждает исключение; в противном случае возвращает
false
, извещая, что сообщение не обработало. Именно из-за исключения
close_queue
деструктор и помечен как
noexcept(false)
(4); без этой аннотации действовала бы подразумеваемая спецификация исключений для деструктора —
noexcept(true)
, означающая, что исключения не допускаются, и тогда исключение
close_queue
привело бы к завершению программы.

Но просто вызывать функцию

wait()
особого смысла не имеет — как правило, нам нужно обработать полученное сообщение. Для этого предназначена функция-член
handle()
(3). Это шаблон, и тип сообщения нельзя вывести, поэтому необходимо явно указать, сообщение какого типа обрабатывается, и передать функцию (или допускающий вызов объект) для его обработки. Сама функция
handle()
передает очередь, текущий объект
dispatcher
и функцию-обработчик новому экземпляру шаблонного класса
TemplateDispatcher
, который обрабатывает сообщения указанного типа. Код этого класса показан в листинге С.5. Именно поэтому мы проверяем флаг
chained
в деструкторе перед тем, как приступить к ожиданию сообщения; он не только предотвращает ожидание объектами, содержимое которых перемещено, но и позволяет передать ответственность за ожидание новому экземпляру
TemplateDispatcher
.


Листинг С.5. Шаблон класса

TemplateDispatcher

namespace messaging {

template<

 typename PreviousDispatcher, typename Msg, typename Func>

class TemplateDispatcher {

 queue* q;

 PreviousDispatcher* prev;

 Func f;

 bool chained;


 TemplateDispatcher(TemplateDispatcher const&) = delete;

 TemplateDispatcher& operator=(

  TemplateDispatcher const&) = delete;


 template<

  typename Dispatcher, typename OtherMsg, typename OtherFunc>

 friend class TemplateDispatcher;←┐
Все конкретизации

 void wait_and_dispatch()         │
TemplateDispatcher

 {                                │
дружат между собой

  for (;;) {

   auto msg = q-