Splitter-ы полезны во многих случаях, но есть два основных варианта использования, для которых вы можете использовать сплиттер:
-Полезная нагрузка сообщения содержит коллекцию элементов того же типа, которые вы хотите обработать как отдельные полезные нагрузки сообщений. Например, сообщение, содержащее список продуктов, может быть разделено на несколько сообщений с полезной нагрузкой по одному продукту в каждом
-Полезная нагрузка сообщения содержит информацию, которая, хотя и связана, может быть разделена на два или более сообщений разных типов. Например, заказ на покупку может содержать информацию о поставках, выставлении счетов и номенклатуре. Сведения о доставке могут обрабатываться одним подпотоком, выставление счетов-другим, а номенклатура - еще одним. В этом случае за разделителем обычно следует маршрутизатор, который направляет сообщения по типу полезной нагрузки, чтобы гарантировать, что данные обрабатываются правильным подпотоком.
При разделении полезной нагрузки сообщения на два или более сообщений разных типов обычно достаточно определить POJO, который извлекает отдельные части входящей полезной нагрузки и возвращает их как элементы коллекции.
Например, предположим, что вы хотите разделить сообщение, содержащее заказ на покупку, на два сообщения: одно содержит платежную информацию, а другое-список элементов списка. Следующий OrderSplitter сделает работу:
public class OrderSplitter {
public Collection
ArrayList
parts.add(po.getBillingInfo());
parts.add(po.getLineItems());
return parts;
}
}
Затем вы можете объявить bean OrderSplitter как часть потока интеграции, аннотируя его с помощью @Splitter следующим образом:
@Bean
@Splitter(inputChannel="poChannel", outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
return new OrderSplitter();
}
Здесь заказы на поставку поступают на канал с именем poChannel и делятся на OrderSplitter. Затем каждый элемент в возвращенной коллекции публикуется как отдельное сообщение в потоке интеграции в канал с именем splitOrderChannel. На этом этапе потока вы можете объявить PayloadTypeRouter для маршрутизации платежной информации и позиций в их собственный подпоток:
@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(
BillingInfo.class.getName(), "billingInfoChannel");
router.setChannelMapping(
List.class.getName(), "lineItemsChannel");
return router;
}
Как следует из его названия, PayloadTypeRouter направляет сообщения на разные каналы на основе их типа полезной нагрузки. Здесь сконфигурировано так что, сообщения, чья полезная нагрузка имеет тип BillingInfo, направляются в канал с именем billingInfoChannel для дальнейшей обработки. Что касается позиций, они находятся в коллекции java.util.List; следовательно, вы сопоставили полезные нагрузки типа List для направления на канал с именем lineItemsChannel.
В настоящее время поток разделяется на два подпотока: один, через который проходят объекты BillingInfo, и другой, через который проходит List
@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List
return lineItems;
}
Когда сообщение, несущее полезную нагрузку List
Если вы предпочитаете использовать Java DSL для объявления конфигурации сплиттера/маршрутизатора, вы можете сделать это с помощью вызовов функций split() и route():
return IntegrationFlows
…
.split(orderSplitter())
.
p -> {
if (p.getClass().isAssignableFrom(BillingInfo.class)) {
return "BILLING_INFO";
} else {
return "LINE_ITEMS";
}
}, mapping -> mapping
.subFlowMapping("BILLING_INFO", sf -> sf
.
…
}))
.subFlowMapping("LINE_ITEMS", sf -> sf
.split()
.
…
}))
)
.get();
Форма определения потока в DSL определенно более краткая, но и более трудная для понимания. Для разделения порядка используется тот же OrderSplitter, что и в примере конфигурации Java. После разделения заказ разбивается по типу на два отдельных подпотока.
9.2.6 Активаторы службы
Активаторы службы получают сообщения из входного канала и отправляют эти сообщения в реализацию MessageHandler, как показано на рисунке 9.7.
Рис. 9.7. Активаторы службы вызывают некоторую службу посредством MessageHandler при получении сообщения.
Spring Integration предлагает несколько реализаций MessageHandler «из коробки» (даже PayloadTypeRouter - это реализация MessageHandler), но вам часто потребуется предоставить некоторую пользовательскую реализацию, которая будет действовать как активатор службы. Например, в следующем коде показано, как объявить bean MessageHandler, настроенный как активатор службы:
@Bean
@ServiceActivator(inputChannel="someChannel")
public MessageHandler sysoutHandler() {
return message -> {
System.out.println("Message payload: " + message.getPayload());
};
}
Bean аннотируется @ServiceActivator, чтобы обозначить его как активатор службы, который обрабатывает сообщения из канала с именем someChannel. Что касается MessageHandler, он реализован через лямбду. Несмотря на то, что это простой MessageHandler, при получении сообщения он передает свою полезную нагрузку в стандартный поток вывода.
Кроме того, можно объявить активатор службы, который обрабатывает данные во входящем сообщении перед возвратом новой полезной нагрузки. В этом случае компонент должен быть GenericHandler, а не MessageHandler:
@Bean
@ServiceActivator(inputChannel="orderChannel", outputChannel="completeOrder")
public GenericHandler
return (payload, headers) -> {
return orderRepo.save(payload);
};
}
В этом случае активатор службы является GenericHandler, который ожидает сообщения с полезной нагрузкой типа Order. Когда заказ поступает, он сохраняется через репозиторий; полученный сохраненный заказ возвращается для отправки в выходной канал, имя которого completeChannel.
Вы могли заметить, что GenericHandler предоставляет не только полезную нагрузку, но и заголовки сообщений (даже если пример не использует эти заголовки в любом случае). Вы также можете использовать активаторы служб в стиле конфигурации JAVA DSL, передавая MessageHandler или GenericHandler для метода handle () в определении потока:
public IntegrationFlow someFlow() {
return IntegrationFlows
…
.handle(msg -> {
System.out.println("Message payload: " + msg.getPayload());
})
.get();
}
В этом случае MessageHandler задается как лямбда, но вы также можете предоставить его как ссылку на метод или даже как экземпляр класса, который реализует интерфейс MessageHandler. Если вы даете ему ссылку на лямбду или метод, имейте в виду, что он принимает сообщение в качестве параметра.
Аналогично, handle() может быть написан для принятия GenericHandler, если активатор службы не предназначен для завершения потока. Применяя ранее описанный активатор службы сохранения заказов, вы можете настроить поток с помощью Java DSL следующим образом:
public IntegrationFlow orderFlow(OrderRepository orderRepo) {
return IntegrationFlows
…
.
return orderRepo.save(payload);
})
…
.get();
}
При работе с GenericHandler, ссылка на лямбду или метод принимает полезную нагрузку и заголовки сообщений в качестве параметров. Кроме того, если вы решите использовать GenericHandler в конце потока, вам нужно будет вернуть null, иначе вы получите ошибки, указывающие, что нет указанного выходного канала.
9.2.7 Шлюзы
Шлюзы-это средства, с помощью которых приложение может передавать данные в поток интеграции и, при необходимости, получать ответ, который является результатом потока. Реализованные с помощью Spring Integration шлюзы реализуются как интерфейсы, которые приложение может вызывать для отправки сообщений в поток интеграции (рис.9.8).
Рисунок 9.8 Сервисные шлюзы - это интерфейсы, через которые приложение может отправлять сообщения в интеграционный поток.
Вы уже видели пример шлюза сообщений со шлюзом FileWriterGateway. FileWriterGateway был односторонним шлюзом с методом, принимающим String для записи в файл, возвращая void. Примерно так же легко написать двусторонний шлюз. При написании интерфейса шлюза убедитесь, что метод возвращает некоторое значение для публикации в потоке интеграции.