Spring in Action Covers Spring 5-1--11 — страница 43 из 63

public Order receiveOrder() {

    Message message = rabbit.receive("tacocloud.order.queue", 30000);

    return message != null ? (Order) converter.fromMessage(message) : null;

}

Если вы похожи на меня, то, увидев такой захардкоженный элемент, вы почувствуете дискомфорт. Вы можете подумать, что было бы неплохо создать аннотированный класс @ConfigurationProperties, чтобы можно было настроить это время ожидания с помощью свойства конфигурации Spring Boot. Я бы согласился с вами, если бы не тот факт, что Spring Boot уже предлагает такое свойство конфигурации. Если вы хотите установить время ожидания с помощью конфигурации, просто удалите значение времени ожидания в вызове receive() и установите его в своей конфигурации с помощью свойства spring.rabbitmq.template.receive-timeout:

spring:

    rabbitmq:

        template:

            receive-timeout: 30000

Вернемся к методу receiveOrder(), обратите внимание, что вам пришлось использовать конвертер сообщений из RabbitTemplate для преобразования входящего объекта Message в объект Order. Но если RabbitTemplate использует конвертер сообщений, почему он не может выполнить преобразование для вас? Именно для этого и предназначен метод receiveAndConvert(). Используя receiveAndConvert(), вы можете переписать receiveOrder() следующим образом:

public Order receiveOrder() {

    return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");

}

Это намного проще, не так ли? Единственная тревожная вещь, которую я вижу, это приведение Object к Order. Однако есть альтернатива. Вместо этого вы можете передать ParameterizedTypeReference в receiveAndConvert() для непосредственного получения объекта Order:

public Order receiveOrder() {

    return rabbit.receiveAndConvert("tacocloud.order.queue", new ParameterizedTypeReference() {});

}

Спорный вопрос, чем это лучше, но это более типобезопасный подход. Единственное требование для использования ParameterizedTypeReference с receiveAndConvert() состоит в том, что конвертер сообщений должен быть реализацией SmartMessageConverter; Jackson2JsonMessageConverter - единственная готовая реализация на выбор.

Модель pull, предлагаемая JmsTemplate, подходит для многих случаев использования, но часто лучше иметь код, который прослушивает сообщения и который вызывается при поступлении сообщений. Давайте посмотрим, как вы можете создавать управляемые сообщениями bean-компоненты, которые отвечают на сообщения RabbitMQ.

ОБРАБОТКА СООБЩЕНИЙ RABBITMQ СО СЛУШАТЕЛЯМИ

Для управляемых сообщениями компонентов RabbitMQ Spring предлагает RabbitListener, аналог RabbitMQ для JmsListener. Чтобы указать, что метод должен вызываться при поступлении сообщения в очередь RabbitMQ, аннотируйте метод компонента с помощью @RabbitTemplate.

Например, в следующем листинге показана реализация OrderReceiver в RabbitMQ, аннотированная для прослушивания сообщений заказа, а не для их запроса с помощью RabbitTemplate.

Листинг 8.7 объявление метода в качестве листенера сообщений RabbitMQ

package tacos.kitchen.messaging.rabbit.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

@Component

public class OrderListener {

    private KitchenUI ui;

    @Autowired

    public OrderListener(KitchenUI ui) {

        this.ui = ui;

    }

    @RabbitListener(queues = "tacocloud.order.queue")

    public void receiveOrder(Order order) {

ui.displayOrder(order);

    }

}

Вы без сомнения заметите, что это очень похоже на код из листинга 8.4. Действительно, единственное, что изменилось, это аннотация слушателя - вместо @JmsListener используется @RabbitListener. Каким бы замечательным ни был @RabbitListener, это почти дублирование кода оставляет мне мало что сказать о @RabbitListener, чего я еще не говорил о @JmsListener. Они оба отлично подходят для написания кода, который отвечает на сообщения, отправляемые им соответствующими брокерами - брокером JMS для @JmsListener и брокером RabbitMQ для @RabbitListener.

Хотя вы можете чувствовать отсутствие энтузиазма в отношении @RabbitListener в предыдущем абзаце, будьте уверены, что я не специально. По правде говоря, тот факт, что @RabbitListener работает так же, как @JmsListener, на самом деле очень интересен! Это означает, что вам не нужно изучать совершенно другую модель программирования при работе с RabbitMQ против Artemis или ActiveMQ. То же самое справедливо для сходства между RabbitTemplate и JmsTemplate.

Завершая эту главу, рассмотрим еще один вариант обмена сообщениями, поддерживаемый Spring: Apache Kafka.

8.3 Обмен сообщениями с Kafka

Apache Kafka - это новейший вариант обмена сообщениями, который мы рассмотрим в этой главе. На первый взгляд, Kafka - это брокер сообщений, такой же, как ActiveMQ, Artemis или Rabbit. Но у Кафки есть несколько уникальных козырей в рукавах.

Kafka предназначена для работы в кластере, обеспечивая отличную масштабируемость.  И разделяя свои темы по всем экземплярам в кластере добивается особой устойчивости. Принимая во внимание, что RabbitMQ имеет дело главным образом с очередями на слушателях, Kafka использует темы только для обмена сообщениями в pub/sub - сообщениях.

Темы Кафки распространяются на всех брокеров в кластере. Каждый узел в кластере выступает в качестве лидера для одной или нескольких тем, отвечая за данные этой темы и реплицируя их на другие узлы в кластере.

Далее, каждая тема может быть разделена на несколько разделов. В этом случае каждый узел в кластере является лидером для одного или нескольких разделов темы, но не для всей темы. Ответственность за тему распределена по всем узлам. Рисунок 8.2 иллюстрирует, как это работает.

Рис. 8.2. Кластер Kafka состоит из нескольких брокеров, каждый из которых выступает в роли лидера по разделам тем.

Kafka обладает уникальной архитектурой, и я призываю вас прочитать больше об этом в Kafka in Action Дилана Скотта (Manning, 2017). Для наших целей мы сосредоточимся на том, как отправлять сообщения и получать их от Kafka с помощью Spring.

8.3.1 Настройка Spring для обмена сообщениями Kafka

Чтобы начать использовать Kafka для обмена сообщениями, вам нужно добавить соответствующие зависимости в вашу сборку. Однако, в отличие от параметров JMS и RabbitMQ, для Kafka нет стартового Spring Boot. Но не бойтесь; вам понадобится только одна зависимость:

org.springframework.kafka

spring-kafka

Эта зависимость вносит все, что нужно для Кафки в проект. Более того, её присутствие инициирует автоконфигурацию Spring Boot для Kafka, которая, помимо прочего, организует KafkaTemplate в контексте приложения Spring. Все, что вам нужно сделать, это заинжектить KafkaTemplate и приступить к отправке и получению сообщений.

Однако прежде чем приступить к отправке и получению сообщений, вы должны знать о некоторых свойствах, которые пригодятся при работе с Kafka. В частности, KafkaTemplate по умолчанию работает с брокером Kafka на локальном хосте, прослушивая порт 9092. Это нормально, если брокер Kafka запускается локально при разработке приложения, но когда придет время перейти к продакшену, вам потребуется настроить другой хост и порт.

Свойство spring.kafka.bootstrap-servers задает расположение одного или нескольких серверов Kafka, используемых для установления начального соединения с кластером Kafka. Например, если один из серверов Kafka в кластере работает kafka.tacocloud.com и прослушивая порт 9092, вы можете настроить его местоположение в YAML следующим образом:

spring:

    kafka:

        bootstrap-servers:

            - kafka.tacocloud.com:9092

Но обратите внимание spring.kafka.bootstrap-servers - является множественным числом и принимает список. Таким образом, вы можете предоставить ему несколько серверов Kafka в кластере:

spring:

    kafka:

        bootstrap-servers:

            - kafka.tacocloud.com:9092

            - kafka.tacocloud.com:9093

            - kafka.tacocloud.com:9094

С настройкой Kafka в вашем проекте вы готовы отправлять и получать сообщения. Начнем с отправки объектов Order в Kafka с помощью KafkaTemplate.

8.3.2 Отправка сообщений с помощью KafkaTemplate

Во многих отношениях KafkaTemplate похож на своих аналогов в JMS и RabbitMQ. В то же время, это совсем другое. Это становится очевидным, когда мы рассмотрим методы отправки сообщений:

ListenableFuture> send(String topic, V data);

ListenableFuture> send(String topic, K key, V data);

ListenableFuture> send(String topic, Integer partition, K key, V data);

ListenableFuture> send(String topic, Integer partition, Long timestamp, K key, V data);

ListenableFuture> send(ProducerRecord record);

ListenableFuture> send(Message message);

ListenableFuture> sendDefault(V data);

ListenableFuture> sendDefault(K key, V data);

ListenableFuture> sendDefault(Integer partition, K key, V data);

ListenableFuture> sendDefault(Integer partition, Long timestamp, K key, V data);

Первое, что вы могли заметить, это то, что нет методов convertAndSend(). Это потому, что KafkaTemplate написана с помощью дженериков и может работать с типами доменов непосредственно при отправке сообщений. В некотором смысле, все методы send() выполняют функцию convertAndSend ().