Вы также могли заметить, что есть несколько параметров send() и sendDefaul (), которые сильно отличаются от тех, которые вы использовали с JMS и Rabbit. При отправке сообщений в Kafka вы можете указать следующие параметры, которые будут определять способ отправки сообщения:
-topic - тема для отправки сообщения (требуется для send())
-partition - раздел для записи topic (необязательно)
-key - ключ для отправки на запись (необязательно)
-timestamp - время (необязательно; по умолчанию используется System.currentTimeMillis())
-полезные данные (полезная нагрузка) (обязательно)
Тема и полезные данные являются двумя наиболее важными параметрами. Разделы и ключи мало влияют на то, как вы используете KafkaTemplate, кроме дополнительной информации, предоставляемой в качестве параметров для send() и sendDefault(). Для наших целей мы собираемся сосредоточиться на отправке полезной нагрузки сообщения в заданную тему и не беспокоиться о разделах и ключах.
Для метода send() вы также можете выбрать отправку ProducerRecord, который является немногим больше, чем тип, который содержит все предыдущие параметры в одном объекте. Вы также можете отправить объект Message, но для этого потребуется преобразовать ваши доменные объекты в Message. Как правило, проще использовать один из других методов, чем создавать и отправлять объект ProducerRecord или Message.
Используя KafkaTemplate и его метод send(), вы можете написать реализацию OrderMessagingService на основе Kafka. Следующий листинг показывает, как может выглядеть такая реализация.
Листинг 8.8. Отправка заказов с помощью KafkaTemplate
package tacos.messaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
private KafkaTemplate
@Autowired
public KafkaOrderMessagingService(
KafkaTemplate
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(Order order) {
kafkaTemplate.send("tacocloud.orders.topic", order);
}
}
В этой новой реализации OrderMessagingService метод sendOrder() использует метод send() внедренного KafkaTemplate для отправки Order в тему с именем tacocloud.orders.topic. За исключением слова «Кафка», разбросанного по коду, оно не сильно отличается от кода, который вы написали для JMS и Rabbit.
Если задать тему по умолчанию, можно немного упростить метод sendOrder(). Во-первых, установить тему по умолчанию в tacocloud.orders.topic, установив свойство spring.kafka.template.default-topic:
spring:
kafka:
template:
default-topic: tacocloud.orders.topic
Затем в методе sendOrder() вы можете вызвать sendDefault() вместо send() и не указывать имя темы:
@Override
public void sendOrder(Order order) {
kafkaTemplate.sendDefault(order);
}
Теперь, когда ваш код для отправки сообщений был написан, давайте обратим наше внимание на написание кода, который будет получать эти сообщения от Kafka.
8.3.3 Написание Kafka листинеров
Помимо уникальных сигнатур методов для send() и sendDefault(), KafkaTemplate отличается от JmsTemplate и RabbitTemplate тем, что не предлагает никаких методов для получения сообщений. Это означает, что единственный способ использовать сообщения из темы Kafka с помощью Spring - написать листинер сообщений.
Для Kafka листинеры сообщений определяются как методы, аннотированные @KafkaListener. Аннотация @KafkaListener примерно аналогична @JmsListener и @RabbitListener и используется практически одинаково. В следующем листинге показано, как может выглядеть получатель заказа на основе слушателя, если он написан для Kafka.
Листинг 8.9. Получение заказов с помощью @KafkaListener
package tacos.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order) {
ui.displayOrder(order);
}
}
Метод handle() имеет аннотацию @KafkaListener, чтобы указать, что его следует вызывать при поступлении сообщения в теме с именем tacocloud.orders.topic. Как написано в листинге 8.9, для handle() предоставляется только Order (полезная нагрузка). Но если вам нужны дополнительные метаданные из сообщения, он также может принять объект ConsumerRecord или Message.
Например, следующая реализация handle() принимает ConsumerRecord, чтобы вы могли регистрировать раздел и метку времени сообщения:
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord
log.info("Received from partition {} with timestamp {}",
record.partition(), record.timestamp());
ui.displayOrder(order);
}
Точно так же вы можете запросить Message вместо ConsumerRecord и достичь того же:
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message
MessageHeaders headers = message.getHeaders();
log.info("Received from partition {} with timestamp {}",
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
ui.displayOrder(order);
}
Стоит отметить, что полезная нагрузка сообщения также доступна через ConsumerRecord.value() или Message.getPayload(). Это означает, что вы можете запросить Order через эти объекты вместо того, чтобы запрашивать его напрямую как параметр handle().
ИТОГ
-Асинхронный обмен сообщениями обеспечивает уровень перенаправления между взаимодействующими приложениями, что обеспечивает более слабую связь и большую масштабируемость.
-Spring поддерживает асинхронный обмен сообщениями с JMS, RabbitMQ или Apache Kafka.
-Приложения могут использовать основанные на шаблонах клиенты (JmsTemplate, RabbitTemplate или KafkaTemplate) для отправки сообщений через брокер сообщений.
-Приложения могут использовать сообщения в модели на основе запросов, используя те же клиенты на основе шаблонов.
-Сообщения также можно передавать потребителям, применяя аннотации листенеров сообщений(@JmsListener, @RabbitListener или @KafkaListener) к методам bean.
Spring in Action Covers Spring 5.0 перевод на русский. Глава 9
9. Integrating Spring
Эта глава охватывает
-Обработка данных в режиме реального времени
-Определение интеграционных потоков
-Использование определения JAVA DSL Spring Integration
-Интеграция с электронной почтой, файловыми системами и другими внешними системами
Одна из самых неприятных вещей, с которыми я сталкиваюсь во время путешествия - это длительный перелет и плохое или несуществующее интернет-соединение в полете. Мне нравится использовать свое свободное время, чтобы выполнить некоторую работу, включая написание страниц этой книги. Если нет сетевого подключения, я в невыгодном положении, если мне нужно получить библиотеку или найти Java Doc, и я не могу выполнить большую часть работы. Я научился брать с собой книгу для чтения в таких случаям.
Точно так же, как нам необходимо подключиться к Интернету для продуктивной работы, многие приложения должны подключаться к внешним системам для выполнения своей работы. Приложению может потребоваться читать или отправлять электронные письма, взаимодействовать с внешним API или использовать данные, содержащиеся в базе данных. И поскольку данные поступают из этих внешних систем или записываются в эти внешние системы, приложению может потребоваться каким-то образом обрабатывать данные, чтобы перевести их в собственный домен приложения или из него.
В этой главе вы узнаете, как использовать общие шаблоны интеграции Spring Integration. Spring Integration - это готовая к использованию реализация многих шаблонов интеграции, которые каталогизированы в Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf (Addison-Wesley, 2003). Каждый шаблон реализован как компонент, через который сообщения передают данные в конвейер. Используя конфигурацию Spring, вы можете собрать эти компоненты в конвейер, по которому проходят потоки данных. Давайте начнем с определения простого потока интеграции, который познакомит вас со многие функциями и характеристиками работы с Spring Integration.
9.1 Объявление простого потока интеграции
Вообще говоря, Spring Integration позволяет создавать интеграционные потоки, через которые приложение может получать или отправлять данные на некоторый ресурс, внешний по отношению к самому приложению. Одним из таких ресурсов, с которым приложение может интегрироваться, является файловая система. Поэтому среди многих компонентов Spring Integration есть канальные адаптеры для чтения и записи файлов.